xref: /openbmc/linux/drivers/block/rnbd/rnbd-clt.c (revision b296a6d5)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Network Block Driver
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9 
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12 
13 #include <linux/module.h>
14 #include <linux/blkdev.h>
15 #include <linux/hdreg.h>
16 #include <linux/scatterlist.h>
17 #include <linux/idr.h>
18 
19 #include "rnbd-clt.h"
20 
21 MODULE_DESCRIPTION("RDMA Network Block Device Client");
22 MODULE_LICENSE("GPL");
23 
24 static int rnbd_client_major;
25 static DEFINE_IDA(index_ida);
26 static DEFINE_MUTEX(ida_lock);
27 static DEFINE_MUTEX(sess_lock);
28 static LIST_HEAD(sess_list);
29 
30 /*
31  * Maximum number of partitions an instance can have.
32  * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
33  */
34 #define RNBD_PART_BITS		6
35 
36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
37 {
38 	return refcount_inc_not_zero(&sess->refcount);
39 }
40 
41 static void free_sess(struct rnbd_clt_session *sess);
42 
43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
44 {
45 	might_sleep();
46 
47 	if (refcount_dec_and_test(&sess->refcount))
48 		free_sess(sess);
49 }
50 
51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
52 {
53 	might_sleep();
54 
55 	if (!refcount_dec_and_test(&dev->refcount))
56 		return;
57 
58 	mutex_lock(&ida_lock);
59 	ida_simple_remove(&index_ida, dev->clt_device_id);
60 	mutex_unlock(&ida_lock);
61 	kfree(dev->hw_queues);
62 	rnbd_clt_put_sess(dev->sess);
63 	mutex_destroy(&dev->lock);
64 	kfree(dev);
65 }
66 
67 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
68 {
69 	return refcount_inc_not_zero(&dev->refcount);
70 }
71 
72 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
73 				 const struct rnbd_msg_open_rsp *rsp)
74 {
75 	struct rnbd_clt_session *sess = dev->sess;
76 
77 	if (!rsp->logical_block_size)
78 		return -EINVAL;
79 
80 	dev->device_id		    = le32_to_cpu(rsp->device_id);
81 	dev->nsectors		    = le64_to_cpu(rsp->nsectors);
82 	dev->logical_block_size	    = le16_to_cpu(rsp->logical_block_size);
83 	dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
84 	dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
85 	dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
86 	dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
87 	dev->discard_alignment	    = le32_to_cpu(rsp->discard_alignment);
88 	dev->secure_discard	    = le16_to_cpu(rsp->secure_discard);
89 	dev->rotational		    = rsp->rotational;
90 
91 	dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
92 	dev->max_segments = BMAX_SEGMENTS;
93 
94 	dev->max_hw_sectors = min_t(u32, dev->max_hw_sectors,
95 				    le32_to_cpu(rsp->max_hw_sectors));
96 	dev->max_segments = min_t(u16, dev->max_segments,
97 				  le16_to_cpu(rsp->max_segments));
98 
99 	return 0;
100 }
101 
102 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
103 				    size_t new_nsectors)
104 {
105 	rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
106 		       dev->nsectors, new_nsectors);
107 	dev->nsectors = new_nsectors;
108 	set_capacity(dev->gd, dev->nsectors);
109 	revalidate_disk_size(dev->gd, true);
110 	return 0;
111 }
112 
113 static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
114 				struct rnbd_msg_open_rsp *rsp)
115 {
116 	int err = 0;
117 
118 	mutex_lock(&dev->lock);
119 	if (dev->dev_state == DEV_STATE_UNMAPPED) {
120 		rnbd_clt_info(dev,
121 			       "Ignoring Open-Response message from server for unmapped device\n");
122 		err = -ENOENT;
123 		goto out;
124 	}
125 	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
126 		u64 nsectors = le64_to_cpu(rsp->nsectors);
127 
128 		/*
129 		 * If the device was remapped and the size changed in the
130 		 * meantime we need to revalidate it
131 		 */
132 		if (dev->nsectors != nsectors)
133 			rnbd_clt_change_capacity(dev, nsectors);
134 		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
135 	}
136 	err = rnbd_clt_set_dev_attr(dev, rsp);
137 	if (err)
138 		goto out;
139 	dev->dev_state = DEV_STATE_MAPPED;
140 
141 out:
142 	mutex_unlock(&dev->lock);
143 
144 	return err;
145 }
146 
147 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
148 {
149 	int ret = 0;
150 
151 	mutex_lock(&dev->lock);
152 	if (dev->dev_state != DEV_STATE_MAPPED) {
153 		pr_err("Failed to set new size of the device, device is not opened\n");
154 		ret = -ENOENT;
155 		goto out;
156 	}
157 	ret = rnbd_clt_change_capacity(dev, newsize);
158 
159 out:
160 	mutex_unlock(&dev->lock);
161 
162 	return ret;
163 }
164 
165 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
166 {
167 	if (WARN_ON(!q->hctx))
168 		return;
169 
170 	/* We can come here from an interrupt, thus async=true */
171 	blk_mq_run_hw_queue(q->hctx, true);
172 }
173 
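/*
 * RNBD_DELAY_IFBUSY is a special "delay" value for
 * rnbd_clt_dev_kick_mq_queue(): instead of delaying the hctx by a fixed
 * number of milliseconds, the queue is parked on the per-CPU requeue list
 * (rnbd_clt_dev_add_to_requeue()) and rerun once a permit is returned.
 */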
174 enum {
175 	RNBD_DELAY_IFBUSY = -1,
176 };
177 
178 /**
179  * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
180  * @sess:	Session to find a queue for
181  * @cpu:	Cpu to start the search from
182  *
183  * Description:
184  *     Each CPU has a list of HW queues, which need to be rerun.  If a list
185  *     is not empty, it is marked with a bit.  This function finds the first
186  *     set bit in the bitmap and returns the corresponding CPU list.
187  */
188 static struct rnbd_cpu_qlist *
189 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
190 {
191 	int bit;
192 
193 	/* Search from cpu to nr_cpu_ids */
194 	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
195 	if (bit < nr_cpu_ids) {
196 		return per_cpu_ptr(sess->cpu_queues, bit);
197 	} else if (cpu != 0) {
198 		/* Search from 0 to cpu */
199 		bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
200 		if (bit < cpu)
201 			return per_cpu_ptr(sess->cpu_queues, bit);
202 	}
203 
204 	return NULL;
205 }
206 
207 static inline int nxt_cpu(int cpu)
208 {
209 	return (cpu + 1) % nr_cpu_ids;
210 }
211 
212 /**
213  * rnbd_rerun_if_needed() - rerun next queue marked as stopped
214  * @sess:	Session to rerun a queue on
215  *
216  * Description:
217  *     Each CPU has its own list of HW queues, which should be rerun.
218  *     The function finds such a list of HW queues, takes the list lock, picks
219  *     up the first HW queue out of the list and requeues it.
220  *
221  * Return:
222  *     True if the queue was requeued, false otherwise.
223  *
224  * Context:
225  *     Does not matter.
226  */
227 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
228 {
229 	struct rnbd_queue *q = NULL;
230 	struct rnbd_cpu_qlist *cpu_q;
231 	unsigned long flags;
232 	int *cpup;
233 
234 	/*
235 	 * To keep fairness and not let other queues starve, we always
236 	 * try to wake up someone else in a round-robin manner.  That of course
237 	 * increases latency, but every queue always gets a chance to be executed.
238 	 */
239 	cpup = get_cpu_ptr(sess->cpu_rr);
240 	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
241 	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
242 		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
243 			continue;
244 		if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
245 			goto unlock;
246 		q = list_first_entry_or_null(&cpu_q->requeue_list,
247 					     typeof(*q), requeue_list);
248 		if (WARN_ON(!q))
249 			goto clear_bit;
250 		list_del_init(&q->requeue_list);
251 		clear_bit_unlock(0, &q->in_list);
252 
253 		if (list_empty(&cpu_q->requeue_list)) {
254 			/* Clear bit if nothing is left */
255 clear_bit:
256 			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
257 		}
258 unlock:
259 		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
260 
261 		if (q)
262 			break;
263 	}
264 
265 	/*
266 	 * Saves the CPU that is going to be requeued on the per-cpu var. Just
267 	 * incrementing it doesn't work because rnbd_get_cpu_qlist() will
268 	 * always return the first CPU with something on the queue list when the
269 	 * value stored on the var is greater than the last CPU with something
270 	 * on the list.
271 	 */
272 	if (cpu_q)
273 		*cpup = cpu_q->cpu;
274 	put_cpu_var(sess->cpu_rr);
275 
276 	if (q)
277 		rnbd_clt_dev_requeue(q);
278 
279 	return q;
280 }
281 
282 /**
283  * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
284  *				 session is idling (there are no requests
285  *				 in-flight).
286  * @sess:	Session to rerun the queues on
287  *
288  * Description:
289  *     This function tries to rerun all stopped queues if there are no
290  *     requests in-flight anymore.  It tries to solve an obvious problem:
291  *     the number of tags is less than the number of queues (hctxs) which
292  *     are stopped and put to sleep.  If the last permit, which has just been
293  *     put, does not wake up all remaining queues (hctxs), IO requests hang forever.
294  *
295  *     That can happen when all permits, say N, have been exhausted
296  *     from one CPU, and we have many block devices per session, say M.
297  *     Each block device has its own queue (hctx) for each CPU, so eventually
298  *     we can put that many queues (hctxs) to sleep: M x nr_cpu_ids.
299  *     If the number of permits N < M x nr_cpu_ids, we eventually get an IO hang.
300  *
301  *     To avoid this hang last caller of rnbd_put_permit() (last caller is the
302  *     one who observes sess->busy == 0) must wake up all remaining queues.
303  *
304  * Context:
305  *     Does not matter.
306  */
307 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
308 {
309 	bool requeued;
310 
311 	do {
312 		requeued = rnbd_rerun_if_needed(sess);
313 	} while (atomic_read(&sess->busy) == 0 && requeued);
314 }
315 
316 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
317 					     enum rtrs_clt_con_type con_type,
318 					     int wait)
319 {
320 	struct rtrs_permit *permit;
321 
322 	permit = rtrs_clt_get_permit(sess->rtrs, con_type,
323 				      wait ? RTRS_PERMIT_WAIT :
324 				      RTRS_PERMIT_NOWAIT);
325 	if (likely(permit))
326 		/* We have a subtle rare case here, when all permits can be
327 		 * consumed before the busy counter is increased.  This is safe,
328 		 * because the loser will get NULL as a permit, observe a zero busy
329 		 * counter and immediately restart the queue itself.
330 		 */
331 		atomic_inc(&sess->busy);
332 
333 	return permit;
334 }
335 
336 static void rnbd_put_permit(struct rnbd_clt_session *sess,
337 			     struct rtrs_permit *permit)
338 {
339 	rtrs_clt_put_permit(sess->rtrs, permit);
340 	atomic_dec(&sess->busy);
341 	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
342 	 * and then check queue bits.
343 	 */
344 	smp_mb__after_atomic();
345 	rnbd_rerun_all_if_idle(sess);
346 }
347 
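/*
 * Allocate an rnbd_iu for a "user" (admin) message: grab an RTRS permit and
 * set up the two-stage refcount and completion waitqueue used by
 * send_usr_msg() and the msg_*_conf() work handlers below.
 */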
348 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
349 				     enum rtrs_clt_con_type con_type,
350 				     int wait)
351 {
352 	struct rnbd_iu *iu;
353 	struct rtrs_permit *permit;
354 
355 	permit = rnbd_get_permit(sess, con_type,
356 				  wait ? RTRS_PERMIT_WAIT :
357 				  RTRS_PERMIT_NOWAIT);
358 	if (unlikely(!permit))
359 		return NULL;
360 	iu = rtrs_permit_to_pdu(permit);
361 	iu->permit = permit;
362 	/*
363 	 * The 1st reference is dropped after finishing sending a "user" message,
364 	 * the 2nd reference is dropped after the confirmation with the response
365 	 * is returned.
366 	 * 1st and 2nd can happen in any order, so the rnbd_iu should be
367 	 * released (rtrs_permit returned to rtrs) only after both
368 	 * are finished.
369 	 */
370 	atomic_set(&iu->refcount, 2);
371 	init_waitqueue_head(&iu->comp.wait);
372 	iu->comp.errno = INT_MAX;
373 
374 	return iu;
375 }
376 
377 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
378 {
379 	if (atomic_dec_and_test(&iu->refcount))
380 		rnbd_put_permit(sess, iu->permit);
381 }
382 
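/*
 * blk-mq ->complete callback: return the RTRS permit and finish the request
 * with the status reported back in iu->errno.
 */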
383 static void rnbd_softirq_done_fn(struct request *rq)
384 {
385 	struct rnbd_clt_dev *dev	= rq->rq_disk->private_data;
386 	struct rnbd_clt_session *sess	= dev->sess;
387 	struct rnbd_iu *iu;
388 
389 	iu = blk_mq_rq_to_pdu(rq);
390 	rnbd_put_permit(sess, iu->permit);
391 	blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
392 }
393 
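/*
 * RTRS confirmation callback for I/O requests: record the error and hand the
 * request back to blk-mq; rnbd_softirq_done_fn() does the final completion.
 */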
394 static void msg_io_conf(void *priv, int errno)
395 {
396 	struct rnbd_iu *iu = priv;
397 	struct rnbd_clt_dev *dev = iu->dev;
398 	struct request *rq = iu->rq;
399 	int rw = rq_data_dir(rq);
400 
401 	iu->errno = errno;
402 
403 	blk_mq_complete_request(rq);
404 
405 	if (errno)
406 		rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
407 				 rw == READ ? "read" : "write", errno);
408 }
409 
410 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
411 {
412 	iu->comp.errno = errno;
413 	wake_up(&iu->comp.wait);
414 }
415 
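/*
 * RTRS confirmation callback for user (admin) messages: the real handling
 * (msg_open_conf(), msg_close_conf(), msg_sess_info_conf()) may sleep, so it
 * is deferred to a work item instead of running in the RTRS callback context.
 */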
416 static void msg_conf(void *priv, int errno)
417 {
418 	struct rnbd_iu *iu = priv;
419 
420 	iu->errno = errno;
421 	schedule_work(&iu->work);
422 }
423 
424 enum wait_type {
425 	NO_WAIT = 0,
426 	WAIT    = 1
427 };
428 
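/*
 * Send a "user" (admin) message over RTRS.  The return value only reflects
 * whether the request was submitted; if wait == WAIT, the caller also gets
 * the result of the deferred confirmation handler back in *errno.
 */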
429 static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
430 			struct rnbd_iu *iu, struct kvec *vec, size_t nr,
431 			size_t len, struct scatterlist *sg, unsigned int sg_len,
432 			void (*conf)(struct work_struct *work),
433 			int *errno, enum wait_type wait)
434 {
435 	int err;
436 	struct rtrs_clt_req_ops req_ops;
437 
438 	INIT_WORK(&iu->work, conf);
439 	req_ops = (struct rtrs_clt_req_ops) {
440 		.priv = iu,
441 		.conf_fn = msg_conf,
442 	};
443 	err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
444 				vec, nr, len, sg, sg_len);
445 	if (!err && wait) {
446 		wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
447 		*errno = iu->comp.errno;
448 	} else {
449 		*errno = 0;
450 	}
451 
452 	return err;
453 }
454 
455 static void msg_close_conf(struct work_struct *work)
456 {
457 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
458 	struct rnbd_clt_dev *dev = iu->dev;
459 
460 	wake_up_iu_comp(iu, iu->errno);
461 	rnbd_put_iu(dev->sess, iu);
462 	rnbd_clt_put_dev(dev);
463 }
464 
465 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
466 {
467 	struct rnbd_clt_session *sess = dev->sess;
468 	struct rnbd_msg_close msg;
469 	struct rnbd_iu *iu;
470 	struct kvec vec = {
471 		.iov_base = &msg,
472 		.iov_len  = sizeof(msg)
473 	};
474 	int err, errno;
475 
476 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
477 	if (!iu)
478 		return -ENOMEM;
479 
480 	iu->buf = NULL;
481 	iu->dev = dev;
482 
483 	sg_mark_end(&iu->sglist[0]);
484 
485 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_CLOSE);
486 	msg.device_id	= cpu_to_le32(device_id);
487 
488 	WARN_ON(!rnbd_clt_get_dev(dev));
489 	err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 1, 0, NULL, 0,
490 			   msg_close_conf, &errno, wait);
491 	if (err) {
492 		rnbd_clt_put_dev(dev);
493 		rnbd_put_iu(sess, iu);
494 	} else {
495 		err = errno;
496 	}
497 
498 	rnbd_put_iu(sess, iu);
499 	return err;
500 }
501 
502 static void msg_open_conf(struct work_struct *work)
503 {
504 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
505 	struct rnbd_msg_open_rsp *rsp = iu->buf;
506 	struct rnbd_clt_dev *dev = iu->dev;
507 	int errno = iu->errno;
508 
509 	if (errno) {
510 		rnbd_clt_err(dev,
511 			      "Opening failed, server responded: %d\n",
512 			      errno);
513 	} else {
514 		errno = process_msg_open_rsp(dev, rsp);
515 		if (errno) {
516 			u32 device_id = le32_to_cpu(rsp->device_id);
517 			/*
518 			 * If the server thinks it's fine, but we fail to process
519 			 * the response, then be nice and send a close to the server.
520 			 */
521 			(void)send_msg_close(dev, device_id, NO_WAIT);
522 		}
523 	}
524 	kfree(rsp);
525 	wake_up_iu_comp(iu, errno);
526 	rnbd_put_iu(dev->sess, iu);
527 	rnbd_clt_put_dev(dev);
528 }
529 
530 static void msg_sess_info_conf(struct work_struct *work)
531 {
532 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
533 	struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
534 	struct rnbd_clt_session *sess = iu->sess;
535 
536 	if (!iu->errno)
537 		sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
538 
539 	kfree(rsp);
540 	wake_up_iu_comp(iu, iu->errno);
541 	rnbd_put_iu(sess, iu);
542 	rnbd_clt_put_sess(sess);
543 }
544 
545 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
546 {
547 	struct rnbd_clt_session *sess = dev->sess;
548 	struct rnbd_msg_open_rsp *rsp;
549 	struct rnbd_msg_open msg;
550 	struct rnbd_iu *iu;
551 	struct kvec vec = {
552 		.iov_base = &msg,
553 		.iov_len  = sizeof(msg)
554 	};
555 	int err, errno;
556 
557 	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
558 	if (!rsp)
559 		return -ENOMEM;
560 
561 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
562 	if (!iu) {
563 		kfree(rsp);
564 		return -ENOMEM;
565 	}
566 
567 	iu->buf = rsp;
568 	iu->dev = dev;
569 
570 	sg_init_one(iu->sglist, rsp, sizeof(*rsp));
571 
572 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_OPEN);
573 	msg.access_mode	= dev->access_mode;
574 	strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
575 
576 	WARN_ON(!rnbd_clt_get_dev(dev));
577 	err = send_usr_msg(sess->rtrs, READ, iu,
578 			   &vec, 1, sizeof(*rsp), iu->sglist, 1,
579 			   msg_open_conf, &errno, wait);
580 	if (err) {
581 		rnbd_clt_put_dev(dev);
582 		rnbd_put_iu(sess, iu);
583 		kfree(rsp);
584 	} else {
585 		err = errno;
586 	}
587 
588 	rnbd_put_iu(sess, iu);
589 	return err;
590 }
591 
592 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
593 {
594 	struct rnbd_msg_sess_info_rsp *rsp;
595 	struct rnbd_msg_sess_info msg;
596 	struct rnbd_iu *iu;
597 	struct kvec vec = {
598 		.iov_base = &msg,
599 		.iov_len  = sizeof(msg)
600 	};
601 	int err, errno;
602 
603 	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
604 	if (!rsp)
605 		return -ENOMEM;
606 
607 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
608 	if (!iu) {
609 		kfree(rsp);
610 		return -ENOMEM;
611 	}
612 
613 	iu->buf = rsp;
614 	iu->sess = sess;
615 
616 	sg_init_one(iu->sglist, rsp, sizeof(*rsp));
617 
618 	msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
619 	msg.ver      = RNBD_PROTO_VER_MAJOR;
620 
621 	if (!rnbd_clt_get_sess(sess)) {
622 		/*
623 		 * That can happen only in one case, when RTRS has re-established
624 		 * the connection and link_ev() is called, but the session is almost
625 		 * dead, the last reference on the session has been put and the caller
626 		 * is waiting for RTRS to close everything.
627 		 */
628 		err = -ENODEV;
629 		goto put_iu;
630 	}
631 	err = send_usr_msg(sess->rtrs, READ, iu,
632 			   &vec, 1, sizeof(*rsp), iu->sglist, 1,
633 			   msg_sess_info_conf, &errno, wait);
634 	if (err) {
635 		rnbd_clt_put_sess(sess);
636 put_iu:
637 		rnbd_put_iu(sess, iu);
638 		kfree(rsp);
639 	} else {
640 		err = errno;
641 	}
642 
643 	rnbd_put_iu(sess, iu);
644 	return err;
645 }
646 
647 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
648 {
649 	struct rnbd_clt_dev *dev;
650 
651 	mutex_lock(&sess->lock);
652 	list_for_each_entry(dev, &sess->devs_list, list) {
653 		rnbd_clt_err(dev, "Device disconnected.\n");
654 
655 		mutex_lock(&dev->lock);
656 		if (dev->dev_state == DEV_STATE_MAPPED)
657 			dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
658 		mutex_unlock(&dev->lock);
659 	}
660 	mutex_unlock(&sess->lock);
661 }
662 
663 static void remap_devs(struct rnbd_clt_session *sess)
664 {
665 	struct rnbd_clt_dev *dev;
666 	struct rtrs_attrs attrs;
667 	int err;
668 
669 	/*
670 	 * Careful here: we are called from RTRS link event directly,
671 	 * thus we can't send any RTRS request and wait for a response,
672 	 * or RTRS will not be able to complete the request with a failure
673 	 * if something goes wrong (failing of outstanding requests
674 	 * happens exactly from the context where we are blocking now).
675 	 *
676 	 * So to avoid deadlocks each usr message sent from here must
677 	 * be asynchronous.
678 	 */
679 
680 	err = send_msg_sess_info(sess, NO_WAIT);
681 	if (err) {
682 		pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
683 		return;
684 	}
685 
686 	rtrs_clt_query(sess->rtrs, &attrs);
687 	mutex_lock(&sess->lock);
688 	sess->max_io_size = attrs.max_io_size;
689 
690 	list_for_each_entry(dev, &sess->devs_list, list) {
691 		bool skip;
692 
693 		mutex_lock(&dev->lock);
694 		skip = (dev->dev_state == DEV_STATE_INIT);
695 		mutex_unlock(&dev->lock);
696 		if (skip)
697 			/*
698 			 * When device is establishing connection for the first
699 			 * time - do not remap, it will be closed soon.
700 			 */
701 			continue;
702 
703 		rnbd_clt_info(dev, "session reconnected, remapping device\n");
704 		err = send_msg_open(dev, NO_WAIT);
705 		if (err) {
706 			rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
707 			break;
708 		}
709 	}
710 	mutex_unlock(&sess->lock);
711 }
712 
713 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
714 {
715 	struct rnbd_clt_session *sess = priv;
716 
717 	switch (ev) {
718 	case RTRS_CLT_LINK_EV_DISCONNECTED:
719 		set_dev_states_to_disconnected(sess);
720 		break;
721 	case RTRS_CLT_LINK_EV_RECONNECTED:
722 		remap_devs(sess);
723 		break;
724 	default:
725 		pr_err("Unknown session event received (%d), session: %s\n",
726 		       ev, sess->sessname);
727 	}
728 }
729 
730 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
731 {
732 	unsigned int cpu;
733 	struct rnbd_cpu_qlist *cpu_q;
734 
735 	for_each_possible_cpu(cpu) {
736 		cpu_q = per_cpu_ptr(cpu_queues, cpu);
737 
738 		cpu_q->cpu = cpu;
739 		INIT_LIST_HEAD(&cpu_q->requeue_list);
740 		spin_lock_init(&cpu_q->requeue_lock);
741 	}
742 }
743 
744 static void destroy_mq_tags(struct rnbd_clt_session *sess)
745 {
746 	if (sess->tag_set.tags)
747 		blk_mq_free_tag_set(&sess->tag_set);
748 }
749 
750 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
751 {
752 	sess->rtrs_ready = true;
753 	wake_up_all(&sess->rtrs_waitq);
754 }
755 
756 static void close_rtrs(struct rnbd_clt_session *sess)
757 {
758 	might_sleep();
759 
760 	if (!IS_ERR_OR_NULL(sess->rtrs)) {
761 		rtrs_clt_close(sess->rtrs);
762 		sess->rtrs = NULL;
763 		wake_up_rtrs_waiters(sess);
764 	}
765 }
766 
767 static void free_sess(struct rnbd_clt_session *sess)
768 {
769 	WARN_ON(!list_empty(&sess->devs_list));
770 
771 	might_sleep();
772 
773 	close_rtrs(sess);
774 	destroy_mq_tags(sess);
775 	if (!list_empty(&sess->list)) {
776 		mutex_lock(&sess_lock);
777 		list_del(&sess->list);
778 		mutex_unlock(&sess_lock);
779 	}
780 	free_percpu(sess->cpu_queues);
781 	free_percpu(sess->cpu_rr);
782 	mutex_destroy(&sess->lock);
783 	kfree(sess);
784 }
785 
786 static struct rnbd_clt_session *alloc_sess(const char *sessname)
787 {
788 	struct rnbd_clt_session *sess;
789 	int err, cpu;
790 
791 	sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
792 	if (!sess)
793 		return ERR_PTR(-ENOMEM);
794 	strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
795 	atomic_set(&sess->busy, 0);
796 	mutex_init(&sess->lock);
797 	INIT_LIST_HEAD(&sess->devs_list);
798 	INIT_LIST_HEAD(&sess->list);
799 	bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
800 	init_waitqueue_head(&sess->rtrs_waitq);
801 	refcount_set(&sess->refcount, 1);
802 
803 	sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
804 	if (!sess->cpu_queues) {
805 		err = -ENOMEM;
806 		goto err;
807 	}
808 	rnbd_init_cpu_qlists(sess->cpu_queues);
809 
810 	/*
811 	 * This is a simple percpu variable which stores CPU indices; they are
812 	 * updated each time a queue is rerun (see rnbd_rerun_if_needed()).  We
813 	 * need it for the sake of fairness, to wake up queues in a round-robin manner.
814 	 */
815 	sess->cpu_rr = alloc_percpu(int);
816 	if (!sess->cpu_rr) {
817 		err = -ENOMEM;
818 		goto err;
819 	}
820 	for_each_possible_cpu(cpu)
821 		*per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
822 
823 	return sess;
824 
825 err:
826 	free_sess(sess);
827 
828 	return ERR_PTR(err);
829 }
830 
831 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
832 {
833 	wait_event(sess->rtrs_waitq, sess->rtrs_ready);
834 	if (IS_ERR_OR_NULL(sess->rtrs))
835 		return -ECONNRESET;
836 
837 	return 0;
838 }
839 
840 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
841 	__releases(&sess_lock)
842 	__acquires(&sess_lock)
843 {
844 	DEFINE_WAIT(wait);
845 
846 	prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
847 	if (IS_ERR_OR_NULL(sess->rtrs)) {
848 		finish_wait(&sess->rtrs_waitq, &wait);
849 		return;
850 	}
851 	mutex_unlock(&sess_lock);
852 	/* The loop is in the caller, see __find_and_get_sess().
853 	 * You can't leave the mutex locked and call schedule(); you will catch a
854 	 * deadlock with a caller of free_sess(), which has just put the last
855 	 * reference and is about to take the sess_lock in order to delete
856 	 * the session from the list.
857 	 */
858 	schedule();
859 	mutex_lock(&sess_lock);
860 }
861 
862 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
863 	__releases(&sess_lock)
864 	__acquires(&sess_lock)
865 {
866 	struct rnbd_clt_session *sess, *sn;
867 	int err;
868 
869 again:
870 	list_for_each_entry_safe(sess, sn, &sess_list, list) {
871 		if (strcmp(sessname, sess->sessname))
872 			continue;
873 
874 		if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
875 			/*
876 			 * No RTRS connection, session is dying.
877 			 */
878 			continue;
879 
880 		if (rnbd_clt_get_sess(sess)) {
881 			/*
882 			 * Alive session is found, wait for RTRS connection.
883 			 */
884 			mutex_unlock(&sess_lock);
885 			err = wait_for_rtrs_connection(sess);
886 			if (err)
887 				rnbd_clt_put_sess(sess);
888 			mutex_lock(&sess_lock);
889 
890 			if (err)
891 				/* Session is dying, repeat the loop */
892 				goto again;
893 
894 			return sess;
895 		}
896 		/*
897 		 * Ref is 0, session is dying, wait for RTRS disconnect
898 		 * in order to avoid session names clashes.
899 		 */
900 		wait_for_rtrs_disconnection(sess);
901 		/*
902 		 * RTRS is disconnected and soon session will be freed,
903 		 * so repeat a loop.
904 		 */
905 		goto again;
906 	}
907 
908 	return NULL;
909 }
910 
911 static struct
912 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
913 {
914 	struct rnbd_clt_session *sess = NULL;
915 
916 	mutex_lock(&sess_lock);
917 	sess = __find_and_get_sess(sessname);
918 	if (!sess) {
919 		sess = alloc_sess(sessname);
920 		if (IS_ERR(sess)) {
921 			mutex_unlock(&sess_lock);
922 			return sess;
923 		}
924 		list_add(&sess->list, &sess_list);
925 		*first = true;
926 	} else
927 		*first = false;
928 	mutex_unlock(&sess_lock);
929 
930 	return sess;
931 }
932 
933 static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
934 {
935 	struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
936 
937 	if (dev->read_only && (mode & FMODE_WRITE))
938 		return -EPERM;
939 
940 	if (dev->dev_state == DEV_STATE_UNMAPPED ||
941 	    !rnbd_clt_get_dev(dev))
942 		return -EIO;
943 
944 	return 0;
945 }
946 
947 static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
948 {
949 	struct rnbd_clt_dev *dev = gen->private_data;
950 
951 	rnbd_clt_put_dev(dev);
952 }
953 
954 static int rnbd_client_getgeo(struct block_device *block_device,
955 			      struct hd_geometry *geo)
956 {
957 	u64 size;
958 	struct rnbd_clt_dev *dev;
959 
960 	dev = block_device->bd_disk->private_data;
961 	size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
962 	geo->cylinders	= size >> 6;	/* size/64 */
963 	geo->heads	= 4;
964 	geo->sectors	= 16;
965 	geo->start	= 0;
966 
967 	return 0;
968 }
969 
970 static const struct block_device_operations rnbd_client_ops = {
971 	.owner		= THIS_MODULE,
972 	.open		= rnbd_client_open,
973 	.release	= rnbd_client_release,
974 	.getgeo		= rnbd_client_getgeo
975 };
976 
977 /* The amount of data that belongs to an I/O and the amount of data that
978  * should be read or written to the disk (bi_size) can differ.
979  *
980  * E.g. when WRITE_SAME is used, only a small amount of data is
981  * transferred, which is then written repeatedly over a lot of sectors.
982  *
983  * Get the size of the data to be transferred via RTRS by summing up the
984  * sizes of the scatter-gather list entries.
985  */
986 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
987 {
988 	struct scatterlist *sg;
989 	size_t tsize = 0;
990 	int i;
991 
992 	for_each_sg(sglist, sg, len, i)
993 		tsize += sg->length;
994 	return tsize;
995 }
996 
997 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
998 				     struct request *rq,
999 				     struct rnbd_iu *iu)
1000 {
1001 	struct rtrs_clt *rtrs = dev->sess->rtrs;
1002 	struct rtrs_permit *permit = iu->permit;
1003 	struct rnbd_msg_io msg;
1004 	struct rtrs_clt_req_ops req_ops;
1005 	unsigned int sg_cnt = 0;
1006 	struct kvec vec;
1007 	size_t size;
1008 	int err;
1009 
1010 	iu->rq		= rq;
1011 	iu->dev		= dev;
1012 	msg.sector	= cpu_to_le64(blk_rq_pos(rq));
1013 	msg.bi_size	= cpu_to_le32(blk_rq_bytes(rq));
1014 	msg.rw		= cpu_to_le32(rq_to_rnbd_flags(rq));
1015 	msg.prio	= cpu_to_le16(req_get_ioprio(rq));
1016 
1017 	/*
1018 	 * We only support discards with single segment for now.
1019 	 * See queue limits.
1020 	 */
1021 	if (req_op(rq) != REQ_OP_DISCARD)
1022 		sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist);
1023 
1024 	if (sg_cnt == 0)
1025 		/* Do not forget to mark the end */
1026 		sg_mark_end(&iu->sglist[0]);
1027 
1028 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_IO);
1029 	msg.device_id	= cpu_to_le32(dev->device_id);
1030 
1031 	vec = (struct kvec) {
1032 		.iov_base = &msg,
1033 		.iov_len  = sizeof(msg)
1034 	};
1035 	size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt);
1036 	req_ops = (struct rtrs_clt_req_ops) {
1037 		.priv = iu,
1038 		.conf_fn = msg_io_conf,
1039 	};
1040 	err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1041 			       &vec, 1, size, iu->sglist, sg_cnt);
1042 	if (unlikely(err)) {
1043 		rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1044 				 err);
1045 		return err;
1046 	}
1047 
1048 	return 0;
1049 }
1050 
1051 /**
1052  * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1053  * @dev:	Device to be checked
1054  * @q:		Queue to be added to the requeue list if required
1055  *
1056  * Description:
1057  *     If the session is busy, that means someone will requeue us when resources
1058  *     are freed.  If the session is not doing anything, the queue is not added
1059  *     to the list and false is returned.
1060  */
1061 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1062 						struct rnbd_queue *q)
1063 {
1064 	struct rnbd_clt_session *sess = dev->sess;
1065 	struct rnbd_cpu_qlist *cpu_q;
1066 	unsigned long flags;
1067 	bool added = true;
1068 	bool need_set;
1069 
1070 	cpu_q = get_cpu_ptr(sess->cpu_queues);
1071 	spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1072 
1073 	if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
1074 		if (WARN_ON(!list_empty(&q->requeue_list)))
1075 			goto unlock;
1076 
1077 		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1078 		if (need_set) {
1079 			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1080 			/* Paired with rnbd_put_permit(). Set a bit first
1081 			 * and then observe the busy counter.
1082 			 */
1083 			smp_mb__before_atomic();
1084 		}
1085 		if (likely(atomic_read(&sess->busy))) {
1086 			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1087 		} else {
1088 			/* Very unlikely, but possible: busy counter was
1089 			 * observed as zero.  Drop all bits and return
1090 			 * false to restart the queue by ourselves.
1091 			 */
1092 			if (need_set)
1093 				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1094 			clear_bit_unlock(0, &q->in_list);
1095 			added = false;
1096 		}
1097 	}
1098 unlock:
1099 	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1100 	put_cpu_ptr(sess->cpu_queues);
1101 
1102 	return added;
1103 }
1104 
1105 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1106 					struct blk_mq_hw_ctx *hctx,
1107 					int delay)
1108 {
1109 	struct rnbd_queue *q = hctx->driver_data;
1110 
1111 	if (delay != RNBD_DELAY_IFBUSY)
1112 		blk_mq_delay_run_hw_queue(hctx, delay);
1113 	else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
1114 		/*
1115 		 * If session is not busy we have to restart
1116 		 * the queue ourselves.
1117 		 */
1118 		blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1119 }
1120 
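/*
 * blk-mq ->queue_rq: try to grab an RTRS permit without blocking.  If none is
 * available, either park the hctx on the per-CPU requeue list
 * (RNBD_DELAY_IFBUSY) or ask blk-mq to retry the queue a bit later.
 */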
1121 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1122 				   const struct blk_mq_queue_data *bd)
1123 {
1124 	struct request *rq = bd->rq;
1125 	struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
1126 	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1127 	int err;
1128 
1129 	if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
1130 		return BLK_STS_IOERR;
1131 
1132 	iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1133 				      RTRS_PERMIT_NOWAIT);
1134 	if (unlikely(!iu->permit)) {
1135 		rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1136 		return BLK_STS_RESOURCE;
1137 	}
1138 
1139 	blk_mq_start_request(rq);
1140 	err = rnbd_client_xfer_request(dev, rq, iu);
1141 	if (likely(err == 0))
1142 		return BLK_STS_OK;
1143 	if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
1144 		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1145 		rnbd_put_permit(dev->sess, iu->permit);
1146 		return BLK_STS_RESOURCE;
1147 	}
1148 
1149 	rnbd_put_permit(dev->sess, iu->permit);
1150 	return BLK_STS_IOERR;
1151 }
1152 
1153 static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
1154 			      unsigned int hctx_idx, unsigned int numa_node)
1155 {
1156 	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1157 
1158 	sg_init_table(iu->sglist, BMAX_SEGMENTS);
1159 	return 0;
1160 }
1161 
1162 static struct blk_mq_ops rnbd_mq_ops = {
1163 	.queue_rq	= rnbd_queue_rq,
1164 	.init_request	= rnbd_init_request,
1165 	.complete	= rnbd_softirq_done_fn,
1166 };
1167 
1168 static int setup_mq_tags(struct rnbd_clt_session *sess)
1169 {
1170 	struct blk_mq_tag_set *tag_set = &sess->tag_set;
1171 
1172 	memset(tag_set, 0, sizeof(*tag_set));
1173 	tag_set->ops		= &rnbd_mq_ops;
1174 	tag_set->queue_depth	= sess->queue_depth;
1175 	tag_set->numa_node		= NUMA_NO_NODE;
1176 	tag_set->flags		= BLK_MQ_F_SHOULD_MERGE |
1177 				  BLK_MQ_F_TAG_QUEUE_SHARED;
1178 	tag_set->cmd_size		= sizeof(struct rnbd_iu);
1179 	tag_set->nr_hw_queues	= num_online_cpus();
1180 
1181 	return blk_mq_alloc_tag_set(tag_set);
1182 }
1183 
1184 static struct rnbd_clt_session *
1185 find_and_get_or_create_sess(const char *sessname,
1186 			    const struct rtrs_addr *paths,
1187 			    size_t path_cnt, u16 port_nr)
1188 {
1189 	struct rnbd_clt_session *sess;
1190 	struct rtrs_attrs attrs;
1191 	int err;
1192 	bool first;
1193 	struct rtrs_clt_ops rtrs_ops;
1194 
1195 	sess = find_or_create_sess(sessname, &first);
1196 	if (sess == ERR_PTR(-ENOMEM))
1197 		return ERR_PTR(-ENOMEM);
1198 	else if (!first)
1199 		return sess;
1200 
1201 	rtrs_ops = (struct rtrs_clt_ops) {
1202 		.priv = sess,
1203 		.link_ev = rnbd_clt_link_ev,
1204 	};
1205 	/*
1206 	 * Nothing was found, establish rtrs connection and proceed further.
1207 	 */
1208 	sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1209 				   paths, path_cnt, port_nr,
1210 				   sizeof(struct rnbd_iu),
1211 				   RECONNECT_DELAY, BMAX_SEGMENTS,
1212 				   BLK_MAX_SEGMENT_SIZE,
1213 				   MAX_RECONNECTS);
1214 	if (IS_ERR(sess->rtrs)) {
1215 		err = PTR_ERR(sess->rtrs);
1216 		goto wake_up_and_put;
1217 	}
1218 	rtrs_clt_query(sess->rtrs, &attrs);
1219 	sess->max_io_size = attrs.max_io_size;
1220 	sess->queue_depth = attrs.queue_depth;
1221 
1222 	err = setup_mq_tags(sess);
1223 	if (err)
1224 		goto close_rtrs;
1225 
1226 	err = send_msg_sess_info(sess, WAIT);
1227 	if (err)
1228 		goto close_rtrs;
1229 
1230 	wake_up_rtrs_waiters(sess);
1231 
1232 	return sess;
1233 
1234 close_rtrs:
1235 	close_rtrs(sess);
1236 put_sess:
1237 	rnbd_clt_put_sess(sess);
1238 
1239 	return ERR_PTR(err);
1240 
1241 wake_up_and_put:
1242 	wake_up_rtrs_waiters(sess);
1243 	goto put_sess;
1244 }
1245 
1246 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1247 				       struct rnbd_queue *q,
1248 				       struct blk_mq_hw_ctx *hctx)
1249 {
1250 	INIT_LIST_HEAD(&q->requeue_list);
1251 	q->dev  = dev;
1252 	q->hctx = hctx;
1253 }
1254 
1255 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1256 {
1257 	int i;
1258 	struct blk_mq_hw_ctx *hctx;
1259 	struct rnbd_queue *q;
1260 
1261 	queue_for_each_hw_ctx(dev->queue, hctx, i) {
1262 		q = &dev->hw_queues[i];
1263 		rnbd_init_hw_queue(dev, q, hctx);
1264 		hctx->driver_data = q;
1265 	}
1266 }
1267 
1268 static int setup_mq_dev(struct rnbd_clt_dev *dev)
1269 {
1270 	dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
1271 	if (IS_ERR(dev->queue)) {
1272 		rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
1273 			      PTR_ERR(dev->queue));
1274 		return PTR_ERR(dev->queue);
1275 	}
1276 	rnbd_init_mq_hw_queues(dev);
1277 	return 0;
1278 }
1279 
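/*
 * Apply the limits negotiated in the open response (see
 * rnbd_clt_set_dev_attr()) to the block layer request queue.
 */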
1280 static void setup_request_queue(struct rnbd_clt_dev *dev)
1281 {
1282 	blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1283 	blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1284 	blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1285 	blk_queue_max_write_same_sectors(dev->queue,
1286 					 dev->max_write_same_sectors);
1287 
1288 	/*
1289 	 * We don't support discards to "discontiguous" segments
1290 	 * in one request.
1291 	 */
1292 	blk_queue_max_discard_segments(dev->queue, 1);
1293 
1294 	blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1295 	dev->queue->limits.discard_granularity	= dev->discard_granularity;
1296 	dev->queue->limits.discard_alignment	= dev->discard_alignment;
1297 	if (dev->max_discard_sectors)
1298 		blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
1299 	if (dev->secure_discard)
1300 		blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);
1301 
1302 	blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1303 	blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1304 	blk_queue_max_segments(dev->queue, dev->max_segments);
1305 	blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1306 	blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1307 	blk_queue_write_cache(dev->queue, true, true);
1308 	dev->queue->queuedata = dev;
1309 }
1310 
1311 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1312 {
1313 	dev->gd->major		= rnbd_client_major;
1314 	dev->gd->first_minor	= idx << RNBD_PART_BITS;
1315 	dev->gd->fops		= &rnbd_client_ops;
1316 	dev->gd->queue		= dev->queue;
1317 	dev->gd->private_data	= dev;
1318 	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1319 		 idx);
1320 	pr_debug("disk_name=%s, capacity=%zu\n",
1321 		 dev->gd->disk_name,
1322 		 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1323 		 );
1324 
1325 	set_capacity(dev->gd, dev->nsectors);
1326 
1327 	if (dev->access_mode == RNBD_ACCESS_RO) {
1328 		dev->read_only = true;
1329 		set_disk_ro(dev->gd, true);
1330 	} else {
1331 		dev->read_only = false;
1332 	}
1333 
1334 	if (!dev->rotational)
1335 		blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1336 }
1337 
1338 static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
1339 				     struct rnbd_clt_dev *dev, int idx)
1340 {
1341 	int err;
1342 
1343 	dev->size = dev->nsectors * dev->logical_block_size;
1344 
1345 	err = setup_mq_dev(dev);
1346 	if (err)
1347 		return err;
1348 
1349 	setup_request_queue(dev);
1350 
1351 	dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE);
1352 	if (!dev->gd) {
1353 		blk_cleanup_queue(dev->queue);
1354 		return -ENOMEM;
1355 	}
1356 
1357 	rnbd_clt_setup_gen_disk(dev, idx);
1358 
1359 	return 0;
1360 }
1361 
1362 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1363 				      enum rnbd_access_mode access_mode,
1364 				      const char *pathname)
1365 {
1366 	struct rnbd_clt_dev *dev;
1367 	int ret;
1368 
1369 	dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1370 	if (!dev)
1371 		return ERR_PTR(-ENOMEM);
1372 
1373 	dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
1374 				 GFP_KERNEL);
1375 	if (!dev->hw_queues) {
1376 		ret = -ENOMEM;
1377 		goto out_alloc;
1378 	}
1379 
1380 	mutex_lock(&ida_lock);
1381 	ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
1382 			     GFP_KERNEL);
1383 	mutex_unlock(&ida_lock);
1384 	if (ret < 0) {
1385 		pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
1386 		       pathname, sess->sessname, ret);
1387 		goto out_queues;
1388 	}
1389 	dev->clt_device_id	= ret;
1390 	dev->sess		= sess;
1391 	dev->access_mode	= access_mode;
1392 	strlcpy(dev->pathname, pathname, sizeof(dev->pathname));
1393 	mutex_init(&dev->lock);
1394 	refcount_set(&dev->refcount, 1);
1395 	dev->dev_state = DEV_STATE_INIT;
1396 
1397 	/*
1398 	 * We are called from a sysfs entry here, thus clt-sysfs is
1399 	 * responsible that the session will not disappear.
1400 	 */
1401 	WARN_ON(!rnbd_clt_get_sess(sess));
1402 
1403 	return dev;
1404 
1405 out_queues:
1406 	kfree(dev->hw_queues);
1407 out_alloc:
1408 	kfree(dev);
1409 	return ERR_PTR(ret);
1410 }
1411 
1412 static bool __exists_dev(const char *pathname)
1413 {
1414 	struct rnbd_clt_session *sess;
1415 	struct rnbd_clt_dev *dev;
1416 	bool found = false;
1417 
1418 	list_for_each_entry(sess, &sess_list, list) {
1419 		mutex_lock(&sess->lock);
1420 		list_for_each_entry(dev, &sess->devs_list, list) {
1421 			if (!strncmp(dev->pathname, pathname,
1422 				     sizeof(dev->pathname))) {
1423 				found = true;
1424 				break;
1425 			}
1426 		}
1427 		mutex_unlock(&sess->lock);
1428 		if (found)
1429 			break;
1430 	}
1431 
1432 	return found;
1433 }
1434 
1435 static bool exists_devpath(const char *pathname)
1436 {
1437 	bool found;
1438 
1439 	mutex_lock(&sess_lock);
1440 	found = __exists_dev(pathname);
1441 	mutex_unlock(&sess_lock);
1442 
1443 	return found;
1444 }
1445 
1446 static bool insert_dev_if_not_exists_devpath(const char *pathname,
1447 					     struct rnbd_clt_session *sess,
1448 					     struct rnbd_clt_dev *dev)
1449 {
1450 	bool found;
1451 
1452 	mutex_lock(&sess_lock);
1453 	found = __exists_dev(pathname);
1454 	if (!found) {
1455 		mutex_lock(&sess->lock);
1456 		list_add_tail(&dev->list, &sess->devs_list);
1457 		mutex_unlock(&sess->lock);
1458 	}
1459 	mutex_unlock(&sess_lock);
1460 
1461 	return found;
1462 }
1463 
1464 static void delete_dev(struct rnbd_clt_dev *dev)
1465 {
1466 	struct rnbd_clt_session *sess = dev->sess;
1467 
1468 	mutex_lock(&sess->lock);
1469 	list_del(&dev->list);
1470 	mutex_unlock(&sess->lock);
1471 }
1472 
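/*
 * rnbd_clt_map_device() - map a remote device.  Called from the rnbd-clt
 * sysfs code (rnbd-clt-sysfs.c): finds or creates a session to @sessname,
 * sends RNBD_MSG_OPEN and, on success, sets up and registers the gendisk.
 */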
1473 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1474 					   struct rtrs_addr *paths,
1475 					   size_t path_cnt, u16 port_nr,
1476 					   const char *pathname,
1477 					   enum rnbd_access_mode access_mode)
1478 {
1479 	struct rnbd_clt_session *sess;
1480 	struct rnbd_clt_dev *dev;
1481 	int ret;
1482 
1483 	if (exists_devpath(pathname))
1484 		return ERR_PTR(-EEXIST);
1485 
1486 	sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
1487 	if (IS_ERR(sess))
1488 		return ERR_CAST(sess);
1489 
1490 	dev = init_dev(sess, access_mode, pathname);
1491 	if (IS_ERR(dev)) {
1492 		pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
1493 		       pathname, sess->sessname, PTR_ERR(dev));
1494 		ret = PTR_ERR(dev);
1495 		goto put_sess;
1496 	}
1497 	if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
1498 		ret = -EEXIST;
1499 		goto put_dev;
1500 	}
1501 	ret = send_msg_open(dev, WAIT);
1502 	if (ret) {
1503 		rnbd_clt_err(dev,
1504 			      "map_device: failed, can't open remote device, err: %d\n",
1505 			      ret);
1506 		goto del_dev;
1507 	}
1508 	mutex_lock(&dev->lock);
1509 	pr_debug("Opened remote device: session=%s, path='%s'\n",
1510 		 sess->sessname, pathname);
1511 	ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
1512 	if (ret) {
1513 		rnbd_clt_err(dev,
1514 			      "map_device: Failed to configure device, err: %d\n",
1515 			      ret);
1516 		mutex_unlock(&dev->lock);
1517 		goto del_dev;
1518 	}
1519 
1520 	rnbd_clt_info(dev,
1521 		       "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n",
1522 		       dev->gd->disk_name, dev->nsectors,
1523 		       dev->logical_block_size, dev->physical_block_size,
1524 		       dev->max_write_same_sectors, dev->max_discard_sectors,
1525 		       dev->discard_granularity, dev->discard_alignment,
1526 		       dev->secure_discard, dev->max_segments,
1527 		       dev->max_hw_sectors, dev->rotational);
1528 
1529 	mutex_unlock(&dev->lock);
1530 
1531 	add_disk(dev->gd);
1532 	rnbd_clt_put_sess(sess);
1533 
1534 	return dev;
1535 
1536 del_dev:
1537 	delete_dev(dev);
1538 put_dev:
1539 	rnbd_clt_put_dev(dev);
1540 put_sess:
1541 	rnbd_clt_put_sess(sess);
1542 
1543 	return ERR_PTR(ret);
1544 }
1545 
1546 static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1547 {
1548 	del_gendisk(dev->gd);
1549 	blk_cleanup_queue(dev->queue);
1550 	put_disk(dev->gd);
1551 }
1552 
1553 static void destroy_sysfs(struct rnbd_clt_dev *dev,
1554 			  const struct attribute *sysfs_self)
1555 {
1556 	rnbd_clt_remove_dev_symlink(dev);
1557 	if (dev->kobj.state_initialized) {
1558 		if (sysfs_self)
1559 			/* To avoid a deadlock, first remove the sysfs entry itself */
1560 			sysfs_remove_file_self(&dev->kobj, sysfs_self);
1561 		kobject_del(&dev->kobj);
1562 		kobject_put(&dev->kobj);
1563 	}
1564 }
1565 
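/*
 * Unmap a device: remove it from the session list and sysfs, destroy the
 * gendisk and, if the device was mapped and the RTRS session is still up,
 * tell the server to close it.  With force == false the unmap is refused
 * while the device is still opened by someone.
 */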
1566 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1567 			   const struct attribute *sysfs_self)
1568 {
1569 	struct rnbd_clt_session *sess = dev->sess;
1570 	int refcount, ret = 0;
1571 	bool was_mapped;
1572 
1573 	mutex_lock(&dev->lock);
1574 	if (dev->dev_state == DEV_STATE_UNMAPPED) {
1575 		rnbd_clt_info(dev, "Device is already being unmapped\n");
1576 		ret = -EALREADY;
1577 		goto err;
1578 	}
1579 	refcount = refcount_read(&dev->refcount);
1580 	if (!force && refcount > 1) {
1581 		rnbd_clt_err(dev,
1582 			      "Closing device failed, device is in use, (%d device users)\n",
1583 			      refcount - 1);
1584 		ret = -EBUSY;
1585 		goto err;
1586 	}
1587 	was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1588 	dev->dev_state = DEV_STATE_UNMAPPED;
1589 	mutex_unlock(&dev->lock);
1590 
1591 	delete_dev(dev);
1592 	destroy_sysfs(dev, sysfs_self);
1593 	destroy_gen_disk(dev);
1594 	if (was_mapped && sess->rtrs)
1595 		send_msg_close(dev, dev->device_id, WAIT);
1596 
1597 	rnbd_clt_info(dev, "Device is unmapped\n");
1598 
1599 	/* Likely last reference put */
1600 	rnbd_clt_put_dev(dev);
1601 
1602 	/*
1603 	 * Here device and session can be vanished!
1604 	 */
1605 
1606 	return 0;
1607 err:
1608 	mutex_unlock(&dev->lock);
1609 
1610 	return ret;
1611 }
1612 
1613 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1614 {
1615 	int err;
1616 
1617 	mutex_lock(&dev->lock);
1618 	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1619 		err = 0;
1620 	else if (dev->dev_state == DEV_STATE_UNMAPPED)
1621 		err = -ENODEV;
1622 	else if (dev->dev_state == DEV_STATE_MAPPED)
1623 		err = -EALREADY;
1624 	else
1625 		err = -EBUSY;
1626 	mutex_unlock(&dev->lock);
1627 	if (!err) {
1628 		rnbd_clt_info(dev, "Remapping device.\n");
1629 		err = send_msg_open(dev, WAIT);
1630 		if (err)
1631 			rnbd_clt_err(dev, "remap_device: %d\n", err);
1632 	}
1633 
1634 	return err;
1635 }
1636 
1637 static void unmap_device_work(struct work_struct *work)
1638 {
1639 	struct rnbd_clt_dev *dev;
1640 
1641 	dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1642 	rnbd_clt_unmap_device(dev, true, NULL);
1643 }
1644 
1645 static void rnbd_destroy_sessions(void)
1646 {
1647 	struct rnbd_clt_session *sess, *sn;
1648 	struct rnbd_clt_dev *dev, *tn;
1649 
1650 	/* Firstly forbid access through sysfs interface */
1651 	rnbd_clt_destroy_default_group();
1652 	rnbd_clt_destroy_sysfs_files();
1653 
1654 	/*
1655 	 * At this point there is no concurrent access to the sessions
1656 	 * list and the devices list:
1657 	 *   1. No new session or device can be created - the session sysfs files
1658 	 *      are removed.
1659 	 *   2. No device or session can be removed - the module reference is taken
1660 	 *      into account in the unmap device sysfs callback.
1661 	 *   3. No IO requests are in flight - each file open of the block_dev
1662 	 *      increases the module reference in get_disk().
1663 	 *
1664 	 * But there can still be user requests in flight, which are sent by the
1665 	 * asynchronous send_msg_*() functions, thus before unmapping devices the
1666 	 * RTRS session must be explicitly closed.
1667 	 */
1668 
1669 	list_for_each_entry_safe(sess, sn, &sess_list, list) {
1670 		WARN_ON(!rnbd_clt_get_sess(sess));
1671 		close_rtrs(sess);
1672 		list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1673 			/*
1674 			 * Here unmap happens in parallel for only one reason:
1675 			 * blk_cleanup_queue() takes around half a second, so
1676 			 * with a huge number of devices the whole module unload
1677 			 * procedure would otherwise take minutes.
1678 			 */
1679 			INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1680 			queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
1681 		}
1682 		rnbd_clt_put_sess(sess);
1683 	}
1684 	/* Wait for all scheduled unmap works */
1685 	flush_workqueue(system_long_wq);
1686 	WARN_ON(!list_empty(&sess_list));
1687 }
1688 
1689 static int __init rnbd_client_init(void)
1690 {
1691 	int err = 0;
1692 
1693 	BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1694 	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1695 	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1696 	BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1697 	BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1698 	BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1699 	rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1700 	if (rnbd_client_major <= 0) {
1701 		pr_err("Failed to load module, block device registration failed\n");
1702 		return -EBUSY;
1703 	}
1704 
1705 	err = rnbd_clt_create_sysfs_files();
1706 	if (err) {
1707 		pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1708 		       err);
1709 		unregister_blkdev(rnbd_client_major, "rnbd");
1710 	}
1711 
1712 	return err;
1713 }
1714 
1715 static void __exit rnbd_client_exit(void)
1716 {
1717 	rnbd_destroy_sessions();
1718 	unregister_blkdev(rnbd_client_major, "rnbd");
1719 	ida_destroy(&index_ida);
1720 }
1721 
1722 module_init(rnbd_client_init);
1723 module_exit(rnbd_client_exit);
1724