xref: /openbmc/linux/drivers/block/rbd.c (revision 9c6d26df1fae6ad4718d51c48e6517913304ed27)
1 
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4 
5 
6    based on drivers/block/osdblk.c:
7 
8    Copyright 2009 Red Hat, Inc.
9 
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22 
23 
24 
25    For usage instructions, please refer to:
26 
27                  Documentation/ABI/testing/sysfs-bus-rbd
28 
29  */
30 
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39 
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49 
50 #include "rbd_types.h"
51 
52 #define RBD_DEBUG	/* Activate rbd_assert() calls */
53 
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value returns
58  * -EINVAL without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62 	unsigned int counter;
63 
64 	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
65 	if (counter <= (unsigned int)INT_MAX)
66 		return (int)counter;
67 
68 	atomic_dec(v);
69 
70 	return -EINVAL;
71 }
72 
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76 	int counter;
77 
78 	counter = atomic_dec_return(v);
79 	if (counter >= 0)
80 		return counter;
81 
82 	atomic_inc(v);
83 
84 	return -EINVAL;
85 }
86 
/* Driver name; also the prefix rbd_warn() puts on every message */
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
/* Shift applied by rbd_dev_id_to_minor() / minor_to_rbd_dev_id() */
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
/* NAME_MAX less the prefix length (sizeof - 1 leaves out the NUL) */
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Name rbd_snap_name() returns for CEPH_NOSNAP */
#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
/* 1000 milliseconds, converted to jiffies at each use */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
112 
/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

/* Mask of every feature bit defined above */
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/*
 * Features supported by this (client software) implementation.
 * This is the mask printed by rbd_supported_features_show().
 */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * DEV_NAME_LEN is the size of the rbd_device.name[] buffer.
 */
#define DEV_NAME_LEN		32
136 
137 /*
138  * block device image metadata (in-memory version)
139  */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;	/* NULL until the header is first filled in */
	__u8 obj_order;		/* object size is 1 << obj_order bytes */
	u64 stripe_unit;	/* if this or stripe_count is 0 ... */
	u64 stripe_count;	/* ... rbd_init_layout() sets defaults */
	s64 data_pool_id;	/* CEPH_NOPOOL: layout uses spec->pool_id */
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only; NUL-terminated, back to back */
	u64 *snap_sizes;	/* format 1 only; one entry per snapshot */
};
155 
156 /*
157  * An rbd image specification.
158  *
159  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
160  * identify an image.  Each rbd_dev structure includes a pointer to
161  * an rbd_spec structure that encapsulates this identity.
162  *
163  * Each of the id's in an rbd_spec has an associated name.  For a
164  * user-mapped image, the names are supplied and the id's associated
165  * with them are looked up.  For a layered image, a parent image is
166  * defined by the tuple, and the names are looked up.
167  *
168  * An rbd_dev structure contains a parent_spec pointer which is
169  * non-null if the image it represents is a child in a layered
170  * image.  This pointer will refer to the rbd_spec structure used
171  * by the parent rbd_dev for its own identity (i.e., the structure
172  * is shared between the parent and child).
173  *
174  * Since these structures are populated once, during the discovery
175  * phase of image construction, they are effectively immutable so
176  * we make no effort to synchronize access to them.
177  *
178  * Note that code herein does not assume the image name is known (it
179  * could be a null pointer).
180  */
struct rbd_spec {
	u64		pool_id;	/* identity: pool_id + image_id + snap_id */
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;	/* may be NULL; rbd_warn() checks */

	u64		snap_id;	/* != CEPH_NOSNAP: a snapshot is mapped */
	const char	*snap_name;

	struct kref	kref;		/* reference count for this spec */
};
193 
194 /*
195  * an instance of the client.  multiple devices may share an rbd client.
196  */
struct rbd_client {
	struct ceph_client	*client;	/* from ceph_create_client() */
	struct kref		kref;		/* dropped via rbd_put_client() */
	struct list_head	node;		/* entry in rbd_client_list */
};
202 
203 struct rbd_img_request;
204 
/*
 * Kind of data buffer attached to a request (stored in
 * rbd_img_request.data_type).  Numbering starts at 1, so a zeroed
 * field does not match any enumerator.
 */
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};
211 
/*
 * Operation carried by an image request (rbd_img_request.op_type).
 * Numbering starts at 1; obj_op_name() maps each value to a string.
 */
enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};
217 
218 /*
219  * Writes go through the following state machine to deal with
220  * layering:
221  *
222  *                       need copyup
223  * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
224  *        |     ^                              |
225  *        v     \------------------------------/
226  *      done
227  *        ^
228  *        |
229  * RBD_OBJ_WRITE_FLAT
230  *
231  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
232  * there is a parent or not.
233  */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,	/* starting state when there is no parent */
	RBD_OBJ_WRITE_GUARD,	/* starting state when there is a parent */
	RBD_OBJ_WRITE_COPYUP,	/* reached from GUARD when copyup is needed */
};
239 
/*
 * A request against one object.  Instances hang off an
 * rbd_img_request's object_extents list through ex.oe_item (see
 * for_each_obj_request()).
 */
struct rbd_obj_request {
	struct ceph_object_extent ex;	/* ex.oe_item is the list linkage */
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	/* data position: a bio iterator or a bio_vec iterator, never both */
	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};
269 
/* NOTE(review): presumably bit numbers for rbd_img_request.flags -- confirm */
enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
274 
/*
 * A request against an image.  Its per-object requests are linked on
 * object_extents and can be walked with for_each_obj_request().
 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};
298 
/*
 * Iterate over the object requests linked on an image request's
 * object_extents list (through ex.oe_item).  The _safe variant takes
 * the extra cursor n that list_for_each_entry_safe() requires.
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
303 
/* Values held in rbd_device.watch_state */
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};
309 
/*
 * Values held in rbd_device.lock_state.  __rbd_is_lock_owner()
 * counts both LOCKED and RELEASING as owning the lock.
 */
enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};
315 
316 /* WatchNotify::ClientId */
/* Type of rbd_device.owner_cid */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};
321 
/* Embedded in struct rbd_device as the "mapping" member */
struct rbd_mapping {
	u64                     size;
	u64                     features;
};
326 
327 /*
328  * a single device
329  */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	/* bit numbers come from enum rbd_dev_flags */
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	/* filled in by rbd_init_layout() */
	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	/* rbd_is_lock_owner() reads lock_state with lock_rwsem read-held */
	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	/* rbd_open() takes a reference on dev, rbd_release() drops it */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
392 
393 /*
394  * Flag bits for rbd_dev->flags:
395  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
396  *   by rbd_dev->lock
397  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
398  */
/* Bit numbers, used with test_bit() on rbd_dev->flags (see rbd_open()) */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
404 
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
/* NOTE(review): presumably guards rbd_dev_list -- no user in this part */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
/* held around every rbd_client_list access in this file */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 *
 * When this is false, rbd_bus_is_visible() hides the
 * add_single_major and remove_single_major bus attributes.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
429 
/*
 * Forward declarations.  The four store handlers are needed by the
 * BUS_ATTR() definitions further down.
 */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
439 
440 static int rbd_dev_id_to_minor(int dev_id)
441 {
442 	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
443 }
444 
445 static int minor_to_rbd_dev_id(int minor)
446 {
447 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
448 }
449 
450 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
451 {
452 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
453 	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
454 }
455 
456 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
457 {
458 	bool is_lock_owner;
459 
460 	down_read(&rbd_dev->lock_rwsem);
461 	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
462 	up_read(&rbd_dev->lock_rwsem);
463 	return is_lock_owner;
464 }
465 
466 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
467 {
468 	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
469 }
470 
/*
 * Bus attributes.  add/remove and their single-major variants are
 * declared with S_IWUSR and a store handler only; supported_features
 * is declared with S_IRUGO and a show handler only.
 */
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

/* NULL-terminated list handed to rbd_bus_group.attrs */
static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};
485 
486 static umode_t rbd_bus_is_visible(struct kobject *kobj,
487 				  struct attribute *attr, int index)
488 {
489 	if (!single_major &&
490 	    (attr == &bus_attr_add_single_major.attr ||
491 	     attr == &bus_attr_remove_single_major.attr))
492 		return 0;
493 
494 	return attr->mode;
495 }
496 
/* Attribute group for the bus, filtered through rbd_bus_is_visible() */
static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
/* NOTE(review): expected to define rbd_bus_groups used below -- confirm */
__ATTRIBUTE_GROUPS(rbd_bus);

/* The "rbd" bus, carrying the attribute groups declared above */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
507 
/* Release callback for rbd_root_dev; intentionally does nothing */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated device named "rbd" */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
516 
517 static __printf(2, 3)
518 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
519 {
520 	struct va_format vaf;
521 	va_list args;
522 
523 	va_start(args, fmt);
524 	vaf.fmt = fmt;
525 	vaf.va = &args;
526 
527 	if (!rbd_dev)
528 		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
529 	else if (rbd_dev->disk)
530 		printk(KERN_WARNING "%s: %s: %pV\n",
531 			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
532 	else if (rbd_dev->spec && rbd_dev->spec->image_name)
533 		printk(KERN_WARNING "%s: image %s: %pV\n",
534 			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
535 	else if (rbd_dev->spec && rbd_dev->spec->image_id)
536 		printk(KERN_WARNING "%s: id %s: %pV\n",
537 			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
538 	else	/* punt */
539 		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
540 			RBD_DRV_NAME, rbd_dev, &vaf);
541 	va_end(args);
542 }
543 
/*
 * rbd_assert(expr)
 *
 * With RBD_DEBUG defined: if expr is false, log the failed expression
 * together with the function name and line number, then BUG().
 * Without RBD_DEBUG: expands to a no-op and expr is not evaluated.
 *
 * The debug variant is wrapped in do { } while (0) so that it is a
 * single statement.  A bare "if (...) { }" expansion breaks callers
 * of the form
 *
 *	if (cond)
 *		rbd_assert(x);
 *	else
 *		...
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
556 
/* Forward declarations of helpers used before their definitions */
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
/* format 2 counterpart of rbd_dev_v1_snap_name(); see rbd_snap_name() */
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
569 
570 static int rbd_open(struct block_device *bdev, fmode_t mode)
571 {
572 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
573 	bool removing = false;
574 
575 	spin_lock_irq(&rbd_dev->lock);
576 	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
577 		removing = true;
578 	else
579 		rbd_dev->open_count++;
580 	spin_unlock_irq(&rbd_dev->lock);
581 	if (removing)
582 		return -ENOENT;
583 
584 	(void) get_device(&rbd_dev->dev);
585 
586 	return 0;
587 }
588 
589 static void rbd_release(struct gendisk *disk, fmode_t mode)
590 {
591 	struct rbd_device *rbd_dev = disk->private_data;
592 	unsigned long open_count_before;
593 
594 	spin_lock_irq(&rbd_dev->lock);
595 	open_count_before = rbd_dev->open_count--;
596 	spin_unlock_irq(&rbd_dev->lock);
597 	rbd_assert(open_count_before > 0);
598 
599 	put_device(&rbd_dev->dev);
600 }
601 
602 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
603 {
604 	int ro;
605 
606 	if (get_user(ro, (int __user *)arg))
607 		return -EFAULT;
608 
609 	/* Snapshots can't be marked read-write */
610 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
611 		return -EROFS;
612 
613 	/* Let blkdev_roset() handle it */
614 	return -ENOTTY;
615 }
616 
617 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
618 			unsigned int cmd, unsigned long arg)
619 {
620 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
621 	int ret;
622 
623 	switch (cmd) {
624 	case BLKROSET:
625 		ret = rbd_ioctl_set_ro(rbd_dev, arg);
626 		break;
627 	default:
628 		ret = -ENOTTY;
629 	}
630 
631 	return ret;
632 }
633 
#ifdef CONFIG_COMPAT
/* Compat ioctl entry point: passes the call to rbd_ioctl() unchanged. */
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	int ret = rbd_ioctl(bdev, mode, cmd, arg);

	return ret;
}
#endif /* CONFIG_COMPAT */
641 
/* Block device callbacks; compat_ioctl is only set under CONFIG_COMPAT */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
651 
652 /*
653  * Initialize an rbd client instance.  Success or not, this function
654  * consumes ceph_opts.  Caller holds client_mutex.
655  */
656 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
657 {
658 	struct rbd_client *rbdc;
659 	int ret = -ENOMEM;
660 
661 	dout("%s:\n", __func__);
662 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
663 	if (!rbdc)
664 		goto out_opt;
665 
666 	kref_init(&rbdc->kref);
667 	INIT_LIST_HEAD(&rbdc->node);
668 
669 	rbdc->client = ceph_create_client(ceph_opts, rbdc);
670 	if (IS_ERR(rbdc->client))
671 		goto out_rbdc;
672 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
673 
674 	ret = ceph_open_session(rbdc->client);
675 	if (ret < 0)
676 		goto out_client;
677 
678 	spin_lock(&rbd_client_list_lock);
679 	list_add_tail(&rbdc->node, &rbd_client_list);
680 	spin_unlock(&rbd_client_list_lock);
681 
682 	dout("%s: rbdc %p\n", __func__, rbdc);
683 
684 	return rbdc;
685 out_client:
686 	ceph_destroy_client(rbdc->client);
687 out_rbdc:
688 	kfree(rbdc);
689 out_opt:
690 	if (ceph_opts)
691 		ceph_destroy_options(ceph_opts);
692 	dout("%s: error %d\n", __func__, ret);
693 
694 	return ERR_PTR(ret);
695 }
696 
697 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
698 {
699 	kref_get(&rbdc->kref);
700 
701 	return rbdc;
702 }
703 
704 /*
705  * Find a ceph client with specific addr and configuration.  If
706  * found, bump its reference count.
707  */
708 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
709 {
710 	struct rbd_client *client_node;
711 	bool found = false;
712 
713 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
714 		return NULL;
715 
716 	spin_lock(&rbd_client_list_lock);
717 	list_for_each_entry(client_node, &rbd_client_list, node) {
718 		if (!ceph_compare_options(ceph_opts, client_node->client)) {
719 			__rbd_get_client(client_node);
720 
721 			found = true;
722 			break;
723 		}
724 	}
725 	spin_unlock(&rbd_client_list_lock);
726 
727 	return found ? client_node : NULL;
728 }
729 
/*
 * (Per device) rbd map options
 *
 * parse_rbd_opts_token() classifies a token by its position here:
 * values below Opt_last_int take an integer argument, values between
 * Opt_last_int and Opt_last_string take a string argument, and the
 * rest take no argument.
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};
745 
/* Patterns handed to match_token() by parse_rbd_opts_token() */
static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};
758 
/* Parsed map options; filled in by parse_rbd_opts_token() */
struct rbd_options {
	int	queue_depth;	/* "queue_depth=N"; N < 1 is rejected */
	bool	read_only;	/* "read_only"/"ro" set, "read_write"/"rw" clear */
	bool	lock_on_read;	/* "lock_on_read" */
	bool	exclusive;	/* "exclusive" */
};

/*
 * NOTE(review): presumably the initial values of the matching
 * rbd_options fields -- they are applied outside this part of the file.
 */
#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
770 
771 static int parse_rbd_opts_token(char *c, void *private)
772 {
773 	struct rbd_options *rbd_opts = private;
774 	substring_t argstr[MAX_OPT_ARGS];
775 	int token, intval, ret;
776 
777 	token = match_token(c, rbd_opts_tokens, argstr);
778 	if (token < Opt_last_int) {
779 		ret = match_int(&argstr[0], &intval);
780 		if (ret < 0) {
781 			pr_err("bad mount option arg (not int) at '%s'\n", c);
782 			return ret;
783 		}
784 		dout("got int token %d val %d\n", token, intval);
785 	} else if (token > Opt_last_int && token < Opt_last_string) {
786 		dout("got string token %d val %s\n", token, argstr[0].from);
787 	} else {
788 		dout("got token %d\n", token);
789 	}
790 
791 	switch (token) {
792 	case Opt_queue_depth:
793 		if (intval < 1) {
794 			pr_err("queue_depth out of range\n");
795 			return -EINVAL;
796 		}
797 		rbd_opts->queue_depth = intval;
798 		break;
799 	case Opt_read_only:
800 		rbd_opts->read_only = true;
801 		break;
802 	case Opt_read_write:
803 		rbd_opts->read_only = false;
804 		break;
805 	case Opt_lock_on_read:
806 		rbd_opts->lock_on_read = true;
807 		break;
808 	case Opt_exclusive:
809 		rbd_opts->exclusive = true;
810 		break;
811 	default:
812 		/* libceph prints "bad option" msg */
813 		return -EINVAL;
814 	}
815 
816 	return 0;
817 }
818 
819 static char* obj_op_name(enum obj_operation_type op_type)
820 {
821 	switch (op_type) {
822 	case OBJ_OP_READ:
823 		return "read";
824 	case OBJ_OP_WRITE:
825 		return "write";
826 	case OBJ_OP_DISCARD:
827 		return "discard";
828 	default:
829 		return "???";
830 	}
831 }
832 
833 /*
834  * Destroy ceph client
835  *
836  * Caller must hold rbd_client_list_lock.
837  */
838 static void rbd_client_release(struct kref *kref)
839 {
840 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
841 
842 	dout("%s: rbdc %p\n", __func__, rbdc);
843 	spin_lock(&rbd_client_list_lock);
844 	list_del(&rbdc->node);
845 	spin_unlock(&rbd_client_list_lock);
846 
847 	ceph_destroy_client(rbdc->client);
848 	kfree(rbdc);
849 }
850 
851 /*
852  * Drop reference to ceph client node. If it's not referenced anymore, release
853  * it.
854  */
855 static void rbd_put_client(struct rbd_client *rbdc)
856 {
857 	if (rbdc)
858 		kref_put(&rbdc->kref, rbd_client_release);
859 }
860 
861 static int wait_for_latest_osdmap(struct ceph_client *client)
862 {
863 	u64 newest_epoch;
864 	int ret;
865 
866 	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
867 	if (ret)
868 		return ret;
869 
870 	if (client->osdc.osdmap->epoch >= newest_epoch)
871 		return 0;
872 
873 	ceph_osdc_maybe_request_map(&client->osdc);
874 	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
875 				     client->options->mount_timeout);
876 }
877 
878 /*
879  * Get a ceph client with specific addr and configuration, if one does
880  * not exist create it.  Either way, ceph_opts is consumed by this
881  * function.
882  */
883 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
884 {
885 	struct rbd_client *rbdc;
886 	int ret;
887 
888 	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
889 	rbdc = rbd_client_find(ceph_opts);
890 	if (rbdc) {
891 		ceph_destroy_options(ceph_opts);
892 
893 		/*
894 		 * Using an existing client.  Make sure ->pg_pools is up to
895 		 * date before we look up the pool id in do_rbd_add().
896 		 */
897 		ret = wait_for_latest_osdmap(rbdc->client);
898 		if (ret) {
899 			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
900 			rbd_put_client(rbdc);
901 			rbdc = ERR_PTR(ret);
902 		}
903 	} else {
904 		rbdc = rbd_client_create(ceph_opts);
905 	}
906 	mutex_unlock(&client_mutex);
907 
908 	return rbdc;
909 }
910 
911 static bool rbd_image_format_valid(u32 image_format)
912 {
913 	return image_format == 1 || image_format == 2;
914 }
915 
916 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
917 {
918 	size_t size;
919 	u32 snap_count;
920 
921 	/* The header has to start with the magic rbd header text */
922 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
923 		return false;
924 
925 	/* The bio layer requires at least sector-sized I/O */
926 
927 	if (ondisk->options.order < SECTOR_SHIFT)
928 		return false;
929 
930 	/* If we use u64 in a few spots we may be able to loosen this */
931 
932 	if (ondisk->options.order > 8 * sizeof (int) - 1)
933 		return false;
934 
935 	/*
936 	 * The size of a snapshot header has to fit in a size_t, and
937 	 * that limits the number of snapshots.
938 	 */
939 	snap_count = le32_to_cpu(ondisk->snap_count);
940 	size = SIZE_MAX - sizeof (struct ceph_snap_context);
941 	if (snap_count > size / sizeof (__le64))
942 		return false;
943 
944 	/*
945 	 * Not only that, but the size of the entire the snapshot
946 	 * header must also be representable in a size_t.
947 	 */
948 	size -= snap_count * sizeof (__le64);
949 	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
950 		return false;
951 
952 	return true;
953 }
954 
955 /*
956  * returns the size of an object in the image
957  */
958 static u32 rbd_obj_bytes(struct rbd_image_header *header)
959 {
960 	return 1U << header->obj_order;
961 }
962 
963 static void rbd_init_layout(struct rbd_device *rbd_dev)
964 {
965 	if (rbd_dev->header.stripe_unit == 0 ||
966 	    rbd_dev->header.stripe_count == 0) {
967 		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
968 		rbd_dev->header.stripe_count = 1;
969 	}
970 
971 	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
972 	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
973 	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
974 	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
975 			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
976 	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
977 }
978 
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 *
 * The first call (header->object_prefix still NULL) also records the
 * object prefix and object order and initializes the layout.  Later
 * calls only replace the image size, snapshot context, snapshot
 * names and snapshot sizes, releasing the previous ones.
 *
 * Everything is allocated before the header is touched, so on
 * failure the header is left unmodified.
 *
 * Returns 0 on success, -ENOMEM if an allocation fails, or -EIO if
 * snap_names_len does not fit in a size_t.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;	/* default for every "goto out_err" below */
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		/* Refresh: release what the new values are about to replace */
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	/* Any of these may still be NULL, depending on where we failed */
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
1079 
1080 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1081 {
1082 	const char *snap_name;
1083 
1084 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1085 
1086 	/* Skip over names until we find the one we are looking for */
1087 
1088 	snap_name = rbd_dev->header.snap_names;
1089 	while (which--)
1090 		snap_name += strlen(snap_name) + 1;
1091 
1092 	return kstrdup(snap_name, GFP_KERNEL);
1093 }
1094 
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 *
 * @s1 and @s2 each point to a u64 snapshot id.  The result orders
 * ids in *descending* order: 1 if *s1 < *s2, 0 if they are equal,
 * and -1 if *s1 > *s2.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	/* Read through const pointers; the comparator must not modify */
	u64 snap_id1 = *(const u64 *)s1;
	u64 snap_id2 = *(const u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
1108 
1109 /*
1110  * Search a snapshot context to see if the given snapshot id is
1111  * present.
1112  *
1113  * Returns the position of the snapshot id in the array if it's found,
1114  * or BAD_SNAP_INDEX otherwise.
1115  *
1116  * Note: The snapshot array is in kept sorted (by the osd) in
1117  * reverse order, highest snapshot id first.
1118  */
1119 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1120 {
1121 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1122 	u64 *found;
1123 
1124 	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1125 				sizeof (snap_id), snapid_compare_reverse);
1126 
1127 	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1128 }
1129 
1130 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1131 					u64 snap_id)
1132 {
1133 	u32 which;
1134 	const char *snap_name;
1135 
1136 	which = rbd_dev_snap_index(rbd_dev, snap_id);
1137 	if (which == BAD_SNAP_INDEX)
1138 		return ERR_PTR(-ENOENT);
1139 
1140 	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1141 	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1142 }
1143 
1144 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1145 {
1146 	if (snap_id == CEPH_NOSNAP)
1147 		return RBD_SNAP_HEAD_NAME;
1148 
1149 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1150 	if (rbd_dev->image_format == 1)
1151 		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1152 
1153 	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1154 }
1155 
1156 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1157 				u64 *snap_size)
1158 {
1159 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1160 	if (snap_id == CEPH_NOSNAP) {
1161 		*snap_size = rbd_dev->header.image_size;
1162 	} else if (rbd_dev->image_format == 1) {
1163 		u32 which;
1164 
1165 		which = rbd_dev_snap_index(rbd_dev, snap_id);
1166 		if (which == BAD_SNAP_INDEX)
1167 			return -ENOENT;
1168 
1169 		*snap_size = rbd_dev->header.snap_sizes[which];
1170 	} else {
1171 		u64 size = 0;
1172 		int ret;
1173 
1174 		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1175 		if (ret)
1176 			return ret;
1177 
1178 		*snap_size = size;
1179 	}
1180 	return 0;
1181 }
1182 
1183 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1184 			u64 *snap_features)
1185 {
1186 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1187 	if (snap_id == CEPH_NOSNAP) {
1188 		*snap_features = rbd_dev->header.features;
1189 	} else if (rbd_dev->image_format == 1) {
1190 		*snap_features = 0;	/* No features for format 1 */
1191 	} else {
1192 		u64 features = 0;
1193 		int ret;
1194 
1195 		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1196 		if (ret)
1197 			return ret;
1198 
1199 		*snap_features = features;
1200 	}
1201 	return 0;
1202 }
1203 
1204 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1205 {
1206 	u64 snap_id = rbd_dev->spec->snap_id;
1207 	u64 size = 0;
1208 	u64 features = 0;
1209 	int ret;
1210 
1211 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1212 	if (ret)
1213 		return ret;
1214 	ret = rbd_snap_features(rbd_dev, snap_id, &features);
1215 	if (ret)
1216 		return ret;
1217 
1218 	rbd_dev->mapping.size = size;
1219 	rbd_dev->mapping.features = features;
1220 
1221 	return 0;
1222 }
1223 
1224 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1225 {
1226 	rbd_dev->mapping.size = 0;
1227 	rbd_dev->mapping.features = 0;
1228 }
1229 
1230 static void zero_bvec(struct bio_vec *bv)
1231 {
1232 	void *buf;
1233 	unsigned long flags;
1234 
1235 	buf = bvec_kmap_irq(bv, &flags);
1236 	memset(buf, 0, bv->bv_len);
1237 	flush_dcache_page(bv->bv_page);
1238 	bvec_kunmap_irq(buf, &flags);
1239 }
1240 
1241 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1242 {
1243 	struct ceph_bio_iter it = *bio_pos;
1244 
1245 	ceph_bio_iter_advance(&it, off);
1246 	ceph_bio_iter_advance_step(&it, bytes, ({
1247 		zero_bvec(&bv);
1248 	}));
1249 }
1250 
1251 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1252 {
1253 	struct ceph_bvec_iter it = *bvec_pos;
1254 
1255 	ceph_bvec_iter_advance(&it, off);
1256 	ceph_bvec_iter_advance_step(&it, bytes, ({
1257 		zero_bvec(&bv);
1258 	}));
1259 }
1260 
1261 /*
1262  * Zero a range in @obj_req data buffer defined by a bio (list) or
1263  * (private) bio_vec array.
1264  *
1265  * @off is relative to the start of the data buffer.
1266  */
1267 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1268 			       u32 bytes)
1269 {
1270 	switch (obj_req->img_request->data_type) {
1271 	case OBJ_REQUEST_BIO:
1272 		zero_bios(&obj_req->bio_pos, off, bytes);
1273 		break;
1274 	case OBJ_REQUEST_BVECS:
1275 	case OBJ_REQUEST_OWN_BVECS:
1276 		zero_bvecs(&obj_req->bvec_pos, off, bytes);
1277 		break;
1278 	default:
1279 		rbd_assert(0);
1280 	}
1281 }
1282 
1283 static void rbd_obj_request_destroy(struct kref *kref);
1284 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1285 {
1286 	rbd_assert(obj_request != NULL);
1287 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1288 		kref_read(&obj_request->kref));
1289 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1290 }
1291 
1292 static void rbd_img_request_get(struct rbd_img_request *img_request)
1293 {
1294 	dout("%s: img %p (was %d)\n", __func__, img_request,
1295 	     kref_read(&img_request->kref));
1296 	kref_get(&img_request->kref);
1297 }
1298 
1299 static void rbd_img_request_destroy(struct kref *kref);
1300 static void rbd_img_request_put(struct rbd_img_request *img_request)
1301 {
1302 	rbd_assert(img_request != NULL);
1303 	dout("%s: img %p (was %d)\n", __func__, img_request,
1304 		kref_read(&img_request->kref));
1305 	kref_put(&img_request->kref, rbd_img_request_destroy);
1306 }
1307 
1308 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1309 					struct rbd_obj_request *obj_request)
1310 {
1311 	rbd_assert(obj_request->img_request == NULL);
1312 
1313 	/* Image request now owns object's original reference */
1314 	obj_request->img_request = img_request;
1315 	img_request->obj_request_count++;
1316 	img_request->pending_count++;
1317 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1318 }
1319 
1320 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1321 					struct rbd_obj_request *obj_request)
1322 {
1323 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1324 	list_del(&obj_request->ex.oe_item);
1325 	rbd_assert(img_request->obj_request_count > 0);
1326 	img_request->obj_request_count--;
1327 	rbd_assert(obj_request->img_request == img_request);
1328 	rbd_obj_request_put(obj_request);
1329 }
1330 
1331 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1332 {
1333 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1334 
1335 	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1336 	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1337 	     obj_request->ex.oe_len, osd_req);
1338 	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1339 }
1340 
1341 /*
1342  * The default/initial value for all image request flags is 0.  Each
1343  * is conditionally set to 1 at image request initialization time
1344  * and currently never change thereafter.
1345  */
1346 static void img_request_layered_set(struct rbd_img_request *img_request)
1347 {
1348 	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1349 	smp_mb();
1350 }
1351 
1352 static void img_request_layered_clear(struct rbd_img_request *img_request)
1353 {
1354 	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1355 	smp_mb();
1356 }
1357 
1358 static bool img_request_layered_test(struct rbd_img_request *img_request)
1359 {
1360 	smp_mb();
1361 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1362 }
1363 
1364 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1365 {
1366 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1367 
1368 	return !obj_req->ex.oe_off &&
1369 	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
1370 }
1371 
1372 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1373 {
1374 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1375 
1376 	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1377 					rbd_dev->layout.object_size;
1378 }
1379 
1380 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1381 {
1382 	return ceph_file_extents_bytes(obj_req->img_extents,
1383 				       obj_req->num_img_extents);
1384 }
1385 
1386 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1387 {
1388 	switch (img_req->op_type) {
1389 	case OBJ_OP_READ:
1390 		return false;
1391 	case OBJ_OP_WRITE:
1392 	case OBJ_OP_DISCARD:
1393 		return true;
1394 	default:
1395 		rbd_assert(0);
1396 	}
1397 }
1398 
1399 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1400 
1401 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1402 {
1403 	struct rbd_obj_request *obj_req = osd_req->r_priv;
1404 
1405 	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1406 	     osd_req->r_result, obj_req);
1407 	rbd_assert(osd_req == obj_req->osd_req);
1408 
1409 	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1410 	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1411 		obj_req->xferred = osd_req->r_result;
1412 	else
1413 		/*
1414 		 * Writes aren't allowed to return a data payload.  In some
1415 		 * guarded write cases (e.g. stat + zero on an empty object)
1416 		 * a stat response makes it through, but we don't care.
1417 		 */
1418 		obj_req->xferred = 0;
1419 
1420 	rbd_obj_handle_request(obj_req);
1421 }
1422 
1423 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1424 {
1425 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1426 
1427 	osd_req->r_flags = CEPH_OSD_FLAG_READ;
1428 	osd_req->r_snapid = obj_request->img_request->snap_id;
1429 }
1430 
1431 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1432 {
1433 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1434 
1435 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1436 	ktime_get_real_ts(&osd_req->r_mtime);
1437 	osd_req->r_data_offset = obj_request->ex.oe_off;
1438 }
1439 
1440 static struct ceph_osd_request *
1441 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1442 {
1443 	struct rbd_img_request *img_req = obj_req->img_request;
1444 	struct rbd_device *rbd_dev = img_req->rbd_dev;
1445 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1446 	struct ceph_osd_request *req;
1447 	const char *name_format = rbd_dev->image_format == 1 ?
1448 				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1449 
1450 	req = ceph_osdc_alloc_request(osdc,
1451 			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1452 			num_ops, false, GFP_NOIO);
1453 	if (!req)
1454 		return NULL;
1455 
1456 	req->r_callback = rbd_osd_req_callback;
1457 	req->r_priv = obj_req;
1458 
1459 	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1460 	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1461 			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1462 		goto err_req;
1463 
1464 	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1465 		goto err_req;
1466 
1467 	return req;
1468 
1469 err_req:
1470 	ceph_osdc_put_request(req);
1471 	return NULL;
1472 }
1473 
/* Release @osd_req by dropping a reference on it. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1478 
1479 static struct rbd_obj_request *rbd_obj_request_create(void)
1480 {
1481 	struct rbd_obj_request *obj_request;
1482 
1483 	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1484 	if (!obj_request)
1485 		return NULL;
1486 
1487 	ceph_object_extent_init(&obj_request->ex);
1488 	kref_init(&obj_request->kref);
1489 
1490 	dout("%s %p\n", __func__, obj_request);
1491 	return obj_request;
1492 }
1493 
1494 static void rbd_obj_request_destroy(struct kref *kref)
1495 {
1496 	struct rbd_obj_request *obj_request;
1497 	u32 i;
1498 
1499 	obj_request = container_of(kref, struct rbd_obj_request, kref);
1500 
1501 	dout("%s: obj %p\n", __func__, obj_request);
1502 
1503 	if (obj_request->osd_req)
1504 		rbd_osd_req_destroy(obj_request->osd_req);
1505 
1506 	switch (obj_request->img_request->data_type) {
1507 	case OBJ_REQUEST_NODATA:
1508 	case OBJ_REQUEST_BIO:
1509 	case OBJ_REQUEST_BVECS:
1510 		break;		/* Nothing to do */
1511 	case OBJ_REQUEST_OWN_BVECS:
1512 		kfree(obj_request->bvec_pos.bvecs);
1513 		break;
1514 	default:
1515 		rbd_assert(0);
1516 	}
1517 
1518 	kfree(obj_request->img_extents);
1519 	if (obj_request->copyup_bvecs) {
1520 		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1521 			if (obj_request->copyup_bvecs[i].bv_page)
1522 				__free_page(obj_request->copyup_bvecs[i].bv_page);
1523 		}
1524 		kfree(obj_request->copyup_bvecs);
1525 	}
1526 
1527 	kmem_cache_free(rbd_obj_request_cache, obj_request);
1528 }
1529 
1530 /* It's OK to call this for a device with no parent */
1531 
1532 static void rbd_spec_put(struct rbd_spec *spec);
1533 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1534 {
1535 	rbd_dev_remove_parent(rbd_dev);
1536 	rbd_spec_put(rbd_dev->parent_spec);
1537 	rbd_dev->parent_spec = NULL;
1538 	rbd_dev->parent_overlap = 0;
1539 }
1540 
1541 /*
1542  * Parent image reference counting is used to determine when an
1543  * image's parent fields can be safely torn down--after there are no
1544  * more in-flight requests to the parent image.  When the last
1545  * reference is dropped, cleaning them up is safe.
1546  */
1547 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1548 {
1549 	int counter;
1550 
1551 	if (!rbd_dev->parent_spec)
1552 		return;
1553 
1554 	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1555 	if (counter > 0)
1556 		return;
1557 
1558 	/* Last reference; clean up parent data structures */
1559 
1560 	if (!counter)
1561 		rbd_dev_unparent(rbd_dev);
1562 	else
1563 		rbd_warn(rbd_dev, "parent reference underflow");
1564 }
1565 
1566 /*
1567  * If an image has a non-zero parent overlap, get a reference to its
1568  * parent.
1569  *
1570  * Returns true if the rbd device has a parent with a non-zero
1571  * overlap and a reference for it was successfully taken, or
1572  * false otherwise.
1573  */
1574 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1575 {
1576 	int counter = 0;
1577 
1578 	if (!rbd_dev->parent_spec)
1579 		return false;
1580 
1581 	down_read(&rbd_dev->header_rwsem);
1582 	if (rbd_dev->parent_overlap)
1583 		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1584 	up_read(&rbd_dev->header_rwsem);
1585 
1586 	if (counter < 0)
1587 		rbd_warn(rbd_dev, "parent reference overflow");
1588 
1589 	return counter > 0;
1590 }
1591 
1592 /*
1593  * Caller is responsible for filling in the list of object requests
1594  * that comprises the image request, and the Linux request pointer
1595  * (if there is one).
1596  */
1597 static struct rbd_img_request *rbd_img_request_create(
1598 					struct rbd_device *rbd_dev,
1599 					enum obj_operation_type op_type,
1600 					struct ceph_snap_context *snapc)
1601 {
1602 	struct rbd_img_request *img_request;
1603 
1604 	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1605 	if (!img_request)
1606 		return NULL;
1607 
1608 	img_request->rbd_dev = rbd_dev;
1609 	img_request->op_type = op_type;
1610 	if (!rbd_img_is_write(img_request))
1611 		img_request->snap_id = rbd_dev->spec->snap_id;
1612 	else
1613 		img_request->snapc = snapc;
1614 
1615 	if (rbd_dev_parent_get(rbd_dev))
1616 		img_request_layered_set(img_request);
1617 
1618 	spin_lock_init(&img_request->completion_lock);
1619 	INIT_LIST_HEAD(&img_request->object_extents);
1620 	kref_init(&img_request->kref);
1621 
1622 	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1623 	     obj_op_name(op_type), img_request);
1624 	return img_request;
1625 }
1626 
1627 static void rbd_img_request_destroy(struct kref *kref)
1628 {
1629 	struct rbd_img_request *img_request;
1630 	struct rbd_obj_request *obj_request;
1631 	struct rbd_obj_request *next_obj_request;
1632 
1633 	img_request = container_of(kref, struct rbd_img_request, kref);
1634 
1635 	dout("%s: img %p\n", __func__, img_request);
1636 
1637 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1638 		rbd_img_obj_request_del(img_request, obj_request);
1639 	rbd_assert(img_request->obj_request_count == 0);
1640 
1641 	if (img_request_layered_test(img_request)) {
1642 		img_request_layered_clear(img_request);
1643 		rbd_dev_parent_put(img_request->rbd_dev);
1644 	}
1645 
1646 	if (rbd_img_is_write(img_request))
1647 		ceph_put_snap_context(img_request->snapc);
1648 
1649 	kmem_cache_free(rbd_img_request_cache, img_request);
1650 }
1651 
1652 static void prune_extents(struct ceph_file_extent *img_extents,
1653 			  u32 *num_img_extents, u64 overlap)
1654 {
1655 	u32 cnt = *num_img_extents;
1656 
1657 	/* drop extents completely beyond the overlap */
1658 	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
1659 		cnt--;
1660 
1661 	if (cnt) {
1662 		struct ceph_file_extent *ex = &img_extents[cnt - 1];
1663 
1664 		/* trim final overlapping extent */
1665 		if (ex->fe_off + ex->fe_len > overlap)
1666 			ex->fe_len = overlap - ex->fe_off;
1667 	}
1668 
1669 	*num_img_extents = cnt;
1670 }
1671 
1672 /*
1673  * Determine the byte range(s) covered by either just the object extent
1674  * or the entire object in the parent image.
1675  */
1676 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1677 				    bool entire)
1678 {
1679 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1680 	int ret;
1681 
1682 	if (!rbd_dev->parent_overlap)
1683 		return 0;
1684 
1685 	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
1686 				  entire ? 0 : obj_req->ex.oe_off,
1687 				  entire ? rbd_dev->layout.object_size :
1688 							obj_req->ex.oe_len,
1689 				  &obj_req->img_extents,
1690 				  &obj_req->num_img_extents);
1691 	if (ret)
1692 		return ret;
1693 
1694 	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1695 		      rbd_dev->parent_overlap);
1696 	return 0;
1697 }
1698 
1699 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1700 {
1701 	switch (obj_req->img_request->data_type) {
1702 	case OBJ_REQUEST_BIO:
1703 		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1704 					       &obj_req->bio_pos,
1705 					       obj_req->ex.oe_len);
1706 		break;
1707 	case OBJ_REQUEST_BVECS:
1708 	case OBJ_REQUEST_OWN_BVECS:
1709 		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
1710 							obj_req->ex.oe_len);
1711 		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1712 		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1713 						    &obj_req->bvec_pos);
1714 		break;
1715 	default:
1716 		rbd_assert(0);
1717 	}
1718 }
1719 
1720 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1721 {
1722 	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1723 	if (!obj_req->osd_req)
1724 		return -ENOMEM;
1725 
1726 	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1727 			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1728 	rbd_osd_req_setup_data(obj_req, 0);
1729 
1730 	rbd_osd_req_format_read(obj_req);
1731 	return 0;
1732 }
1733 
1734 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1735 				unsigned int which)
1736 {
1737 	struct page **pages;
1738 
1739 	/*
1740 	 * The response data for a STAT call consists of:
1741 	 *     le64 length;
1742 	 *     struct {
1743 	 *         le32 tv_sec;
1744 	 *         le32 tv_nsec;
1745 	 *     } mtime;
1746 	 */
1747 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
1748 	if (IS_ERR(pages))
1749 		return PTR_ERR(pages);
1750 
1751 	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1752 	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1753 				     8 + sizeof(struct ceph_timespec),
1754 				     0, false, true);
1755 	return 0;
1756 }
1757 
1758 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1759 				  unsigned int which)
1760 {
1761 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1762 	u16 opcode;
1763 
1764 	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1765 				   rbd_dev->layout.object_size,
1766 				   rbd_dev->layout.object_size);
1767 
1768 	if (rbd_obj_is_entire(obj_req))
1769 		opcode = CEPH_OSD_OP_WRITEFULL;
1770 	else
1771 		opcode = CEPH_OSD_OP_WRITE;
1772 
1773 	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
1774 			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1775 	rbd_osd_req_setup_data(obj_req, which++);
1776 
1777 	rbd_assert(which == obj_req->osd_req->r_num_ops);
1778 	rbd_osd_req_format_write(obj_req);
1779 }
1780 
1781 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1782 {
1783 	unsigned int num_osd_ops, which = 0;
1784 	int ret;
1785 
1786 	/* reverse map the entire object onto the parent */
1787 	ret = rbd_obj_calc_img_extents(obj_req, true);
1788 	if (ret)
1789 		return ret;
1790 
1791 	if (obj_req->num_img_extents) {
1792 		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1793 		num_osd_ops = 3; /* stat + setallochint + write/writefull */
1794 	} else {
1795 		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1796 		num_osd_ops = 2; /* setallochint + write/writefull */
1797 	}
1798 
1799 	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1800 	if (!obj_req->osd_req)
1801 		return -ENOMEM;
1802 
1803 	if (obj_req->num_img_extents) {
1804 		ret = __rbd_obj_setup_stat(obj_req, which++);
1805 		if (ret)
1806 			return ret;
1807 	}
1808 
1809 	__rbd_obj_setup_write(obj_req, which);
1810 	return 0;
1811 }
1812 
1813 static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1814 				    unsigned int which)
1815 {
1816 	u16 opcode;
1817 
1818 	if (rbd_obj_is_entire(obj_req)) {
1819 		if (obj_req->num_img_extents) {
1820 			osd_req_op_init(obj_req->osd_req, which++,
1821 					CEPH_OSD_OP_CREATE, 0);
1822 			opcode = CEPH_OSD_OP_TRUNCATE;
1823 		} else {
1824 			osd_req_op_init(obj_req->osd_req, which++,
1825 					CEPH_OSD_OP_DELETE, 0);
1826 			opcode = 0;
1827 		}
1828 	} else if (rbd_obj_is_tail(obj_req)) {
1829 		opcode = CEPH_OSD_OP_TRUNCATE;
1830 	} else {
1831 		opcode = CEPH_OSD_OP_ZERO;
1832 	}
1833 
1834 	if (opcode)
1835 		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
1836 				       obj_req->ex.oe_off, obj_req->ex.oe_len,
1837 				       0, 0);
1838 
1839 	rbd_assert(which == obj_req->osd_req->r_num_ops);
1840 	rbd_osd_req_format_write(obj_req);
1841 }
1842 
1843 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1844 {
1845 	unsigned int num_osd_ops, which = 0;
1846 	int ret;
1847 
1848 	/* reverse map the entire object onto the parent */
1849 	ret = rbd_obj_calc_img_extents(obj_req, true);
1850 	if (ret)
1851 		return ret;
1852 
1853 	if (rbd_obj_is_entire(obj_req)) {
1854 		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1855 		if (obj_req->num_img_extents)
1856 			num_osd_ops = 2; /* create + truncate */
1857 		else
1858 			num_osd_ops = 1; /* delete */
1859 	} else {
1860 		if (obj_req->num_img_extents) {
1861 			obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1862 			num_osd_ops = 2; /* stat + truncate/zero */
1863 		} else {
1864 			obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1865 			num_osd_ops = 1; /* truncate/zero */
1866 		}
1867 	}
1868 
1869 	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1870 	if (!obj_req->osd_req)
1871 		return -ENOMEM;
1872 
1873 	if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1874 		ret = __rbd_obj_setup_stat(obj_req, which++);
1875 		if (ret)
1876 			return ret;
1877 	}
1878 
1879 	__rbd_obj_setup_discard(obj_req, which);
1880 	return 0;
1881 }
1882 
1883 /*
1884  * For each object request in @img_req, allocate an OSD request, add
1885  * individual OSD ops and prepare them for submission.  The number of
1886  * OSD ops depends on op_type and the overlap point (if any).
1887  */
1888 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1889 {
1890 	struct rbd_obj_request *obj_req;
1891 	int ret;
1892 
1893 	for_each_obj_request(img_req, obj_req) {
1894 		switch (img_req->op_type) {
1895 		case OBJ_OP_READ:
1896 			ret = rbd_obj_setup_read(obj_req);
1897 			break;
1898 		case OBJ_OP_WRITE:
1899 			ret = rbd_obj_setup_write(obj_req);
1900 			break;
1901 		case OBJ_OP_DISCARD:
1902 			ret = rbd_obj_setup_discard(obj_req);
1903 			break;
1904 		default:
1905 			rbd_assert(0);
1906 		}
1907 		if (ret)
1908 			return ret;
1909 	}
1910 
1911 	return 0;
1912 }
1913 
/*
 * Position in the caller-provided data buffer while an image request
 * is being filled: either a bio iterator or a bio_vec array iterator.
 */
union rbd_img_fill_iter {
	struct ceph_bio_iter	bio_iter;
	struct ceph_bvec_iter	bvec_iter;
};
1918 
/*
 * Parameters for rbd_img_fill_request() and
 * rbd_img_fill_request_nocopy().
 */
struct rbd_img_fill_ctx {
	/* kind of buffer @pos refers to; becomes data_type on the nocopy path */
	enum obj_request_type	pos_type;
	/* caller's starting position; copied into @iter before each pass */
	union rbd_img_fill_iter	*pos;
	/* working iterator handed to the callbacks below as their arg */
	union rbd_img_fill_iter	iter;
	/* nocopy path: invoked through ceph_file_to_extents() */
	ceph_object_extent_fn_t	set_pos_fn;
	/* copy path, first pass: invoked through ceph_file_to_extents() */
	ceph_object_extent_fn_t	count_fn;
	/* copy path, second pass: invoked through ceph_iterate_extents() */
	ceph_object_extent_fn_t	copy_fn;
};
1927 
1928 static struct ceph_object_extent *alloc_object_extent(void *arg)
1929 {
1930 	struct rbd_img_request *img_req = arg;
1931 	struct rbd_obj_request *obj_req;
1932 
1933 	obj_req = rbd_obj_request_create();
1934 	if (!obj_req)
1935 		return NULL;
1936 
1937 	rbd_img_obj_request_add(img_req, obj_req);
1938 	return &obj_req->ex;
1939 }
1940 
1941 /*
1942  * While su != os && sc == 1 is technically not fancy (it's the same
1943  * layout as su == os && sc == 1), we can't use the nocopy path for it
1944  * because ->set_pos_fn() should be called only once per object.
1945  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1946  * treat su != os && sc == 1 as fancy.
1947  */
1948 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1949 {
1950 	return l->stripe_unit != l->object_size;
1951 }
1952 
1953 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1954 				       struct ceph_file_extent *img_extents,
1955 				       u32 num_img_extents,
1956 				       struct rbd_img_fill_ctx *fctx)
1957 {
1958 	u32 i;
1959 	int ret;
1960 
1961 	img_req->data_type = fctx->pos_type;
1962 
1963 	/*
1964 	 * Create object requests and set each object request's starting
1965 	 * position in the provided bio (list) or bio_vec array.
1966 	 */
1967 	fctx->iter = *fctx->pos;
1968 	for (i = 0; i < num_img_extents; i++) {
1969 		ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
1970 					   img_extents[i].fe_off,
1971 					   img_extents[i].fe_len,
1972 					   &img_req->object_extents,
1973 					   alloc_object_extent, img_req,
1974 					   fctx->set_pos_fn, &fctx->iter);
1975 		if (ret)
1976 			return ret;
1977 	}
1978 
1979 	return __rbd_img_fill_request(img_req);
1980 }
1981 
/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests (normally each to a different object,
 * but not always) and add them to @img_req.  For each object request,
 * set up its data descriptor to point to the corresponding chunk(s) of
 * @fctx->pos data buffer.
 *
 * Because ceph_file_to_extents() will merge adjacent object extents
 * together, each object request's data descriptor may point to multiple
 * different chunks of @fctx->pos data buffer.
 *
 * @fctx->pos data buffer is assumed to be large enough.
 *
 * Returns 0 on success, -ENOMEM if a private bio_vec array cannot be
 * allocated, or the first error reported by ceph_file_to_extents(),
 * ceph_iterate_extents() or __rbd_img_fill_request().  Nothing is
 * undone here on failure.
 */
static int rbd_img_fill_request(struct rbd_img_request *img_req,
				struct ceph_file_extent *img_extents,
				u32 num_img_extents,
				struct rbd_img_fill_ctx *fctx)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct rbd_obj_request *obj_req;
	u32 i;
	int ret;

	/* no data, or a non-fancy layout: no private bio_vec arrays needed */
	if (fctx->pos_type == OBJ_REQUEST_NODATA ||
	    !rbd_layout_is_fancy(&rbd_dev->layout))
		return rbd_img_fill_request_nocopy(img_req, img_extents,
						   num_img_extents, fctx);

	img_req->data_type = OBJ_REQUEST_OWN_BVECS;

	/*
	 * Create object requests and determine ->bvec_count for each object
	 * request.  Note that ->bvec_count sum over all object requests may
	 * be greater than the number of bio_vecs in the provided bio (list)
	 * or bio_vec array because when mapped, those bio_vecs can straddle
	 * stripe unit boundaries.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_file_to_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   alloc_object_extent, img_req,
					   fctx->count_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	/* allocate each object request's private array, sized by the count pass */
	for_each_obj_request(img_req, obj_req) {
		obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
					      sizeof(*obj_req->bvec_pos.bvecs),
					      GFP_NOIO);
		if (!obj_req->bvec_pos.bvecs)
			return -ENOMEM;
	}

	/*
	 * Fill in each object request's private bio_vec array, splitting and
	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
	 */
	fctx->iter = *fctx->pos;	/* rewind to the caller's start position */
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_iterate_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   fctx->copy_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	return __rbd_img_fill_request(img_req);
}
2056 
2057 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2058 			       u64 off, u64 len)
2059 {
2060 	struct ceph_file_extent ex = { off, len };
2061 	union rbd_img_fill_iter dummy;
2062 	struct rbd_img_fill_ctx fctx = {
2063 		.pos_type = OBJ_REQUEST_NODATA,
2064 		.pos = &dummy,
2065 	};
2066 
2067 	return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2068 }
2069 
2070 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2071 {
2072 	struct rbd_obj_request *obj_req =
2073 	    container_of(ex, struct rbd_obj_request, ex);
2074 	struct ceph_bio_iter *it = arg;
2075 
2076 	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2077 	obj_req->bio_pos = *it;
2078 	ceph_bio_iter_advance(it, bytes);
2079 }
2080 
2081 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2082 {
2083 	struct rbd_obj_request *obj_req =
2084 	    container_of(ex, struct rbd_obj_request, ex);
2085 	struct ceph_bio_iter *it = arg;
2086 
2087 	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2088 	ceph_bio_iter_advance_step(it, bytes, ({
2089 		obj_req->bvec_count++;
2090 	}));
2091 
2092 }
2093 
2094 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2095 {
2096 	struct rbd_obj_request *obj_req =
2097 	    container_of(ex, struct rbd_obj_request, ex);
2098 	struct ceph_bio_iter *it = arg;
2099 
2100 	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2101 	ceph_bio_iter_advance_step(it, bytes, ({
2102 		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2103 		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2104 	}));
2105 }
2106 
2107 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2108 				   struct ceph_file_extent *img_extents,
2109 				   u32 num_img_extents,
2110 				   struct ceph_bio_iter *bio_pos)
2111 {
2112 	struct rbd_img_fill_ctx fctx = {
2113 		.pos_type = OBJ_REQUEST_BIO,
2114 		.pos = (union rbd_img_fill_iter *)bio_pos,
2115 		.set_pos_fn = set_bio_pos,
2116 		.count_fn = count_bio_bvecs,
2117 		.copy_fn = copy_bio_bvecs,
2118 	};
2119 
2120 	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2121 				    &fctx);
2122 }
2123 
2124 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2125 				 u64 off, u64 len, struct bio *bio)
2126 {
2127 	struct ceph_file_extent ex = { off, len };
2128 	struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2129 
2130 	return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2131 }
2132 
2133 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2134 {
2135 	struct rbd_obj_request *obj_req =
2136 	    container_of(ex, struct rbd_obj_request, ex);
2137 	struct ceph_bvec_iter *it = arg;
2138 
2139 	obj_req->bvec_pos = *it;
2140 	ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2141 	ceph_bvec_iter_advance(it, bytes);
2142 }
2143 
2144 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2145 {
2146 	struct rbd_obj_request *obj_req =
2147 	    container_of(ex, struct rbd_obj_request, ex);
2148 	struct ceph_bvec_iter *it = arg;
2149 
2150 	ceph_bvec_iter_advance_step(it, bytes, ({
2151 		obj_req->bvec_count++;
2152 	}));
2153 }
2154 
2155 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2156 {
2157 	struct rbd_obj_request *obj_req =
2158 	    container_of(ex, struct rbd_obj_request, ex);
2159 	struct ceph_bvec_iter *it = arg;
2160 
2161 	ceph_bvec_iter_advance_step(it, bytes, ({
2162 		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2163 		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2164 	}));
2165 }
2166 
2167 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2168 				     struct ceph_file_extent *img_extents,
2169 				     u32 num_img_extents,
2170 				     struct ceph_bvec_iter *bvec_pos)
2171 {
2172 	struct rbd_img_fill_ctx fctx = {
2173 		.pos_type = OBJ_REQUEST_BVECS,
2174 		.pos = (union rbd_img_fill_iter *)bvec_pos,
2175 		.set_pos_fn = set_bvec_pos,
2176 		.count_fn = count_bvecs,
2177 		.copy_fn = copy_bvecs,
2178 	};
2179 
2180 	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2181 				    &fctx);
2182 }
2183 
2184 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2185 				   struct ceph_file_extent *img_extents,
2186 				   u32 num_img_extents,
2187 				   struct bio_vec *bvecs)
2188 {
2189 	struct ceph_bvec_iter it = {
2190 		.bvecs = bvecs,
2191 		.iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2192 							     num_img_extents) },
2193 	};
2194 
2195 	return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2196 					 &it);
2197 }
2198 
2199 static void rbd_img_request_submit(struct rbd_img_request *img_request)
2200 {
2201 	struct rbd_obj_request *obj_request;
2202 
2203 	dout("%s: img %p\n", __func__, img_request);
2204 
2205 	rbd_img_request_get(img_request);
2206 	for_each_obj_request(img_request, obj_request)
2207 		rbd_obj_request_submit(obj_request);
2208 
2209 	rbd_img_request_put(img_request);
2210 }
2211 
2212 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2213 {
2214 	struct rbd_img_request *img_req = obj_req->img_request;
2215 	struct rbd_img_request *child_img_req;
2216 	int ret;
2217 
2218 	child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2219 					       OBJ_OP_READ, NULL);
2220 	if (!child_img_req)
2221 		return -ENOMEM;
2222 
2223 	__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2224 	child_img_req->obj_request = obj_req;
2225 
2226 	if (!rbd_img_is_write(img_req)) {
2227 		switch (img_req->data_type) {
2228 		case OBJ_REQUEST_BIO:
2229 			ret = __rbd_img_fill_from_bio(child_img_req,
2230 						      obj_req->img_extents,
2231 						      obj_req->num_img_extents,
2232 						      &obj_req->bio_pos);
2233 			break;
2234 		case OBJ_REQUEST_BVECS:
2235 		case OBJ_REQUEST_OWN_BVECS:
2236 			ret = __rbd_img_fill_from_bvecs(child_img_req,
2237 						      obj_req->img_extents,
2238 						      obj_req->num_img_extents,
2239 						      &obj_req->bvec_pos);
2240 			break;
2241 		default:
2242 			rbd_assert(0);
2243 		}
2244 	} else {
2245 		ret = rbd_img_fill_from_bvecs(child_img_req,
2246 					      obj_req->img_extents,
2247 					      obj_req->num_img_extents,
2248 					      obj_req->copyup_bvecs);
2249 	}
2250 	if (ret) {
2251 		rbd_img_request_put(child_img_req);
2252 		return ret;
2253 	}
2254 
2255 	rbd_img_request_submit(child_img_req);
2256 	return 0;
2257 }
2258 
2259 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2260 {
2261 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2262 	int ret;
2263 
2264 	if (obj_req->result == -ENOENT &&
2265 	    rbd_dev->parent_overlap && !obj_req->tried_parent) {
2266 		/* reverse map this object extent onto the parent */
2267 		ret = rbd_obj_calc_img_extents(obj_req, false);
2268 		if (ret) {
2269 			obj_req->result = ret;
2270 			return true;
2271 		}
2272 
2273 		if (obj_req->num_img_extents) {
2274 			obj_req->tried_parent = true;
2275 			ret = rbd_obj_read_from_parent(obj_req);
2276 			if (ret) {
2277 				obj_req->result = ret;
2278 				return true;
2279 			}
2280 			return false;
2281 		}
2282 	}
2283 
2284 	/*
2285 	 * -ENOENT means a hole in the image -- zero-fill the entire
2286 	 * length of the request.  A short read also implies zero-fill
2287 	 * to the end of the request.  In both cases we update xferred
2288 	 * count to indicate the whole request was satisfied.
2289 	 */
2290 	if (obj_req->result == -ENOENT ||
2291 	    (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2292 		rbd_assert(!obj_req->xferred || !obj_req->result);
2293 		rbd_obj_zero_range(obj_req, obj_req->xferred,
2294 				   obj_req->ex.oe_len - obj_req->xferred);
2295 		obj_req->result = 0;
2296 		obj_req->xferred = obj_req->ex.oe_len;
2297 	}
2298 
2299 	return true;
2300 }
2301 
2302 /*
2303  * copyup_bvecs pages are never highmem pages
2304  */
2305 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2306 {
2307 	struct ceph_bvec_iter it = {
2308 		.bvecs = bvecs,
2309 		.iter = { .bi_size = bytes },
2310 	};
2311 
2312 	ceph_bvec_iter_advance_step(&it, bytes, ({
2313 		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2314 			       bv.bv_len))
2315 			return false;
2316 	}));
2317 	return true;
2318 }
2319 
2320 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2321 {
2322 	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2323 
2324 	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2325 	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2326 	rbd_osd_req_destroy(obj_req->osd_req);
2327 
2328 	/*
2329 	 * Create a copyup request with the same number of OSD ops as
2330 	 * the original request.  The original request was stat + op(s),
2331 	 * the new copyup request will be copyup + the same op(s).
2332 	 */
2333 	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2334 	if (!obj_req->osd_req)
2335 		return -ENOMEM;
2336 
2337 	/*
2338 	 * Only send non-zero copyup data to save some I/O and network
2339 	 * bandwidth -- zero copyup data is equivalent to the object not
2340 	 * existing.
2341 	 */
2342 	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2343 		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2344 		bytes = 0;
2345 	}
2346 
2347 	osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2348 			    "copyup");
2349 	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2350 					  obj_req->copyup_bvecs, bytes);
2351 
2352 	switch (obj_req->img_request->op_type) {
2353 	case OBJ_OP_WRITE:
2354 		__rbd_obj_setup_write(obj_req, 1);
2355 		break;
2356 	case OBJ_OP_DISCARD:
2357 		rbd_assert(!rbd_obj_is_entire(obj_req));
2358 		__rbd_obj_setup_discard(obj_req, 1);
2359 		break;
2360 	default:
2361 		rbd_assert(0);
2362 	}
2363 
2364 	rbd_obj_request_submit(obj_req);
2365 	return 0;
2366 }
2367 
2368 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2369 {
2370 	u32 i;
2371 
2372 	rbd_assert(!obj_req->copyup_bvecs);
2373 	obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2374 	obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2375 					sizeof(*obj_req->copyup_bvecs),
2376 					GFP_NOIO);
2377 	if (!obj_req->copyup_bvecs)
2378 		return -ENOMEM;
2379 
2380 	for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2381 		unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2382 
2383 		obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2384 		if (!obj_req->copyup_bvecs[i].bv_page)
2385 			return -ENOMEM;
2386 
2387 		obj_req->copyup_bvecs[i].bv_offset = 0;
2388 		obj_req->copyup_bvecs[i].bv_len = len;
2389 		obj_overlap -= len;
2390 	}
2391 
2392 	rbd_assert(!obj_overlap);
2393 	return 0;
2394 }
2395 
2396 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2397 {
2398 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2399 	int ret;
2400 
2401 	rbd_assert(obj_req->num_img_extents);
2402 	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2403 		      rbd_dev->parent_overlap);
2404 	if (!obj_req->num_img_extents) {
2405 		/*
2406 		 * The overlap has become 0 (most likely because the
2407 		 * image has been flattened).  Use rbd_obj_issue_copyup()
2408 		 * to re-submit the original write request -- the copyup
2409 		 * operation itself will be a no-op, since someone must
2410 		 * have populated the child object while we weren't
2411 		 * looking.  Move to WRITE_FLAT state as we'll be done
2412 		 * with the operation once the null copyup completes.
2413 		 */
2414 		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2415 		return rbd_obj_issue_copyup(obj_req, 0);
2416 	}
2417 
2418 	ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2419 	if (ret)
2420 		return ret;
2421 
2422 	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2423 	return rbd_obj_read_from_parent(obj_req);
2424 }
2425 
2426 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2427 {
2428 	int ret;
2429 
2430 again:
2431 	switch (obj_req->write_state) {
2432 	case RBD_OBJ_WRITE_GUARD:
2433 		rbd_assert(!obj_req->xferred);
2434 		if (obj_req->result == -ENOENT) {
2435 			/*
2436 			 * The target object doesn't exist.  Read the data for
2437 			 * the entire target object up to the overlap point (if
2438 			 * any) from the parent, so we can use it for a copyup.
2439 			 */
2440 			ret = rbd_obj_handle_write_guard(obj_req);
2441 			if (ret) {
2442 				obj_req->result = ret;
2443 				return true;
2444 			}
2445 			return false;
2446 		}
2447 		/* fall through */
2448 	case RBD_OBJ_WRITE_FLAT:
2449 		if (!obj_req->result)
2450 			/*
2451 			 * There is no such thing as a successful short
2452 			 * write -- indicate the whole request was satisfied.
2453 			 */
2454 			obj_req->xferred = obj_req->ex.oe_len;
2455 		return true;
2456 	case RBD_OBJ_WRITE_COPYUP:
2457 		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2458 		if (obj_req->result)
2459 			goto again;
2460 
2461 		rbd_assert(obj_req->xferred);
2462 		ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2463 		if (ret) {
2464 			obj_req->result = ret;
2465 			return true;
2466 		}
2467 		return false;
2468 	default:
2469 		rbd_assert(0);
2470 	}
2471 }
2472 
2473 /*
2474  * Returns true if @obj_req is completed, or false otherwise.
2475  */
2476 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2477 {
2478 	switch (obj_req->img_request->op_type) {
2479 	case OBJ_OP_READ:
2480 		return rbd_obj_handle_read(obj_req);
2481 	case OBJ_OP_WRITE:
2482 		return rbd_obj_handle_write(obj_req);
2483 	case OBJ_OP_DISCARD:
2484 		if (rbd_obj_handle_write(obj_req)) {
2485 			/*
2486 			 * Hide -ENOENT from delete/truncate/zero -- discarding
2487 			 * a non-existent object is not a problem.
2488 			 */
2489 			if (obj_req->result == -ENOENT) {
2490 				obj_req->result = 0;
2491 				obj_req->xferred = obj_req->ex.oe_len;
2492 			}
2493 			return true;
2494 		}
2495 		return false;
2496 	default:
2497 		rbd_assert(0);
2498 	}
2499 }
2500 
2501 static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2502 {
2503 	struct rbd_img_request *img_req = obj_req->img_request;
2504 
2505 	rbd_assert((!obj_req->result &&
2506 		    obj_req->xferred == obj_req->ex.oe_len) ||
2507 		   (obj_req->result < 0 && !obj_req->xferred));
2508 	if (!obj_req->result) {
2509 		img_req->xferred += obj_req->xferred;
2510 		return;
2511 	}
2512 
2513 	rbd_warn(img_req->rbd_dev,
2514 		 "%s at objno %llu %llu~%llu result %d xferred %llu",
2515 		 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2516 		 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2517 		 obj_req->xferred);
2518 	if (!img_req->result) {
2519 		img_req->result = obj_req->result;
2520 		img_req->xferred = 0;
2521 	}
2522 }
2523 
2524 static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2525 {
2526 	struct rbd_obj_request *obj_req = img_req->obj_request;
2527 
2528 	rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2529 	rbd_assert((!img_req->result &&
2530 		    img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2531 		   (img_req->result < 0 && !img_req->xferred));
2532 
2533 	obj_req->result = img_req->result;
2534 	obj_req->xferred = img_req->xferred;
2535 	rbd_img_request_put(img_req);
2536 }
2537 
2538 static void rbd_img_end_request(struct rbd_img_request *img_req)
2539 {
2540 	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2541 	rbd_assert((!img_req->result &&
2542 		    img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2543 		   (img_req->result < 0 && !img_req->xferred));
2544 
2545 	blk_mq_end_request(img_req->rq,
2546 			   errno_to_blk_status(img_req->result));
2547 	rbd_img_request_put(img_req);
2548 }
2549 
2550 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2551 {
2552 	struct rbd_img_request *img_req;
2553 
2554 again:
2555 	if (!__rbd_obj_handle_request(obj_req))
2556 		return;
2557 
2558 	img_req = obj_req->img_request;
2559 	spin_lock(&img_req->completion_lock);
2560 	rbd_obj_end_request(obj_req);
2561 	rbd_assert(img_req->pending_count);
2562 	if (--img_req->pending_count) {
2563 		spin_unlock(&img_req->completion_lock);
2564 		return;
2565 	}
2566 
2567 	spin_unlock(&img_req->completion_lock);
2568 	if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2569 		obj_req = img_req->obj_request;
2570 		rbd_img_end_child_request(img_req);
2571 		goto again;
2572 	}
2573 	rbd_img_end_request(img_req);
2574 }
2575 
/*
 * Zero-initialized client id (static storage, no explicit initializer).
 * Used as the "no owner" value: rbd_unlock() stores it as owner_cid and
 * the notification handlers compare incoming ids against it.
 */
static const struct rbd_client_id rbd_empty_cid;
2577 
2578 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2579 			  const struct rbd_client_id *rhs)
2580 {
2581 	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2582 }
2583 
2584 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2585 {
2586 	struct rbd_client_id cid;
2587 
2588 	mutex_lock(&rbd_dev->watch_mutex);
2589 	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2590 	cid.handle = rbd_dev->watch_cookie;
2591 	mutex_unlock(&rbd_dev->watch_mutex);
2592 	return cid;
2593 }
2594 
2595 /*
2596  * lock_rwsem must be held for write
2597  */
2598 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2599 			      const struct rbd_client_id *cid)
2600 {
2601 	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2602 	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2603 	     cid->gid, cid->handle);
2604 	rbd_dev->owner_cid = *cid; /* struct */
2605 }
2606 
2607 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2608 {
2609 	mutex_lock(&rbd_dev->watch_mutex);
2610 	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2611 	mutex_unlock(&rbd_dev->watch_mutex);
2612 }
2613 
2614 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2615 {
2616 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2617 
2618 	strcpy(rbd_dev->lock_cookie, cookie);
2619 	rbd_set_owner_cid(rbd_dev, &cid);
2620 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2621 }
2622 
2623 /*
2624  * lock_rwsem must be held for write
2625  */
2626 static int rbd_lock(struct rbd_device *rbd_dev)
2627 {
2628 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2629 	char cookie[32];
2630 	int ret;
2631 
2632 	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2633 		rbd_dev->lock_cookie[0] != '\0');
2634 
2635 	format_lock_cookie(rbd_dev, cookie);
2636 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2637 			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2638 			    RBD_LOCK_TAG, "", 0);
2639 	if (ret)
2640 		return ret;
2641 
2642 	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
2643 	__rbd_lock(rbd_dev, cookie);
2644 	return 0;
2645 }
2646 
2647 /*
2648  * lock_rwsem must be held for write
2649  */
2650 static void rbd_unlock(struct rbd_device *rbd_dev)
2651 {
2652 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2653 	int ret;
2654 
2655 	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2656 		rbd_dev->lock_cookie[0] == '\0');
2657 
2658 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2659 			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
2660 	if (ret && ret != -ENOENT)
2661 		rbd_warn(rbd_dev, "failed to unlock: %d", ret);
2662 
2663 	/* treat errors as the image is unlocked */
2664 	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
2665 	rbd_dev->lock_cookie[0] = '\0';
2666 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2667 	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2668 }
2669 
2670 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2671 				enum rbd_notify_op notify_op,
2672 				struct page ***preply_pages,
2673 				size_t *preply_len)
2674 {
2675 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2676 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2677 	char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2678 	int buf_size = sizeof(buf);
2679 	void *p = buf;
2680 
2681 	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
2682 
2683 	/* encode *LockPayload NotifyMessage (op + ClientId) */
2684 	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2685 	ceph_encode_32(&p, notify_op);
2686 	ceph_encode_64(&p, cid.gid);
2687 	ceph_encode_64(&p, cid.handle);
2688 
2689 	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2690 				&rbd_dev->header_oloc, buf, buf_size,
2691 				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
2692 }
2693 
2694 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2695 			       enum rbd_notify_op notify_op)
2696 {
2697 	struct page **reply_pages;
2698 	size_t reply_len;
2699 
2700 	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2701 	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2702 }
2703 
2704 static void rbd_notify_acquired_lock(struct work_struct *work)
2705 {
2706 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2707 						  acquired_lock_work);
2708 
2709 	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2710 }
2711 
2712 static void rbd_notify_released_lock(struct work_struct *work)
2713 {
2714 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2715 						  released_lock_work);
2716 
2717 	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2718 }
2719 
/*
 * Ask the current lock owner to release the lock by sending an
 * RBD_NOTIFY_OP_REQUEST_LOCK notification, then parse the acks.
 *
 * Returns:
 *   the 32-bit result decoded from the owner's ResponseMessage, if
 *     exactly one ack carried a payload;
 *   -ETIMEDOUT if no ack carried a payload (no owner responded);
 *   -EIO if more than one ack carried a payload;
 *   -EINVAL if the reply could not be decoded;
 *   the notify error, if it failed with anything but -ETIMEDOUT.
 *
 * The reply pages are released on every path.
 */
static int rbd_request_lock(struct rbd_device *rbd_dev)
{
	struct page **reply_pages;
	size_t reply_len;
	bool lock_owner_responded = false;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
				   &reply_pages, &reply_len);
	/* a notify timeout is not fatal -- the reply is still examined */
	if (ret && ret != -ETIMEDOUT) {
		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
		goto out;
	}

	/* only replies that fit in the first page are parsed */
	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
		void *p = page_address(reply_pages[0]);
		void *const end = p + reply_len;
		u32 n;

		/* the *_safe/need decode macros jump to e_inval on underrun */
		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
		while (n--) {
			u8 struct_v;
			u32 len;

			ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8; /* skip gid and cookie */

			/* payload length; an empty payload is a plain ack */
			ceph_decode_32_safe(&p, end, len, e_inval);
			if (!len)
				continue;

			/* at most one watcher may answer with a payload */
			if (lock_owner_responded) {
				rbd_warn(rbd_dev,
					 "duplicate lock owners detected");
				ret = -EIO;
				goto out;
			}

			lock_owner_responded = true;
			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
						  &struct_v, &len);
			if (ret) {
				rbd_warn(rbd_dev,
					 "failed to decode ResponseMessage: %d",
					 ret);
				goto e_inval;
			}

			/*
			 * The owner's result becomes our return value.
			 * NOTE(review): decoded without a length check of
			 * its own -- confirm ceph_start_decoding()
			 * guarantees at least 4 bytes remain.
			 */
			ret = ceph_decode_32(&p);
		}
	}

	if (!lock_owner_responded) {
		rbd_warn(rbd_dev, "no lock owners detected");
		ret = -ETIMEDOUT;
	}

out:
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}
2787 
2788 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2789 {
2790 	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2791 
2792 	cancel_delayed_work(&rbd_dev->lock_dwork);
2793 	if (wake_all)
2794 		wake_up_all(&rbd_dev->lock_waitq);
2795 	else
2796 		wake_up(&rbd_dev->lock_waitq);
2797 }
2798 
2799 static int get_lock_owner_info(struct rbd_device *rbd_dev,
2800 			       struct ceph_locker **lockers, u32 *num_lockers)
2801 {
2802 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2803 	u8 lock_type;
2804 	char *lock_tag;
2805 	int ret;
2806 
2807 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
2808 
2809 	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2810 				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2811 				 &lock_type, &lock_tag, lockers, num_lockers);
2812 	if (ret)
2813 		return ret;
2814 
2815 	if (*num_lockers == 0) {
2816 		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2817 		goto out;
2818 	}
2819 
2820 	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2821 		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2822 			 lock_tag);
2823 		ret = -EBUSY;
2824 		goto out;
2825 	}
2826 
2827 	if (lock_type == CEPH_CLS_LOCK_SHARED) {
2828 		rbd_warn(rbd_dev, "shared lock type detected");
2829 		ret = -EBUSY;
2830 		goto out;
2831 	}
2832 
2833 	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2834 		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
2835 		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2836 			 (*lockers)[0].id.cookie);
2837 		ret = -EBUSY;
2838 		goto out;
2839 	}
2840 
2841 out:
2842 	kfree(lock_tag);
2843 	return ret;
2844 }
2845 
2846 static int find_watcher(struct rbd_device *rbd_dev,
2847 			const struct ceph_locker *locker)
2848 {
2849 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2850 	struct ceph_watch_item *watchers;
2851 	u32 num_watchers;
2852 	u64 cookie;
2853 	int i;
2854 	int ret;
2855 
2856 	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2857 				      &rbd_dev->header_oloc, &watchers,
2858 				      &num_watchers);
2859 	if (ret)
2860 		return ret;
2861 
2862 	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2863 	for (i = 0; i < num_watchers; i++) {
2864 		if (!memcmp(&watchers[i].addr, &locker->info.addr,
2865 			    sizeof(locker->info.addr)) &&
2866 		    watchers[i].cookie == cookie) {
2867 			struct rbd_client_id cid = {
2868 				.gid = le64_to_cpu(watchers[i].name.num),
2869 				.handle = cookie,
2870 			};
2871 
2872 			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2873 			     rbd_dev, cid.gid, cid.handle);
2874 			rbd_set_owner_cid(rbd_dev, &cid);
2875 			ret = 1;
2876 			goto out;
2877 		}
2878 	}
2879 
2880 	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2881 	ret = 0;
2882 out:
2883 	kfree(watchers);
2884 	return ret;
2885 }
2886 
2887 /*
2888  * lock_rwsem must be held for write
2889  */
2890 static int rbd_try_lock(struct rbd_device *rbd_dev)
2891 {
2892 	struct ceph_client *client = rbd_dev->rbd_client->client;
2893 	struct ceph_locker *lockers;
2894 	u32 num_lockers;
2895 	int ret;
2896 
2897 	for (;;) {
2898 		ret = rbd_lock(rbd_dev);
2899 		if (ret != -EBUSY)
2900 			return ret;
2901 
2902 		/* determine if the current lock holder is still alive */
2903 		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2904 		if (ret)
2905 			return ret;
2906 
2907 		if (num_lockers == 0)
2908 			goto again;
2909 
2910 		ret = find_watcher(rbd_dev, lockers);
2911 		if (ret) {
2912 			if (ret > 0)
2913 				ret = 0; /* have to request lock */
2914 			goto out;
2915 		}
2916 
2917 		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2918 			 ENTITY_NAME(lockers[0].id.name));
2919 
2920 		ret = ceph_monc_blacklist_add(&client->monc,
2921 					      &lockers[0].info.addr);
2922 		if (ret) {
2923 			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2924 				 ENTITY_NAME(lockers[0].id.name), ret);
2925 			goto out;
2926 		}
2927 
2928 		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2929 					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
2930 					  lockers[0].id.cookie,
2931 					  &lockers[0].id.name);
2932 		if (ret && ret != -ENOENT)
2933 			goto out;
2934 
2935 again:
2936 		ceph_free_lockers(lockers, num_lockers);
2937 	}
2938 
2939 out:
2940 	ceph_free_lockers(lockers, num_lockers);
2941 	return ret;
2942 }
2943 
2944 /*
2945  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2946  */
2947 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2948 						int *pret)
2949 {
2950 	enum rbd_lock_state lock_state;
2951 
2952 	down_read(&rbd_dev->lock_rwsem);
2953 	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
2954 	     rbd_dev->lock_state);
2955 	if (__rbd_is_lock_owner(rbd_dev)) {
2956 		lock_state = rbd_dev->lock_state;
2957 		up_read(&rbd_dev->lock_rwsem);
2958 		return lock_state;
2959 	}
2960 
2961 	up_read(&rbd_dev->lock_rwsem);
2962 	down_write(&rbd_dev->lock_rwsem);
2963 	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
2964 	     rbd_dev->lock_state);
2965 	if (!__rbd_is_lock_owner(rbd_dev)) {
2966 		*pret = rbd_try_lock(rbd_dev);
2967 		if (*pret)
2968 			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
2969 	}
2970 
2971 	lock_state = rbd_dev->lock_state;
2972 	up_write(&rbd_dev->lock_rwsem);
2973 	return lock_state;
2974 }
2975 
2976 static void rbd_acquire_lock(struct work_struct *work)
2977 {
2978 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
2979 					    struct rbd_device, lock_dwork);
2980 	enum rbd_lock_state lock_state;
2981 	int ret = 0;
2982 
2983 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
2984 again:
2985 	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
2986 	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
2987 		if (lock_state == RBD_LOCK_STATE_LOCKED)
2988 			wake_requests(rbd_dev, true);
2989 		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
2990 		     rbd_dev, lock_state, ret);
2991 		return;
2992 	}
2993 
2994 	ret = rbd_request_lock(rbd_dev);
2995 	if (ret == -ETIMEDOUT) {
2996 		goto again; /* treat this as a dead client */
2997 	} else if (ret == -EROFS) {
2998 		rbd_warn(rbd_dev, "peer will not release lock");
2999 		/*
3000 		 * If this is rbd_add_acquire_lock(), we want to fail
3001 		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
3002 		 * want to block.
3003 		 */
3004 		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3005 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3006 			/* wake "rbd map --exclusive" process */
3007 			wake_requests(rbd_dev, false);
3008 		}
3009 	} else if (ret < 0) {
3010 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3011 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3012 				 RBD_RETRY_DELAY);
3013 	} else {
3014 		/*
3015 		 * lock owner acked, but resend if we don't see them
3016 		 * release the lock
3017 		 */
3018 		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3019 		     rbd_dev);
3020 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3021 		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3022 	}
3023 }
3024 
/*
 * Release the exclusive lock if we hold it.
 *
 * Returns true if the lock was released via rbd_unlock(), false if the
 * lock state was not LOCKED on entry or was no longer RELEASING after
 * the in-flight I/O had been flushed.
 *
 * lock_rwsem must be held for write.  It is temporarily downgraded and
 * dropped while flushing, but is held for write again on every return
 * path.
 */
static bool rbd_release_lock(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
		return false;

	/* mark the release as in progress before giving up the write lock */
	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
	downgrade_write(&rbd_dev->lock_rwsem);
	/*
	 * Ensure that all in-flight IO is flushed.
	 *
	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
	 * may be shared with other devices.
	 */
	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
	up_read(&rbd_dev->lock_rwsem);

	/* the rwsem was fully dropped above, so recheck the state */
	down_write(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
		return false;

	rbd_unlock(rbd_dev);
	/*
	 * Give others a chance to grab the lock - we would re-acquire
	 * almost immediately if we got new IO during ceph_osdc_sync()
	 * otherwise.  We need to ack our own notifications, so this
	 * lock_dwork will be requeued from rbd_wait_state_locked()
	 * after wake_requests() in rbd_handle_released_lock().
	 */
	cancel_delayed_work(&rbd_dev->lock_dwork);
	return true;
}
3063 
3064 static void rbd_release_lock_work(struct work_struct *work)
3065 {
3066 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3067 						  unlock_work);
3068 
3069 	down_write(&rbd_dev->lock_rwsem);
3070 	rbd_release_lock(rbd_dev);
3071 	up_write(&rbd_dev->lock_rwsem);
3072 }
3073 
3074 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3075 				     void **p)
3076 {
3077 	struct rbd_client_id cid = { 0 };
3078 
3079 	if (struct_v >= 2) {
3080 		cid.gid = ceph_decode_64(p);
3081 		cid.handle = ceph_decode_64(p);
3082 	}
3083 
3084 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3085 	     cid.handle);
3086 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3087 		down_write(&rbd_dev->lock_rwsem);
3088 		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3089 			/*
3090 			 * we already know that the remote client is
3091 			 * the owner
3092 			 */
3093 			up_write(&rbd_dev->lock_rwsem);
3094 			return;
3095 		}
3096 
3097 		rbd_set_owner_cid(rbd_dev, &cid);
3098 		downgrade_write(&rbd_dev->lock_rwsem);
3099 	} else {
3100 		down_read(&rbd_dev->lock_rwsem);
3101 	}
3102 
3103 	if (!__rbd_is_lock_owner(rbd_dev))
3104 		wake_requests(rbd_dev, false);
3105 	up_read(&rbd_dev->lock_rwsem);
3106 }
3107 
3108 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3109 				     void **p)
3110 {
3111 	struct rbd_client_id cid = { 0 };
3112 
3113 	if (struct_v >= 2) {
3114 		cid.gid = ceph_decode_64(p);
3115 		cid.handle = ceph_decode_64(p);
3116 	}
3117 
3118 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3119 	     cid.handle);
3120 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3121 		down_write(&rbd_dev->lock_rwsem);
3122 		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3123 			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3124 			     __func__, rbd_dev, cid.gid, cid.handle,
3125 			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3126 			up_write(&rbd_dev->lock_rwsem);
3127 			return;
3128 		}
3129 
3130 		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3131 		downgrade_write(&rbd_dev->lock_rwsem);
3132 	} else {
3133 		down_read(&rbd_dev->lock_rwsem);
3134 	}
3135 
3136 	if (!__rbd_is_lock_owner(rbd_dev))
3137 		wake_requests(rbd_dev, false);
3138 	up_read(&rbd_dev->lock_rwsem);
3139 }
3140 
3141 /*
3142  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3143  * ResponseMessage is needed.
3144  */
3145 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3146 				   void **p)
3147 {
3148 	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3149 	struct rbd_client_id cid = { 0 };
3150 	int result = 1;
3151 
3152 	if (struct_v >= 2) {
3153 		cid.gid = ceph_decode_64(p);
3154 		cid.handle = ceph_decode_64(p);
3155 	}
3156 
3157 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3158 	     cid.handle);
3159 	if (rbd_cid_equal(&cid, &my_cid))
3160 		return result;
3161 
3162 	down_read(&rbd_dev->lock_rwsem);
3163 	if (__rbd_is_lock_owner(rbd_dev)) {
3164 		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3165 		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3166 			goto out_unlock;
3167 
3168 		/*
3169 		 * encode ResponseMessage(0) so the peer can detect
3170 		 * a missing owner
3171 		 */
3172 		result = 0;
3173 
3174 		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3175 			if (!rbd_dev->opts->exclusive) {
3176 				dout("%s rbd_dev %p queueing unlock_work\n",
3177 				     __func__, rbd_dev);
3178 				queue_work(rbd_dev->task_wq,
3179 					   &rbd_dev->unlock_work);
3180 			} else {
3181 				/* refuse to release the lock */
3182 				result = -EROFS;
3183 			}
3184 		}
3185 	}
3186 
3187 out_unlock:
3188 	up_read(&rbd_dev->lock_rwsem);
3189 	return result;
3190 }
3191 
3192 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3193 				     u64 notify_id, u64 cookie, s32 *result)
3194 {
3195 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3196 	char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3197 	int buf_size = sizeof(buf);
3198 	int ret;
3199 
3200 	if (result) {
3201 		void *p = buf;
3202 
3203 		/* encode ResponseMessage */
3204 		ceph_start_encoding(&p, 1, 1,
3205 				    buf_size - CEPH_ENCODING_START_BLK_LEN);
3206 		ceph_encode_32(&p, *result);
3207 	} else {
3208 		buf_size = 0;
3209 	}
3210 
3211 	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3212 				   &rbd_dev->header_oloc, notify_id, cookie,
3213 				   buf, buf_size);
3214 	if (ret)
3215 		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3216 }
3217 
3218 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3219 				   u64 cookie)
3220 {
3221 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3222 	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3223 }
3224 
3225 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3226 					  u64 notify_id, u64 cookie, s32 result)
3227 {
3228 	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3229 	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3230 }
3231 
3232 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3233 			 u64 notifier_id, void *data, size_t data_len)
3234 {
3235 	struct rbd_device *rbd_dev = arg;
3236 	void *p = data;
3237 	void *const end = p + data_len;
3238 	u8 struct_v = 0;
3239 	u32 len;
3240 	u32 notify_op;
3241 	int ret;
3242 
3243 	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3244 	     __func__, rbd_dev, cookie, notify_id, data_len);
3245 	if (data_len) {
3246 		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3247 					  &struct_v, &len);
3248 		if (ret) {
3249 			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3250 				 ret);
3251 			return;
3252 		}
3253 
3254 		notify_op = ceph_decode_32(&p);
3255 	} else {
3256 		/* legacy notification for header updates */
3257 		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3258 		len = 0;
3259 	}
3260 
3261 	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3262 	switch (notify_op) {
3263 	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3264 		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3265 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3266 		break;
3267 	case RBD_NOTIFY_OP_RELEASED_LOCK:
3268 		rbd_handle_released_lock(rbd_dev, struct_v, &p);
3269 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3270 		break;
3271 	case RBD_NOTIFY_OP_REQUEST_LOCK:
3272 		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3273 		if (ret <= 0)
3274 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
3275 						      cookie, ret);
3276 		else
3277 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3278 		break;
3279 	case RBD_NOTIFY_OP_HEADER_UPDATE:
3280 		ret = rbd_dev_refresh(rbd_dev);
3281 		if (ret)
3282 			rbd_warn(rbd_dev, "refresh failed: %d", ret);
3283 
3284 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3285 		break;
3286 	default:
3287 		if (rbd_is_lock_owner(rbd_dev))
3288 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
3289 						      cookie, -EOPNOTSUPP);
3290 		else
3291 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3292 		break;
3293 	}
3294 }
3295 
3296 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3297 
3298 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3299 {
3300 	struct rbd_device *rbd_dev = arg;
3301 
3302 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
3303 
3304 	down_write(&rbd_dev->lock_rwsem);
3305 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3306 	up_write(&rbd_dev->lock_rwsem);
3307 
3308 	mutex_lock(&rbd_dev->watch_mutex);
3309 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3310 		__rbd_unregister_watch(rbd_dev);
3311 		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3312 
3313 		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3314 	}
3315 	mutex_unlock(&rbd_dev->watch_mutex);
3316 }
3317 
3318 /*
3319  * watch_mutex must be locked
3320  */
3321 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3322 {
3323 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3324 	struct ceph_osd_linger_request *handle;
3325 
3326 	rbd_assert(!rbd_dev->watch_handle);
3327 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3328 
3329 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3330 				 &rbd_dev->header_oloc, rbd_watch_cb,
3331 				 rbd_watch_errcb, rbd_dev);
3332 	if (IS_ERR(handle))
3333 		return PTR_ERR(handle);
3334 
3335 	rbd_dev->watch_handle = handle;
3336 	return 0;
3337 }
3338 
3339 /*
3340  * watch_mutex must be locked
3341  */
3342 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3343 {
3344 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3345 	int ret;
3346 
3347 	rbd_assert(rbd_dev->watch_handle);
3348 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3349 
3350 	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3351 	if (ret)
3352 		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3353 
3354 	rbd_dev->watch_handle = NULL;
3355 }
3356 
3357 static int rbd_register_watch(struct rbd_device *rbd_dev)
3358 {
3359 	int ret;
3360 
3361 	mutex_lock(&rbd_dev->watch_mutex);
3362 	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3363 	ret = __rbd_register_watch(rbd_dev);
3364 	if (ret)
3365 		goto out;
3366 
3367 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3368 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3369 
3370 out:
3371 	mutex_unlock(&rbd_dev->watch_mutex);
3372 	return ret;
3373 }
3374 
3375 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3376 {
3377 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3378 
3379 	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3380 	cancel_work_sync(&rbd_dev->acquired_lock_work);
3381 	cancel_work_sync(&rbd_dev->released_lock_work);
3382 	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3383 	cancel_work_sync(&rbd_dev->unlock_work);
3384 }
3385 
3386 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3387 {
3388 	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3389 	cancel_tasks_sync(rbd_dev);
3390 
3391 	mutex_lock(&rbd_dev->watch_mutex);
3392 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3393 		__rbd_unregister_watch(rbd_dev);
3394 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3395 	mutex_unlock(&rbd_dev->watch_mutex);
3396 
3397 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3398 }
3399 
3400 /*
3401  * lock_rwsem must be held for write
3402  */
3403 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3404 {
3405 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3406 	char cookie[32];
3407 	int ret;
3408 
3409 	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3410 
3411 	format_lock_cookie(rbd_dev, cookie);
3412 	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3413 				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3414 				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3415 				  RBD_LOCK_TAG, cookie);
3416 	if (ret) {
3417 		if (ret != -EOPNOTSUPP)
3418 			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3419 				 ret);
3420 
3421 		/*
3422 		 * Lock cookie cannot be updated on older OSDs, so do
3423 		 * a manual release and queue an acquire.
3424 		 */
3425 		if (rbd_release_lock(rbd_dev))
3426 			queue_delayed_work(rbd_dev->task_wq,
3427 					   &rbd_dev->lock_dwork, 0);
3428 	} else {
3429 		__rbd_lock(rbd_dev, cookie);
3430 	}
3431 }
3432 
3433 static void rbd_reregister_watch(struct work_struct *work)
3434 {
3435 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 					    struct rbd_device, watch_dwork);
3437 	int ret;
3438 
3439 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440 
3441 	mutex_lock(&rbd_dev->watch_mutex);
3442 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3443 		mutex_unlock(&rbd_dev->watch_mutex);
3444 		return;
3445 	}
3446 
3447 	ret = __rbd_register_watch(rbd_dev);
3448 	if (ret) {
3449 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3450 		if (ret == -EBLACKLISTED || ret == -ENOENT) {
3451 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3452 			wake_requests(rbd_dev, true);
3453 		} else {
3454 			queue_delayed_work(rbd_dev->task_wq,
3455 					   &rbd_dev->watch_dwork,
3456 					   RBD_RETRY_DELAY);
3457 		}
3458 		mutex_unlock(&rbd_dev->watch_mutex);
3459 		return;
3460 	}
3461 
3462 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3463 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3464 	mutex_unlock(&rbd_dev->watch_mutex);
3465 
3466 	down_write(&rbd_dev->lock_rwsem);
3467 	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3468 		rbd_reacquire_lock(rbd_dev);
3469 	up_write(&rbd_dev->lock_rwsem);
3470 
3471 	ret = rbd_dev_refresh(rbd_dev);
3472 	if (ret)
3473 		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3474 }
3475 
3476 /*
3477  * Synchronous osd object method call.  Returns the number of bytes
3478  * returned in the outbound buffer, or a negative error code.
3479  */
3480 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3481 			     struct ceph_object_id *oid,
3482 			     struct ceph_object_locator *oloc,
3483 			     const char *method_name,
3484 			     const void *outbound,
3485 			     size_t outbound_size,
3486 			     void *inbound,
3487 			     size_t inbound_size)
3488 {
3489 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3490 	struct page *req_page = NULL;
3491 	struct page *reply_page;
3492 	int ret;
3493 
3494 	/*
3495 	 * Method calls are ultimately read operations.  The result
3496 	 * should placed into the inbound buffer provided.  They
3497 	 * also supply outbound data--parameters for the object
3498 	 * method.  Currently if this is present it will be a
3499 	 * snapshot id.
3500 	 */
3501 	if (outbound) {
3502 		if (outbound_size > PAGE_SIZE)
3503 			return -E2BIG;
3504 
3505 		req_page = alloc_page(GFP_KERNEL);
3506 		if (!req_page)
3507 			return -ENOMEM;
3508 
3509 		memcpy(page_address(req_page), outbound, outbound_size);
3510 	}
3511 
3512 	reply_page = alloc_page(GFP_KERNEL);
3513 	if (!reply_page) {
3514 		if (req_page)
3515 			__free_page(req_page);
3516 		return -ENOMEM;
3517 	}
3518 
3519 	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3520 			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
3521 			     reply_page, &inbound_size);
3522 	if (!ret) {
3523 		memcpy(inbound, page_address(reply_page), inbound_size);
3524 		ret = inbound_size;
3525 	}
3526 
3527 	if (req_page)
3528 		__free_page(req_page);
3529 	__free_page(reply_page);
3530 	return ret;
3531 }
3532 
3533 /*
3534  * lock_rwsem must be held for read
3535  */
3536 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3537 {
3538 	DEFINE_WAIT(wait);
3539 
3540 	do {
3541 		/*
3542 		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3543 		 * and cancel_delayed_work() in wake_requests().
3544 		 */
3545 		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3546 		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3547 		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3548 					  TASK_UNINTERRUPTIBLE);
3549 		up_read(&rbd_dev->lock_rwsem);
3550 		schedule();
3551 		down_read(&rbd_dev->lock_rwsem);
3552 	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3553 		 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3554 
3555 	finish_wait(&rbd_dev->lock_waitq, &wait);
3556 }
3557 
3558 static void rbd_queue_workfn(struct work_struct *work)
3559 {
3560 	struct request *rq = blk_mq_rq_from_pdu(work);
3561 	struct rbd_device *rbd_dev = rq->q->queuedata;
3562 	struct rbd_img_request *img_request;
3563 	struct ceph_snap_context *snapc = NULL;
3564 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3565 	u64 length = blk_rq_bytes(rq);
3566 	enum obj_operation_type op_type;
3567 	u64 mapping_size;
3568 	bool must_be_locked;
3569 	int result;
3570 
3571 	switch (req_op(rq)) {
3572 	case REQ_OP_DISCARD:
3573 	case REQ_OP_WRITE_ZEROES:
3574 		op_type = OBJ_OP_DISCARD;
3575 		break;
3576 	case REQ_OP_WRITE:
3577 		op_type = OBJ_OP_WRITE;
3578 		break;
3579 	case REQ_OP_READ:
3580 		op_type = OBJ_OP_READ;
3581 		break;
3582 	default:
3583 		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3584 		result = -EIO;
3585 		goto err;
3586 	}
3587 
3588 	/* Ignore/skip any zero-length requests */
3589 
3590 	if (!length) {
3591 		dout("%s: zero-length request\n", __func__);
3592 		result = 0;
3593 		goto err_rq;
3594 	}
3595 
3596 	rbd_assert(op_type == OBJ_OP_READ ||
3597 		   rbd_dev->spec->snap_id == CEPH_NOSNAP);
3598 
3599 	/*
3600 	 * Quit early if the mapped snapshot no longer exists.  It's
3601 	 * still possible the snapshot will have disappeared by the
3602 	 * time our request arrives at the osd, but there's no sense in
3603 	 * sending it if we already know.
3604 	 */
3605 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3606 		dout("request for non-existent snapshot");
3607 		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3608 		result = -ENXIO;
3609 		goto err_rq;
3610 	}
3611 
3612 	if (offset && length > U64_MAX - offset + 1) {
3613 		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3614 			 length);
3615 		result = -EINVAL;
3616 		goto err_rq;	/* Shouldn't happen */
3617 	}
3618 
3619 	blk_mq_start_request(rq);
3620 
3621 	down_read(&rbd_dev->header_rwsem);
3622 	mapping_size = rbd_dev->mapping.size;
3623 	if (op_type != OBJ_OP_READ) {
3624 		snapc = rbd_dev->header.snapc;
3625 		ceph_get_snap_context(snapc);
3626 	}
3627 	up_read(&rbd_dev->header_rwsem);
3628 
3629 	if (offset + length > mapping_size) {
3630 		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3631 			 length, mapping_size);
3632 		result = -EIO;
3633 		goto err_rq;
3634 	}
3635 
3636 	must_be_locked =
3637 	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3638 	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3639 	if (must_be_locked) {
3640 		down_read(&rbd_dev->lock_rwsem);
3641 		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3642 		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3643 			if (rbd_dev->opts->exclusive) {
3644 				rbd_warn(rbd_dev, "exclusive lock required");
3645 				result = -EROFS;
3646 				goto err_unlock;
3647 			}
3648 			rbd_wait_state_locked(rbd_dev);
3649 		}
3650 		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3651 			result = -EBLACKLISTED;
3652 			goto err_unlock;
3653 		}
3654 	}
3655 
3656 	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3657 	if (!img_request) {
3658 		result = -ENOMEM;
3659 		goto err_unlock;
3660 	}
3661 	img_request->rq = rq;
3662 	snapc = NULL; /* img_request consumes a ref */
3663 
3664 	if (op_type == OBJ_OP_DISCARD)
3665 		result = rbd_img_fill_nodata(img_request, offset, length);
3666 	else
3667 		result = rbd_img_fill_from_bio(img_request, offset, length,
3668 					       rq->bio);
3669 	if (result)
3670 		goto err_img_request;
3671 
3672 	rbd_img_request_submit(img_request);
3673 	if (must_be_locked)
3674 		up_read(&rbd_dev->lock_rwsem);
3675 	return;
3676 
3677 err_img_request:
3678 	rbd_img_request_put(img_request);
3679 err_unlock:
3680 	if (must_be_locked)
3681 		up_read(&rbd_dev->lock_rwsem);
3682 err_rq:
3683 	if (result)
3684 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3685 			 obj_op_name(op_type), length, offset, result);
3686 	ceph_put_snap_context(snapc);
3687 err:
3688 	blk_mq_end_request(rq, errno_to_blk_status(result));
3689 }
3690 
3691 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3692 		const struct blk_mq_queue_data *bd)
3693 {
3694 	struct request *rq = bd->rq;
3695 	struct work_struct *work = blk_mq_rq_to_pdu(rq);
3696 
3697 	queue_work(rbd_wq, work);
3698 	return BLK_STS_OK;
3699 }
3700 
3701 static void rbd_free_disk(struct rbd_device *rbd_dev)
3702 {
3703 	blk_cleanup_queue(rbd_dev->disk->queue);
3704 	blk_mq_free_tag_set(&rbd_dev->tag_set);
3705 	put_disk(rbd_dev->disk);
3706 	rbd_dev->disk = NULL;
3707 }
3708 
3709 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3710 			     struct ceph_object_id *oid,
3711 			     struct ceph_object_locator *oloc,
3712 			     void *buf, int buf_len)
3713 
3714 {
3715 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3716 	struct ceph_osd_request *req;
3717 	struct page **pages;
3718 	int num_pages = calc_pages_for(0, buf_len);
3719 	int ret;
3720 
3721 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3722 	if (!req)
3723 		return -ENOMEM;
3724 
3725 	ceph_oid_copy(&req->r_base_oid, oid);
3726 	ceph_oloc_copy(&req->r_base_oloc, oloc);
3727 	req->r_flags = CEPH_OSD_FLAG_READ;
3728 
3729 	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3730 	if (ret)
3731 		goto out_req;
3732 
3733 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3734 	if (IS_ERR(pages)) {
3735 		ret = PTR_ERR(pages);
3736 		goto out_req;
3737 	}
3738 
3739 	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3740 	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3741 					 true);
3742 
3743 	ceph_osdc_start_request(osdc, req, false);
3744 	ret = ceph_osdc_wait_request(osdc, req);
3745 	if (ret >= 0)
3746 		ceph_copy_from_page_vector(pages, buf, 0, ret);
3747 
3748 out_req:
3749 	ceph_osdc_put_request(req);
3750 	return ret;
3751 }
3752 
3753 /*
3754  * Read the complete header for the given rbd device.  On successful
3755  * return, the rbd_dev->header field will contain up-to-date
3756  * information about the image.
3757  */
3758 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3759 {
3760 	struct rbd_image_header_ondisk *ondisk = NULL;
3761 	u32 snap_count = 0;
3762 	u64 names_size = 0;
3763 	u32 want_count;
3764 	int ret;
3765 
3766 	/*
3767 	 * The complete header will include an array of its 64-bit
3768 	 * snapshot ids, followed by the names of those snapshots as
3769 	 * a contiguous block of NUL-terminated strings.  Note that
3770 	 * the number of snapshots could change by the time we read
3771 	 * it in, in which case we re-read it.
3772 	 */
3773 	do {
3774 		size_t size;
3775 
3776 		kfree(ondisk);
3777 
3778 		size = sizeof (*ondisk);
3779 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3780 		size += names_size;
3781 		ondisk = kmalloc(size, GFP_KERNEL);
3782 		if (!ondisk)
3783 			return -ENOMEM;
3784 
3785 		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3786 					&rbd_dev->header_oloc, ondisk, size);
3787 		if (ret < 0)
3788 			goto out;
3789 		if ((size_t)ret < size) {
3790 			ret = -ENXIO;
3791 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3792 				size, ret);
3793 			goto out;
3794 		}
3795 		if (!rbd_dev_ondisk_valid(ondisk)) {
3796 			ret = -ENXIO;
3797 			rbd_warn(rbd_dev, "invalid header");
3798 			goto out;
3799 		}
3800 
3801 		names_size = le64_to_cpu(ondisk->snap_names_len);
3802 		want_count = snap_count;
3803 		snap_count = le32_to_cpu(ondisk->snap_count);
3804 	} while (snap_count != want_count);
3805 
3806 	ret = rbd_header_from_disk(rbd_dev, ondisk);
3807 out:
3808 	kfree(ondisk);
3809 
3810 	return ret;
3811 }
3812 
3813 /*
3814  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3815  * has disappeared from the (just updated) snapshot context.
3816  */
3817 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3818 {
3819 	u64 snap_id;
3820 
3821 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3822 		return;
3823 
3824 	snap_id = rbd_dev->spec->snap_id;
3825 	if (snap_id == CEPH_NOSNAP)
3826 		return;
3827 
3828 	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3829 		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3830 }
3831 
3832 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3833 {
3834 	sector_t size;
3835 
3836 	/*
3837 	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3838 	 * try to update its size.  If REMOVING is set, updating size
3839 	 * is just useless work since the device can't be opened.
3840 	 */
3841 	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3842 	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3843 		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3844 		dout("setting size to %llu sectors", (unsigned long long)size);
3845 		set_capacity(rbd_dev->disk, size);
3846 		revalidate_disk(rbd_dev->disk);
3847 	}
3848 }
3849 
3850 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3851 {
3852 	u64 mapping_size;
3853 	int ret;
3854 
3855 	down_write(&rbd_dev->header_rwsem);
3856 	mapping_size = rbd_dev->mapping.size;
3857 
3858 	ret = rbd_dev_header_info(rbd_dev);
3859 	if (ret)
3860 		goto out;
3861 
3862 	/*
3863 	 * If there is a parent, see if it has disappeared due to the
3864 	 * mapped image getting flattened.
3865 	 */
3866 	if (rbd_dev->parent) {
3867 		ret = rbd_dev_v2_parent_info(rbd_dev);
3868 		if (ret)
3869 			goto out;
3870 	}
3871 
3872 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3873 		rbd_dev->mapping.size = rbd_dev->header.image_size;
3874 	} else {
3875 		/* validate mapped snapshot's EXISTS flag */
3876 		rbd_exists_validate(rbd_dev);
3877 	}
3878 
3879 out:
3880 	up_write(&rbd_dev->header_rwsem);
3881 	if (!ret && mapping_size != rbd_dev->mapping.size)
3882 		rbd_dev_update_size(rbd_dev);
3883 
3884 	return ret;
3885 }
3886 
3887 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3888 		unsigned int hctx_idx, unsigned int numa_node)
3889 {
3890 	struct work_struct *work = blk_mq_rq_to_pdu(rq);
3891 
3892 	INIT_WORK(work, rbd_queue_workfn);
3893 	return 0;
3894 }
3895 
3896 static const struct blk_mq_ops rbd_mq_ops = {
3897 	.queue_rq	= rbd_queue_rq,
3898 	.init_request	= rbd_init_request,
3899 };
3900 
3901 static int rbd_init_disk(struct rbd_device *rbd_dev)
3902 {
3903 	struct gendisk *disk;
3904 	struct request_queue *q;
3905 	u64 segment_size;
3906 	int err;
3907 
3908 	/* create gendisk info */
3909 	disk = alloc_disk(single_major ?
3910 			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3911 			  RBD_MINORS_PER_MAJOR);
3912 	if (!disk)
3913 		return -ENOMEM;
3914 
3915 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3916 		 rbd_dev->dev_id);
3917 	disk->major = rbd_dev->major;
3918 	disk->first_minor = rbd_dev->minor;
3919 	if (single_major)
3920 		disk->flags |= GENHD_FL_EXT_DEVT;
3921 	disk->fops = &rbd_bd_ops;
3922 	disk->private_data = rbd_dev;
3923 
3924 	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3925 	rbd_dev->tag_set.ops = &rbd_mq_ops;
3926 	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
3927 	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3928 	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3929 	rbd_dev->tag_set.nr_hw_queues = 1;
3930 	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3931 
3932 	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3933 	if (err)
3934 		goto out_disk;
3935 
3936 	q = blk_mq_init_queue(&rbd_dev->tag_set);
3937 	if (IS_ERR(q)) {
3938 		err = PTR_ERR(q);
3939 		goto out_tag_set;
3940 	}
3941 
3942 	blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
3943 	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
3944 
3945 	/* set io sizes to object size */
3946 	segment_size = rbd_obj_bytes(&rbd_dev->header);
3947 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3948 	q->limits.max_sectors = queue_max_hw_sectors(q);
3949 	blk_queue_max_segments(q, USHRT_MAX);
3950 	blk_queue_max_segment_size(q, UINT_MAX);
3951 	blk_queue_io_min(q, segment_size);
3952 	blk_queue_io_opt(q, segment_size);
3953 
3954 	/* enable the discard support */
3955 	blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
3956 	q->limits.discard_granularity = segment_size;
3957 	blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
3958 	blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
3959 
3960 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
3961 		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
3962 
3963 	/*
3964 	 * disk_release() expects a queue ref from add_disk() and will
3965 	 * put it.  Hold an extra ref until add_disk() is called.
3966 	 */
3967 	WARN_ON(!blk_get_queue(q));
3968 	disk->queue = q;
3969 	q->queuedata = rbd_dev;
3970 
3971 	rbd_dev->disk = disk;
3972 
3973 	return 0;
3974 out_tag_set:
3975 	blk_mq_free_tag_set(&rbd_dev->tag_set);
3976 out_disk:
3977 	put_disk(disk);
3978 	return err;
3979 }
3980 
3981 /*
3982   sysfs
3983 */
3984 
3985 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3986 {
3987 	return container_of(dev, struct rbd_device, dev);
3988 }
3989 
3990 static ssize_t rbd_size_show(struct device *dev,
3991 			     struct device_attribute *attr, char *buf)
3992 {
3993 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3994 
3995 	return sprintf(buf, "%llu\n",
3996 		(unsigned long long)rbd_dev->mapping.size);
3997 }
3998 
3999 /*
4000  * Note this shows the features for whatever's mapped, which is not
4001  * necessarily the base image.
4002  */
4003 static ssize_t rbd_features_show(struct device *dev,
4004 			     struct device_attribute *attr, char *buf)
4005 {
4006 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4007 
4008 	return sprintf(buf, "0x%016llx\n",
4009 			(unsigned long long)rbd_dev->mapping.features);
4010 }
4011 
4012 static ssize_t rbd_major_show(struct device *dev,
4013 			      struct device_attribute *attr, char *buf)
4014 {
4015 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4016 
4017 	if (rbd_dev->major)
4018 		return sprintf(buf, "%d\n", rbd_dev->major);
4019 
4020 	return sprintf(buf, "(none)\n");
4021 }
4022 
4023 static ssize_t rbd_minor_show(struct device *dev,
4024 			      struct device_attribute *attr, char *buf)
4025 {
4026 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4027 
4028 	return sprintf(buf, "%d\n", rbd_dev->minor);
4029 }
4030 
4031 static ssize_t rbd_client_addr_show(struct device *dev,
4032 				    struct device_attribute *attr, char *buf)
4033 {
4034 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4035 	struct ceph_entity_addr *client_addr =
4036 	    ceph_client_addr(rbd_dev->rbd_client->client);
4037 
4038 	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4039 		       le32_to_cpu(client_addr->nonce));
4040 }
4041 
4042 static ssize_t rbd_client_id_show(struct device *dev,
4043 				  struct device_attribute *attr, char *buf)
4044 {
4045 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4046 
4047 	return sprintf(buf, "client%lld\n",
4048 		       ceph_client_gid(rbd_dev->rbd_client->client));
4049 }
4050 
4051 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4052 				     struct device_attribute *attr, char *buf)
4053 {
4054 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4055 
4056 	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4057 }
4058 
4059 static ssize_t rbd_config_info_show(struct device *dev,
4060 				    struct device_attribute *attr, char *buf)
4061 {
4062 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4063 
4064 	return sprintf(buf, "%s\n", rbd_dev->config_info);
4065 }
4066 
4067 static ssize_t rbd_pool_show(struct device *dev,
4068 			     struct device_attribute *attr, char *buf)
4069 {
4070 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4071 
4072 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4073 }
4074 
4075 static ssize_t rbd_pool_id_show(struct device *dev,
4076 			     struct device_attribute *attr, char *buf)
4077 {
4078 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4079 
4080 	return sprintf(buf, "%llu\n",
4081 			(unsigned long long) rbd_dev->spec->pool_id);
4082 }
4083 
4084 static ssize_t rbd_name_show(struct device *dev,
4085 			     struct device_attribute *attr, char *buf)
4086 {
4087 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4088 
4089 	if (rbd_dev->spec->image_name)
4090 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4091 
4092 	return sprintf(buf, "(unknown)\n");
4093 }
4094 
4095 static ssize_t rbd_image_id_show(struct device *dev,
4096 			     struct device_attribute *attr, char *buf)
4097 {
4098 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4099 
4100 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4101 }
4102 
4103 /*
4104  * Shows the name of the currently-mapped snapshot (or
4105  * RBD_SNAP_HEAD_NAME for the base image).
4106  */
4107 static ssize_t rbd_snap_show(struct device *dev,
4108 			     struct device_attribute *attr,
4109 			     char *buf)
4110 {
4111 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4112 
4113 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4114 }
4115 
4116 static ssize_t rbd_snap_id_show(struct device *dev,
4117 				struct device_attribute *attr, char *buf)
4118 {
4119 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4120 
4121 	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4122 }
4123 
4124 /*
4125  * For a v2 image, shows the chain of parent images, separated by empty
4126  * lines.  For v1 images or if there is no parent, shows "(no parent
4127  * image)".
4128  */
4129 static ssize_t rbd_parent_show(struct device *dev,
4130 			       struct device_attribute *attr,
4131 			       char *buf)
4132 {
4133 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4134 	ssize_t count = 0;
4135 
4136 	if (!rbd_dev->parent)
4137 		return sprintf(buf, "(no parent image)\n");
4138 
4139 	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4140 		struct rbd_spec *spec = rbd_dev->parent_spec;
4141 
4142 		count += sprintf(&buf[count], "%s"
4143 			    "pool_id %llu\npool_name %s\n"
4144 			    "image_id %s\nimage_name %s\n"
4145 			    "snap_id %llu\nsnap_name %s\n"
4146 			    "overlap %llu\n",
4147 			    !count ? "" : "\n", /* first? */
4148 			    spec->pool_id, spec->pool_name,
4149 			    spec->image_id, spec->image_name ?: "(unknown)",
4150 			    spec->snap_id, spec->snap_name,
4151 			    rbd_dev->parent_overlap);
4152 	}
4153 
4154 	return count;
4155 }
4156 
4157 static ssize_t rbd_image_refresh(struct device *dev,
4158 				 struct device_attribute *attr,
4159 				 const char *buf,
4160 				 size_t size)
4161 {
4162 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4163 	int ret;
4164 
4165 	ret = rbd_dev_refresh(rbd_dev);
4166 	if (ret)
4167 		return ret;
4168 
4169 	return size;
4170 }
4171 
4172 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4173 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4174 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4175 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4176 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4177 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4178 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4179 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4180 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4181 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4182 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4183 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4184 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4185 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4186 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4187 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4188 
4189 static struct attribute *rbd_attrs[] = {
4190 	&dev_attr_size.attr,
4191 	&dev_attr_features.attr,
4192 	&dev_attr_major.attr,
4193 	&dev_attr_minor.attr,
4194 	&dev_attr_client_addr.attr,
4195 	&dev_attr_client_id.attr,
4196 	&dev_attr_cluster_fsid.attr,
4197 	&dev_attr_config_info.attr,
4198 	&dev_attr_pool.attr,
4199 	&dev_attr_pool_id.attr,
4200 	&dev_attr_name.attr,
4201 	&dev_attr_image_id.attr,
4202 	&dev_attr_current_snap.attr,
4203 	&dev_attr_snap_id.attr,
4204 	&dev_attr_parent.attr,
4205 	&dev_attr_refresh.attr,
4206 	NULL
4207 };
4208 
4209 static struct attribute_group rbd_attr_group = {
4210 	.attrs = rbd_attrs,
4211 };
4212 
4213 static const struct attribute_group *rbd_attr_groups[] = {
4214 	&rbd_attr_group,
4215 	NULL
4216 };
4217 
4218 static void rbd_dev_release(struct device *dev);
4219 
4220 static const struct device_type rbd_device_type = {
4221 	.name		= "rbd",
4222 	.groups		= rbd_attr_groups,
4223 	.release	= rbd_dev_release,
4224 };
4225 
4226 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4227 {
4228 	kref_get(&spec->kref);
4229 
4230 	return spec;
4231 }
4232 
4233 static void rbd_spec_free(struct kref *kref);
4234 static void rbd_spec_put(struct rbd_spec *spec)
4235 {
4236 	if (spec)
4237 		kref_put(&spec->kref, rbd_spec_free);
4238 }
4239 
4240 static struct rbd_spec *rbd_spec_alloc(void)
4241 {
4242 	struct rbd_spec *spec;
4243 
4244 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4245 	if (!spec)
4246 		return NULL;
4247 
4248 	spec->pool_id = CEPH_NOPOOL;
4249 	spec->snap_id = CEPH_NOSNAP;
4250 	kref_init(&spec->kref);
4251 
4252 	return spec;
4253 }
4254 
4255 static void rbd_spec_free(struct kref *kref)
4256 {
4257 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4258 
4259 	kfree(spec->pool_name);
4260 	kfree(spec->image_id);
4261 	kfree(spec->image_name);
4262 	kfree(spec->snap_name);
4263 	kfree(spec);
4264 }
4265 
4266 static void rbd_dev_free(struct rbd_device *rbd_dev)
4267 {
4268 	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4269 	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4270 
4271 	ceph_oid_destroy(&rbd_dev->header_oid);
4272 	ceph_oloc_destroy(&rbd_dev->header_oloc);
4273 	kfree(rbd_dev->config_info);
4274 
4275 	rbd_put_client(rbd_dev->rbd_client);
4276 	rbd_spec_put(rbd_dev->spec);
4277 	kfree(rbd_dev->opts);
4278 	kfree(rbd_dev);
4279 }
4280 
4281 static void rbd_dev_release(struct device *dev)
4282 {
4283 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4284 	bool need_put = !!rbd_dev->opts;
4285 
4286 	if (need_put) {
4287 		destroy_workqueue(rbd_dev->task_wq);
4288 		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4289 	}
4290 
4291 	rbd_dev_free(rbd_dev);
4292 
4293 	/*
4294 	 * This is racy, but way better than putting module outside of
4295 	 * the release callback.  The race window is pretty small, so
4296 	 * doing something similar to dm (dm-builtin.c) is overkill.
4297 	 */
4298 	if (need_put)
4299 		module_put(THIS_MODULE);
4300 }
4301 
4302 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4303 					   struct rbd_spec *spec)
4304 {
4305 	struct rbd_device *rbd_dev;
4306 
4307 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4308 	if (!rbd_dev)
4309 		return NULL;
4310 
4311 	spin_lock_init(&rbd_dev->lock);
4312 	INIT_LIST_HEAD(&rbd_dev->node);
4313 	init_rwsem(&rbd_dev->header_rwsem);
4314 
4315 	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4316 	ceph_oid_init(&rbd_dev->header_oid);
4317 	rbd_dev->header_oloc.pool = spec->pool_id;
4318 
4319 	mutex_init(&rbd_dev->watch_mutex);
4320 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4321 	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4322 
4323 	init_rwsem(&rbd_dev->lock_rwsem);
4324 	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4325 	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4326 	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4327 	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4328 	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4329 	init_waitqueue_head(&rbd_dev->lock_waitq);
4330 
4331 	rbd_dev->dev.bus = &rbd_bus_type;
4332 	rbd_dev->dev.type = &rbd_device_type;
4333 	rbd_dev->dev.parent = &rbd_root_dev;
4334 	device_initialize(&rbd_dev->dev);
4335 
4336 	rbd_dev->rbd_client = rbdc;
4337 	rbd_dev->spec = spec;
4338 
4339 	return rbd_dev;
4340 }
4341 
4342 /*
4343  * Create a mapping rbd_dev.
4344  */
4345 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4346 					 struct rbd_spec *spec,
4347 					 struct rbd_options *opts)
4348 {
4349 	struct rbd_device *rbd_dev;
4350 
4351 	rbd_dev = __rbd_dev_create(rbdc, spec);
4352 	if (!rbd_dev)
4353 		return NULL;
4354 
4355 	rbd_dev->opts = opts;
4356 
4357 	/* get an id and fill in device name */
4358 	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4359 					 minor_to_rbd_dev_id(1 << MINORBITS),
4360 					 GFP_KERNEL);
4361 	if (rbd_dev->dev_id < 0)
4362 		goto fail_rbd_dev;
4363 
4364 	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4365 	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4366 						   rbd_dev->name);
4367 	if (!rbd_dev->task_wq)
4368 		goto fail_dev_id;
4369 
4370 	/* we have a ref from do_rbd_add() */
4371 	__module_get(THIS_MODULE);
4372 
4373 	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4374 	return rbd_dev;
4375 
4376 fail_dev_id:
4377 	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4378 fail_rbd_dev:
4379 	rbd_dev_free(rbd_dev);
4380 	return NULL;
4381 }
4382 
4383 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4384 {
4385 	if (rbd_dev)
4386 		put_device(&rbd_dev->dev);
4387 }
4388 
4389 /*
4390  * Get the size and object order for an image snapshot, or if
4391  * snap_id is CEPH_NOSNAP, gets this information for the base
4392  * image.
4393  */
4394 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4395 				u8 *order, u64 *snap_size)
4396 {
4397 	__le64 snapid = cpu_to_le64(snap_id);
4398 	int ret;
4399 	struct {
4400 		u8 order;
4401 		__le64 size;
4402 	} __attribute__ ((packed)) size_buf = { 0 };
4403 
4404 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4405 				  &rbd_dev->header_oloc, "get_size",
4406 				  &snapid, sizeof(snapid),
4407 				  &size_buf, sizeof(size_buf));
4408 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4409 	if (ret < 0)
4410 		return ret;
4411 	if (ret < sizeof (size_buf))
4412 		return -ERANGE;
4413 
4414 	if (order) {
4415 		*order = size_buf.order;
4416 		dout("  order %u", (unsigned int)*order);
4417 	}
4418 	*snap_size = le64_to_cpu(size_buf.size);
4419 
4420 	dout("  snap_id 0x%016llx snap_size = %llu\n",
4421 		(unsigned long long)snap_id,
4422 		(unsigned long long)*snap_size);
4423 
4424 	return 0;
4425 }
4426 
4427 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4428 {
4429 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4430 					&rbd_dev->header.obj_order,
4431 					&rbd_dev->header.image_size);
4432 }
4433 
4434 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4435 {
4436 	void *reply_buf;
4437 	int ret;
4438 	void *p;
4439 
4440 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4441 	if (!reply_buf)
4442 		return -ENOMEM;
4443 
4444 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4445 				  &rbd_dev->header_oloc, "get_object_prefix",
4446 				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4447 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4448 	if (ret < 0)
4449 		goto out;
4450 
4451 	p = reply_buf;
4452 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4453 						p + ret, NULL, GFP_NOIO);
4454 	ret = 0;
4455 
4456 	if (IS_ERR(rbd_dev->header.object_prefix)) {
4457 		ret = PTR_ERR(rbd_dev->header.object_prefix);
4458 		rbd_dev->header.object_prefix = NULL;
4459 	} else {
4460 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4461 	}
4462 out:
4463 	kfree(reply_buf);
4464 
4465 	return ret;
4466 }
4467 
4468 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4469 		u64 *snap_features)
4470 {
4471 	__le64 snapid = cpu_to_le64(snap_id);
4472 	struct {
4473 		__le64 features;
4474 		__le64 incompat;
4475 	} __attribute__ ((packed)) features_buf = { 0 };
4476 	u64 unsup;
4477 	int ret;
4478 
4479 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4480 				  &rbd_dev->header_oloc, "get_features",
4481 				  &snapid, sizeof(snapid),
4482 				  &features_buf, sizeof(features_buf));
4483 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4484 	if (ret < 0)
4485 		return ret;
4486 	if (ret < sizeof (features_buf))
4487 		return -ERANGE;
4488 
4489 	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4490 	if (unsup) {
4491 		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4492 			 unsup);
4493 		return -ENXIO;
4494 	}
4495 
4496 	*snap_features = le64_to_cpu(features_buf.features);
4497 
4498 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4499 		(unsigned long long)snap_id,
4500 		(unsigned long long)*snap_features,
4501 		(unsigned long long)le64_to_cpu(features_buf.incompat));
4502 
4503 	return 0;
4504 }
4505 
4506 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4507 {
4508 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4509 						&rbd_dev->header.features);
4510 }
4511 
4512 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4513 {
4514 	struct rbd_spec *parent_spec;
4515 	size_t size;
4516 	void *reply_buf = NULL;
4517 	__le64 snapid;
4518 	void *p;
4519 	void *end;
4520 	u64 pool_id;
4521 	char *image_id;
4522 	u64 snap_id;
4523 	u64 overlap;
4524 	int ret;
4525 
4526 	parent_spec = rbd_spec_alloc();
4527 	if (!parent_spec)
4528 		return -ENOMEM;
4529 
4530 	size = sizeof (__le64) +				/* pool_id */
4531 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
4532 		sizeof (__le64) +				/* snap_id */
4533 		sizeof (__le64);				/* overlap */
4534 	reply_buf = kmalloc(size, GFP_KERNEL);
4535 	if (!reply_buf) {
4536 		ret = -ENOMEM;
4537 		goto out_err;
4538 	}
4539 
4540 	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4541 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4542 				  &rbd_dev->header_oloc, "get_parent",
4543 				  &snapid, sizeof(snapid), reply_buf, size);
4544 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4545 	if (ret < 0)
4546 		goto out_err;
4547 
4548 	p = reply_buf;
4549 	end = reply_buf + ret;
4550 	ret = -ERANGE;
4551 	ceph_decode_64_safe(&p, end, pool_id, out_err);
4552 	if (pool_id == CEPH_NOPOOL) {
4553 		/*
4554 		 * Either the parent never existed, or we have
4555 		 * record of it but the image got flattened so it no
4556 		 * longer has a parent.  When the parent of a
4557 		 * layered image disappears we immediately set the
4558 		 * overlap to 0.  The effect of this is that all new
4559 		 * requests will be treated as if the image had no
4560 		 * parent.
4561 		 */
4562 		if (rbd_dev->parent_overlap) {
4563 			rbd_dev->parent_overlap = 0;
4564 			rbd_dev_parent_put(rbd_dev);
4565 			pr_info("%s: clone image has been flattened\n",
4566 				rbd_dev->disk->disk_name);
4567 		}
4568 
4569 		goto out;	/* No parent?  No problem. */
4570 	}
4571 
4572 	/* The ceph file layout needs to fit pool id in 32 bits */
4573 
4574 	ret = -EIO;
4575 	if (pool_id > (u64)U32_MAX) {
4576 		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4577 			(unsigned long long)pool_id, U32_MAX);
4578 		goto out_err;
4579 	}
4580 
4581 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4582 	if (IS_ERR(image_id)) {
4583 		ret = PTR_ERR(image_id);
4584 		goto out_err;
4585 	}
4586 	ceph_decode_64_safe(&p, end, snap_id, out_err);
4587 	ceph_decode_64_safe(&p, end, overlap, out_err);
4588 
4589 	/*
4590 	 * The parent won't change (except when the clone is
4591 	 * flattened, already handled that).  So we only need to
4592 	 * record the parent spec we have not already done so.
4593 	 */
4594 	if (!rbd_dev->parent_spec) {
4595 		parent_spec->pool_id = pool_id;
4596 		parent_spec->image_id = image_id;
4597 		parent_spec->snap_id = snap_id;
4598 		rbd_dev->parent_spec = parent_spec;
4599 		parent_spec = NULL;	/* rbd_dev now owns this */
4600 	} else {
4601 		kfree(image_id);
4602 	}
4603 
4604 	/*
4605 	 * We always update the parent overlap.  If it's zero we issue
4606 	 * a warning, as we will proceed as if there was no parent.
4607 	 */
4608 	if (!overlap) {
4609 		if (parent_spec) {
4610 			/* refresh, careful to warn just once */
4611 			if (rbd_dev->parent_overlap)
4612 				rbd_warn(rbd_dev,
4613 				    "clone now standalone (overlap became 0)");
4614 		} else {
4615 			/* initial probe */
4616 			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
4617 		}
4618 	}
4619 	rbd_dev->parent_overlap = overlap;
4620 
4621 out:
4622 	ret = 0;
4623 out_err:
4624 	kfree(reply_buf);
4625 	rbd_spec_put(parent_spec);
4626 
4627 	return ret;
4628 }
4629 
4630 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4631 {
4632 	struct {
4633 		__le64 stripe_unit;
4634 		__le64 stripe_count;
4635 	} __attribute__ ((packed)) striping_info_buf = { 0 };
4636 	size_t size = sizeof (striping_info_buf);
4637 	void *p;
4638 	int ret;
4639 
4640 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4641 				&rbd_dev->header_oloc, "get_stripe_unit_count",
4642 				NULL, 0, &striping_info_buf, size);
4643 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4644 	if (ret < 0)
4645 		return ret;
4646 	if (ret < size)
4647 		return -ERANGE;
4648 
4649 	p = &striping_info_buf;
4650 	rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4651 	rbd_dev->header.stripe_count = ceph_decode_64(&p);
4652 	return 0;
4653 }
4654 
4655 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4656 {
4657 	__le64 data_pool_id;
4658 	int ret;
4659 
4660 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4661 				  &rbd_dev->header_oloc, "get_data_pool",
4662 				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
4663 	if (ret < 0)
4664 		return ret;
4665 	if (ret < sizeof(data_pool_id))
4666 		return -EBADMSG;
4667 
4668 	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4669 	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4670 	return 0;
4671 }
4672 
4673 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4674 {
4675 	CEPH_DEFINE_OID_ONSTACK(oid);
4676 	size_t image_id_size;
4677 	char *image_id;
4678 	void *p;
4679 	void *end;
4680 	size_t size;
4681 	void *reply_buf = NULL;
4682 	size_t len = 0;
4683 	char *image_name = NULL;
4684 	int ret;
4685 
4686 	rbd_assert(!rbd_dev->spec->image_name);
4687 
4688 	len = strlen(rbd_dev->spec->image_id);
4689 	image_id_size = sizeof (__le32) + len;
4690 	image_id = kmalloc(image_id_size, GFP_KERNEL);
4691 	if (!image_id)
4692 		return NULL;
4693 
4694 	p = image_id;
4695 	end = image_id + image_id_size;
4696 	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4697 
4698 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4699 	reply_buf = kmalloc(size, GFP_KERNEL);
4700 	if (!reply_buf)
4701 		goto out;
4702 
4703 	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4704 	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4705 				  "dir_get_name", image_id, image_id_size,
4706 				  reply_buf, size);
4707 	if (ret < 0)
4708 		goto out;
4709 	p = reply_buf;
4710 	end = reply_buf + ret;
4711 
4712 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4713 	if (IS_ERR(image_name))
4714 		image_name = NULL;
4715 	else
4716 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4717 out:
4718 	kfree(reply_buf);
4719 	kfree(image_id);
4720 
4721 	return image_name;
4722 }
4723 
4724 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4725 {
4726 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4727 	const char *snap_name;
4728 	u32 which = 0;
4729 
4730 	/* Skip over names until we find the one we are looking for */
4731 
4732 	snap_name = rbd_dev->header.snap_names;
4733 	while (which < snapc->num_snaps) {
4734 		if (!strcmp(name, snap_name))
4735 			return snapc->snaps[which];
4736 		snap_name += strlen(snap_name) + 1;
4737 		which++;
4738 	}
4739 	return CEPH_NOSNAP;
4740 }
4741 
4742 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4743 {
4744 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4745 	u32 which;
4746 	bool found = false;
4747 	u64 snap_id;
4748 
4749 	for (which = 0; !found && which < snapc->num_snaps; which++) {
4750 		const char *snap_name;
4751 
4752 		snap_id = snapc->snaps[which];
4753 		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4754 		if (IS_ERR(snap_name)) {
4755 			/* ignore no-longer existing snapshots */
4756 			if (PTR_ERR(snap_name) == -ENOENT)
4757 				continue;
4758 			else
4759 				break;
4760 		}
4761 		found = !strcmp(name, snap_name);
4762 		kfree(snap_name);
4763 	}
4764 	return found ? snap_id : CEPH_NOSNAP;
4765 }
4766 
4767 /*
4768  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4769  * no snapshot by that name is found, or if an error occurs.
4770  */
4771 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4772 {
4773 	if (rbd_dev->image_format == 1)
4774 		return rbd_v1_snap_id_by_name(rbd_dev, name);
4775 
4776 	return rbd_v2_snap_id_by_name(rbd_dev, name);
4777 }
4778 
4779 /*
4780  * An image being mapped will have everything but the snap id.
4781  */
4782 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4783 {
4784 	struct rbd_spec *spec = rbd_dev->spec;
4785 
4786 	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4787 	rbd_assert(spec->image_id && spec->image_name);
4788 	rbd_assert(spec->snap_name);
4789 
4790 	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4791 		u64 snap_id;
4792 
4793 		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4794 		if (snap_id == CEPH_NOSNAP)
4795 			return -ENOENT;
4796 
4797 		spec->snap_id = snap_id;
4798 	} else {
4799 		spec->snap_id = CEPH_NOSNAP;
4800 	}
4801 
4802 	return 0;
4803 }
4804 
4805 /*
4806  * A parent image will have all ids but none of the names.
4807  *
4808  * All names in an rbd spec are dynamically allocated.  It's OK if we
4809  * can't figure out the name for an image id.
4810  */
4811 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4812 {
4813 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4814 	struct rbd_spec *spec = rbd_dev->spec;
4815 	const char *pool_name;
4816 	const char *image_name;
4817 	const char *snap_name;
4818 	int ret;
4819 
4820 	rbd_assert(spec->pool_id != CEPH_NOPOOL);
4821 	rbd_assert(spec->image_id);
4822 	rbd_assert(spec->snap_id != CEPH_NOSNAP);
4823 
4824 	/* Get the pool name; we have to make our own copy of this */
4825 
4826 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4827 	if (!pool_name) {
4828 		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4829 		return -EIO;
4830 	}
4831 	pool_name = kstrdup(pool_name, GFP_KERNEL);
4832 	if (!pool_name)
4833 		return -ENOMEM;
4834 
4835 	/* Fetch the image name; tolerate failure here */
4836 
4837 	image_name = rbd_dev_image_name(rbd_dev);
4838 	if (!image_name)
4839 		rbd_warn(rbd_dev, "unable to get image name");
4840 
4841 	/* Fetch the snapshot name */
4842 
4843 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4844 	if (IS_ERR(snap_name)) {
4845 		ret = PTR_ERR(snap_name);
4846 		goto out_err;
4847 	}
4848 
4849 	spec->pool_name = pool_name;
4850 	spec->image_name = image_name;
4851 	spec->snap_name = snap_name;
4852 
4853 	return 0;
4854 
4855 out_err:
4856 	kfree(image_name);
4857 	kfree(pool_name);
4858 	return ret;
4859 }
4860 
4861 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4862 {
4863 	size_t size;
4864 	int ret;
4865 	void *reply_buf;
4866 	void *p;
4867 	void *end;
4868 	u64 seq;
4869 	u32 snap_count;
4870 	struct ceph_snap_context *snapc;
4871 	u32 i;
4872 
4873 	/*
4874 	 * We'll need room for the seq value (maximum snapshot id),
4875 	 * snapshot count, and array of that many snapshot ids.
4876 	 * For now we have a fixed upper limit on the number we're
4877 	 * prepared to receive.
4878 	 */
4879 	size = sizeof (__le64) + sizeof (__le32) +
4880 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
4881 	reply_buf = kzalloc(size, GFP_KERNEL);
4882 	if (!reply_buf)
4883 		return -ENOMEM;
4884 
4885 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4886 				  &rbd_dev->header_oloc, "get_snapcontext",
4887 				  NULL, 0, reply_buf, size);
4888 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4889 	if (ret < 0)
4890 		goto out;
4891 
4892 	p = reply_buf;
4893 	end = reply_buf + ret;
4894 	ret = -ERANGE;
4895 	ceph_decode_64_safe(&p, end, seq, out);
4896 	ceph_decode_32_safe(&p, end, snap_count, out);
4897 
4898 	/*
4899 	 * Make sure the reported number of snapshot ids wouldn't go
4900 	 * beyond the end of our buffer.  But before checking that,
4901 	 * make sure the computed size of the snapshot context we
4902 	 * allocate is representable in a size_t.
4903 	 */
4904 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4905 				 / sizeof (u64)) {
4906 		ret = -EINVAL;
4907 		goto out;
4908 	}
4909 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4910 		goto out;
4911 	ret = 0;
4912 
4913 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4914 	if (!snapc) {
4915 		ret = -ENOMEM;
4916 		goto out;
4917 	}
4918 	snapc->seq = seq;
4919 	for (i = 0; i < snap_count; i++)
4920 		snapc->snaps[i] = ceph_decode_64(&p);
4921 
4922 	ceph_put_snap_context(rbd_dev->header.snapc);
4923 	rbd_dev->header.snapc = snapc;
4924 
4925 	dout("  snap context seq = %llu, snap_count = %u\n",
4926 		(unsigned long long)seq, (unsigned int)snap_count);
4927 out:
4928 	kfree(reply_buf);
4929 
4930 	return ret;
4931 }
4932 
4933 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4934 					u64 snap_id)
4935 {
4936 	size_t size;
4937 	void *reply_buf;
4938 	__le64 snapid;
4939 	int ret;
4940 	void *p;
4941 	void *end;
4942 	char *snap_name;
4943 
4944 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4945 	reply_buf = kmalloc(size, GFP_KERNEL);
4946 	if (!reply_buf)
4947 		return ERR_PTR(-ENOMEM);
4948 
4949 	snapid = cpu_to_le64(snap_id);
4950 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4951 				  &rbd_dev->header_oloc, "get_snapshot_name",
4952 				  &snapid, sizeof(snapid), reply_buf, size);
4953 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954 	if (ret < 0) {
4955 		snap_name = ERR_PTR(ret);
4956 		goto out;
4957 	}
4958 
4959 	p = reply_buf;
4960 	end = reply_buf + ret;
4961 	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4962 	if (IS_ERR(snap_name))
4963 		goto out;
4964 
4965 	dout("  snap_id 0x%016llx snap_name = %s\n",
4966 		(unsigned long long)snap_id, snap_name);
4967 out:
4968 	kfree(reply_buf);
4969 
4970 	return snap_name;
4971 }
4972 
4973 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4974 {
4975 	bool first_time = rbd_dev->header.object_prefix == NULL;
4976 	int ret;
4977 
4978 	ret = rbd_dev_v2_image_size(rbd_dev);
4979 	if (ret)
4980 		return ret;
4981 
4982 	if (first_time) {
4983 		ret = rbd_dev_v2_header_onetime(rbd_dev);
4984 		if (ret)
4985 			return ret;
4986 	}
4987 
4988 	ret = rbd_dev_v2_snap_context(rbd_dev);
4989 	if (ret && first_time) {
4990 		kfree(rbd_dev->header.object_prefix);
4991 		rbd_dev->header.object_prefix = NULL;
4992 	}
4993 
4994 	return ret;
4995 }
4996 
4997 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4998 {
4999 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5000 
5001 	if (rbd_dev->image_format == 1)
5002 		return rbd_dev_v1_header_info(rbd_dev);
5003 
5004 	return rbd_dev_v2_header_info(rbd_dev);
5005 }
5006 
/*
 * Advances *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and returns the length of the token -- the run of non-white
 * space characters -- that starts there.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * Exactly the characters for which isspace() is nonzero in
	 * the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;

	return strcspn(start, whitespace);
}
5025 
5026 /*
5027  * Finds the next token in *buf, dynamically allocates a buffer big
5028  * enough to hold a copy of it, and copies the token into the new
5029  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5030  * that a duplicate buffer is created even for a zero-length token.
5031  *
5032  * Returns a pointer to the newly-allocated duplicate, or a null
5033  * pointer if memory for the duplicate was not available.  If
5034  * the lenp argument is a non-null pointer, the length of the token
5035  * (not including the '\0') is returned in *lenp.
5036  *
5037  * If successful, the *buf pointer will be updated to point beyond
5038  * the end of the found token.
5039  *
5040  * Note: uses GFP_KERNEL for allocation.
5041  */
5042 static inline char *dup_token(const char **buf, size_t *lenp)
5043 {
5044 	char *dup;
5045 	size_t len;
5046 
5047 	len = next_token(buf);
5048 	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5049 	if (!dup)
5050 		return NULL;
5051 	*(dup + len) = '\0';
5052 	*buf += len;
5053 
5054 	if (lenp)
5055 		*lenp = len;
5056 
5057 	return dup;
5058 }
5059 
5060 /*
5061  * Parse the options provided for an "rbd add" (i.e., rbd image
5062  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5063  * and the data written is passed here via a NUL-terminated buffer.
5064  * Returns 0 if successful or an error code otherwise.
5065  *
5066  * The information extracted from these options is recorded in
5067  * the other parameters which return dynamically-allocated
5068  * structures:
5069  *  ceph_opts
5070  *      The address of a pointer that will refer to a ceph options
5071  *      structure.  Caller must release the returned pointer using
5072  *      ceph_destroy_options() when it is no longer needed.
5073  *  rbd_opts
5074  *	Address of an rbd options pointer.  Fully initialized by
5075  *	this function; caller must release with kfree().
5076  *  spec
5077  *	Address of an rbd image specification pointer.  Fully
5078  *	initialized by this function based on parsed options.
5079  *	Caller must release with rbd_spec_put().
5080  *
5081  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
5100  */
5101 static int rbd_add_parse_args(const char *buf,
5102 				struct ceph_options **ceph_opts,
5103 				struct rbd_options **opts,
5104 				struct rbd_spec **rbd_spec)
5105 {
5106 	size_t len;
5107 	char *options;
5108 	const char *mon_addrs;
5109 	char *snap_name;
5110 	size_t mon_addrs_size;
5111 	struct rbd_spec *spec = NULL;
5112 	struct rbd_options *rbd_opts = NULL;
5113 	struct ceph_options *copts;
5114 	int ret;
5115 
5116 	/* The first four tokens are required */
5117 
5118 	len = next_token(&buf);
5119 	if (!len) {
5120 		rbd_warn(NULL, "no monitor address(es) provided");
5121 		return -EINVAL;
5122 	}
5123 	mon_addrs = buf;
5124 	mon_addrs_size = len + 1;
5125 	buf += len;
5126 
5127 	ret = -EINVAL;
5128 	options = dup_token(&buf, NULL);
5129 	if (!options)
5130 		return -ENOMEM;
5131 	if (!*options) {
5132 		rbd_warn(NULL, "no options provided");
5133 		goto out_err;
5134 	}
5135 
5136 	spec = rbd_spec_alloc();
5137 	if (!spec)
5138 		goto out_mem;
5139 
5140 	spec->pool_name = dup_token(&buf, NULL);
5141 	if (!spec->pool_name)
5142 		goto out_mem;
5143 	if (!*spec->pool_name) {
5144 		rbd_warn(NULL, "no pool name provided");
5145 		goto out_err;
5146 	}
5147 
5148 	spec->image_name = dup_token(&buf, NULL);
5149 	if (!spec->image_name)
5150 		goto out_mem;
5151 	if (!*spec->image_name) {
5152 		rbd_warn(NULL, "no image name provided");
5153 		goto out_err;
5154 	}
5155 
5156 	/*
5157 	 * Snapshot name is optional; default is to use "-"
5158 	 * (indicating the head/no snapshot).
5159 	 */
5160 	len = next_token(&buf);
5161 	if (!len) {
5162 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5163 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5164 	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
5165 		ret = -ENAMETOOLONG;
5166 		goto out_err;
5167 	}
5168 	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5169 	if (!snap_name)
5170 		goto out_mem;
5171 	*(snap_name + len) = '\0';
5172 	spec->snap_name = snap_name;
5173 
5174 	/* Initialize all rbd options to the defaults */
5175 
5176 	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5177 	if (!rbd_opts)
5178 		goto out_mem;
5179 
5180 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5181 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5182 	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5183 	rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5184 
5185 	copts = ceph_parse_options(options, mon_addrs,
5186 					mon_addrs + mon_addrs_size - 1,
5187 					parse_rbd_opts_token, rbd_opts);
5188 	if (IS_ERR(copts)) {
5189 		ret = PTR_ERR(copts);
5190 		goto out_err;
5191 	}
5192 	kfree(options);
5193 
5194 	*ceph_opts = copts;
5195 	*opts = rbd_opts;
5196 	*rbd_spec = spec;
5197 
5198 	return 0;
5199 out_mem:
5200 	ret = -ENOMEM;
5201 out_err:
5202 	kfree(rbd_opts);
5203 	rbd_spec_put(spec);
5204 	kfree(options);
5205 
5206 	return ret;
5207 }
5208 
5209 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5210 {
5211 	down_write(&rbd_dev->lock_rwsem);
5212 	if (__rbd_is_lock_owner(rbd_dev))
5213 		rbd_unlock(rbd_dev);
5214 	up_write(&rbd_dev->lock_rwsem);
5215 }
5216 
5217 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5218 {
5219 	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5220 		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5221 		return -EINVAL;
5222 	}
5223 
5224 	/* FIXME: "rbd map --exclusive" should be in interruptible */
5225 	down_read(&rbd_dev->lock_rwsem);
5226 	rbd_wait_state_locked(rbd_dev);
5227 	up_read(&rbd_dev->lock_rwsem);
5228 	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5229 		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5230 		return -EROFS;
5231 	}
5232 
5233 	return 0;
5234 }
5235 
5236 /*
5237  * An rbd format 2 image has a unique identifier, distinct from the
5238  * name given to it by the user.  Internally, that identifier is
5239  * what's used to specify the names of objects related to the image.
5240  *
5241  * A special "rbd id" object is used to map an rbd image name to its
5242  * id.  If that object doesn't exist, then there is no v2 rbd image
5243  * with the supplied name.
5244  *
5245  * This function will record the given rbd_dev's image_id field if
5246  * it can be determined, and in that case will return 0.  If any
5247  * errors occur a negative errno will be returned and the rbd_dev's
5248  * image_id field will be unchanged (and should be NULL).
5249  */
5250 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5251 {
5252 	int ret;
5253 	size_t size;
5254 	CEPH_DEFINE_OID_ONSTACK(oid);
5255 	void *response;
5256 	char *image_id;
5257 
5258 	/*
5259 	 * When probing a parent image, the image id is already
5260 	 * known (and the image name likely is not).  There's no
5261 	 * need to fetch the image id again in this case.  We
5262 	 * do still need to set the image format though.
5263 	 */
5264 	if (rbd_dev->spec->image_id) {
5265 		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5266 
5267 		return 0;
5268 	}
5269 
5270 	/*
5271 	 * First, see if the format 2 image id file exists, and if
5272 	 * so, get the image's persistent id from it.
5273 	 */
5274 	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5275 			       rbd_dev->spec->image_name);
5276 	if (ret)
5277 		return ret;
5278 
5279 	dout("rbd id object name is %s\n", oid.name);
5280 
5281 	/* Response will be an encoded string, which includes a length */
5282 
5283 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5284 	response = kzalloc(size, GFP_NOIO);
5285 	if (!response) {
5286 		ret = -ENOMEM;
5287 		goto out;
5288 	}
5289 
5290 	/* If it doesn't exist we'll assume it's a format 1 image */
5291 
5292 	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5293 				  "get_id", NULL, 0,
5294 				  response, RBD_IMAGE_ID_LEN_MAX);
5295 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5296 	if (ret == -ENOENT) {
5297 		image_id = kstrdup("", GFP_KERNEL);
5298 		ret = image_id ? 0 : -ENOMEM;
5299 		if (!ret)
5300 			rbd_dev->image_format = 1;
5301 	} else if (ret >= 0) {
5302 		void *p = response;
5303 
5304 		image_id = ceph_extract_encoded_string(&p, p + ret,
5305 						NULL, GFP_NOIO);
5306 		ret = PTR_ERR_OR_ZERO(image_id);
5307 		if (!ret)
5308 			rbd_dev->image_format = 2;
5309 	}
5310 
5311 	if (!ret) {
5312 		rbd_dev->spec->image_id = image_id;
5313 		dout("image_id is %s\n", image_id);
5314 	}
5315 out:
5316 	kfree(response);
5317 	ceph_oid_destroy(&oid);
5318 	return ret;
5319 }
5320 
5321 /*
5322  * Undo whatever state changes are made by v1 or v2 header info
5323  * call.
5324  */
5325 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5326 {
5327 	struct rbd_image_header	*header;
5328 
5329 	rbd_dev_parent_put(rbd_dev);
5330 
5331 	/* Free dynamic fields from the header, then zero it out */
5332 
5333 	header = &rbd_dev->header;
5334 	ceph_put_snap_context(header->snapc);
5335 	kfree(header->snap_sizes);
5336 	kfree(header->snap_names);
5337 	kfree(header->object_prefix);
5338 	memset(header, 0, sizeof (*header));
5339 }
5340 
5341 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5342 {
5343 	int ret;
5344 
5345 	ret = rbd_dev_v2_object_prefix(rbd_dev);
5346 	if (ret)
5347 		goto out_err;
5348 
5349 	/*
5350 	 * Get the and check features for the image.  Currently the
5351 	 * features are assumed to never change.
5352 	 */
5353 	ret = rbd_dev_v2_features(rbd_dev);
5354 	if (ret)
5355 		goto out_err;
5356 
5357 	/* If the image supports fancy striping, get its parameters */
5358 
5359 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5360 		ret = rbd_dev_v2_striping_info(rbd_dev);
5361 		if (ret < 0)
5362 			goto out_err;
5363 	}
5364 
5365 	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5366 		ret = rbd_dev_v2_data_pool(rbd_dev);
5367 		if (ret)
5368 			goto out_err;
5369 	}
5370 
5371 	rbd_init_layout(rbd_dev);
5372 	return 0;
5373 
5374 out_err:
5375 	rbd_dev->header.features = 0;
5376 	kfree(rbd_dev->header.object_prefix);
5377 	rbd_dev->header.object_prefix = NULL;
5378 	return ret;
5379 }
5380 
5381 /*
5382  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5383  * rbd_dev_image_probe() recursion depth, which means it's also the
5384  * length of the already discovered part of the parent chain.
5385  */
5386 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5387 {
5388 	struct rbd_device *parent = NULL;
5389 	int ret;
5390 
5391 	if (!rbd_dev->parent_spec)
5392 		return 0;
5393 
5394 	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5395 		pr_info("parent chain is too long (%d)\n", depth);
5396 		ret = -EINVAL;
5397 		goto out_err;
5398 	}
5399 
5400 	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5401 	if (!parent) {
5402 		ret = -ENOMEM;
5403 		goto out_err;
5404 	}
5405 
5406 	/*
5407 	 * Images related by parent/child relationships always share
5408 	 * rbd_client and spec/parent_spec, so bump their refcounts.
5409 	 */
5410 	__rbd_get_client(rbd_dev->rbd_client);
5411 	rbd_spec_get(rbd_dev->parent_spec);
5412 
5413 	ret = rbd_dev_image_probe(parent, depth);
5414 	if (ret < 0)
5415 		goto out_err;
5416 
5417 	rbd_dev->parent = parent;
5418 	atomic_set(&rbd_dev->parent_ref, 1);
5419 	return 0;
5420 
5421 out_err:
5422 	rbd_dev_unparent(rbd_dev);
5423 	rbd_dev_destroy(parent);
5424 	return ret;
5425 }
5426 
5427 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5428 {
5429 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5430 	rbd_dev_mapping_clear(rbd_dev);
5431 	rbd_free_disk(rbd_dev);
5432 	if (!single_major)
5433 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5434 }
5435 
5436 /*
5437  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5438  * upon return.
5439  */
5440 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5441 {
5442 	int ret;
5443 
5444 	/* Record our major and minor device numbers. */
5445 
5446 	if (!single_major) {
5447 		ret = register_blkdev(0, rbd_dev->name);
5448 		if (ret < 0)
5449 			goto err_out_unlock;
5450 
5451 		rbd_dev->major = ret;
5452 		rbd_dev->minor = 0;
5453 	} else {
5454 		rbd_dev->major = rbd_major;
5455 		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5456 	}
5457 
5458 	/* Set up the blkdev mapping. */
5459 
5460 	ret = rbd_init_disk(rbd_dev);
5461 	if (ret)
5462 		goto err_out_blkdev;
5463 
5464 	ret = rbd_dev_mapping_set(rbd_dev);
5465 	if (ret)
5466 		goto err_out_disk;
5467 
5468 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5469 	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
5470 
5471 	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5472 	if (ret)
5473 		goto err_out_mapping;
5474 
5475 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5476 	up_write(&rbd_dev->header_rwsem);
5477 	return 0;
5478 
5479 err_out_mapping:
5480 	rbd_dev_mapping_clear(rbd_dev);
5481 err_out_disk:
5482 	rbd_free_disk(rbd_dev);
5483 err_out_blkdev:
5484 	if (!single_major)
5485 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5486 err_out_unlock:
5487 	up_write(&rbd_dev->header_rwsem);
5488 	return ret;
5489 }
5490 
5491 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5492 {
5493 	struct rbd_spec *spec = rbd_dev->spec;
5494 	int ret;
5495 
5496 	/* Record the header object name for this rbd image. */
5497 
5498 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5499 	if (rbd_dev->image_format == 1)
5500 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5501 				       spec->image_name, RBD_SUFFIX);
5502 	else
5503 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5504 				       RBD_HEADER_PREFIX, spec->image_id);
5505 
5506 	return ret;
5507 }
5508 
5509 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5510 {
5511 	rbd_dev_unprobe(rbd_dev);
5512 	if (rbd_dev->opts)
5513 		rbd_unregister_watch(rbd_dev);
5514 	rbd_dev->image_format = 0;
5515 	kfree(rbd_dev->spec->image_id);
5516 	rbd_dev->spec->image_id = NULL;
5517 }
5518 
5519 /*
5520  * Probe for the existence of the header object for the given rbd
5521  * device.  If this image is the one being mapped (i.e., not a
5522  * parent), initiate a watch on its header object before using that
5523  * object to get detailed information about the rbd image.
5524  */
5525 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5526 {
5527 	int ret;
5528 
5529 	/*
5530 	 * Get the id from the image id object.  Unless there's an
5531 	 * error, rbd_dev->spec->image_id will be filled in with
5532 	 * a dynamically-allocated string, and rbd_dev->image_format
5533 	 * will be set to either 1 or 2.
5534 	 */
5535 	ret = rbd_dev_image_id(rbd_dev);
5536 	if (ret)
5537 		return ret;
5538 
5539 	ret = rbd_dev_header_name(rbd_dev);
5540 	if (ret)
5541 		goto err_out_format;
5542 
5543 	if (!depth) {
5544 		ret = rbd_register_watch(rbd_dev);
5545 		if (ret) {
5546 			if (ret == -ENOENT)
5547 				pr_info("image %s/%s does not exist\n",
5548 					rbd_dev->spec->pool_name,
5549 					rbd_dev->spec->image_name);
5550 			goto err_out_format;
5551 		}
5552 	}
5553 
5554 	ret = rbd_dev_header_info(rbd_dev);
5555 	if (ret)
5556 		goto err_out_watch;
5557 
5558 	/*
5559 	 * If this image is the one being mapped, we have pool name and
5560 	 * id, image name and id, and snap name - need to fill snap id.
5561 	 * Otherwise this is a parent image, identified by pool, image
5562 	 * and snap ids - need to fill in names for those ids.
5563 	 */
5564 	if (!depth)
5565 		ret = rbd_spec_fill_snap_id(rbd_dev);
5566 	else
5567 		ret = rbd_spec_fill_names(rbd_dev);
5568 	if (ret) {
5569 		if (ret == -ENOENT)
5570 			pr_info("snap %s/%s@%s does not exist\n",
5571 				rbd_dev->spec->pool_name,
5572 				rbd_dev->spec->image_name,
5573 				rbd_dev->spec->snap_name);
5574 		goto err_out_probe;
5575 	}
5576 
5577 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5578 		ret = rbd_dev_v2_parent_info(rbd_dev);
5579 		if (ret)
5580 			goto err_out_probe;
5581 
5582 		/*
5583 		 * Need to warn users if this image is the one being
5584 		 * mapped and has a parent.
5585 		 */
5586 		if (!depth && rbd_dev->parent_spec)
5587 			rbd_warn(rbd_dev,
5588 				 "WARNING: kernel layering is EXPERIMENTAL!");
5589 	}
5590 
5591 	ret = rbd_dev_probe_parent(rbd_dev, depth);
5592 	if (ret)
5593 		goto err_out_probe;
5594 
5595 	dout("discovered format %u image, header name is %s\n",
5596 		rbd_dev->image_format, rbd_dev->header_oid.name);
5597 	return 0;
5598 
5599 err_out_probe:
5600 	rbd_dev_unprobe(rbd_dev);
5601 err_out_watch:
5602 	if (!depth)
5603 		rbd_unregister_watch(rbd_dev);
5604 err_out_format:
5605 	rbd_dev->image_format = 0;
5606 	kfree(rbd_dev->spec->image_id);
5607 	rbd_dev->spec->image_id = NULL;
5608 	return ret;
5609 }
5610 
5611 static ssize_t do_rbd_add(struct bus_type *bus,
5612 			  const char *buf,
5613 			  size_t count)
5614 {
5615 	struct rbd_device *rbd_dev = NULL;
5616 	struct ceph_options *ceph_opts = NULL;
5617 	struct rbd_options *rbd_opts = NULL;
5618 	struct rbd_spec *spec = NULL;
5619 	struct rbd_client *rbdc;
5620 	int rc;
5621 
5622 	if (!try_module_get(THIS_MODULE))
5623 		return -ENODEV;
5624 
5625 	/* parse add command */
5626 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5627 	if (rc < 0)
5628 		goto out;
5629 
5630 	rbdc = rbd_get_client(ceph_opts);
5631 	if (IS_ERR(rbdc)) {
5632 		rc = PTR_ERR(rbdc);
5633 		goto err_out_args;
5634 	}
5635 
5636 	/* pick the pool */
5637 	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
5638 	if (rc < 0) {
5639 		if (rc == -ENOENT)
5640 			pr_info("pool %s does not exist\n", spec->pool_name);
5641 		goto err_out_client;
5642 	}
5643 	spec->pool_id = (u64)rc;
5644 
5645 	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
5646 	if (!rbd_dev) {
5647 		rc = -ENOMEM;
5648 		goto err_out_client;
5649 	}
5650 	rbdc = NULL;		/* rbd_dev now owns this */
5651 	spec = NULL;		/* rbd_dev now owns this */
5652 	rbd_opts = NULL;	/* rbd_dev now owns this */
5653 
5654 	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5655 	if (!rbd_dev->config_info) {
5656 		rc = -ENOMEM;
5657 		goto err_out_rbd_dev;
5658 	}
5659 
5660 	down_write(&rbd_dev->header_rwsem);
5661 	rc = rbd_dev_image_probe(rbd_dev, 0);
5662 	if (rc < 0) {
5663 		up_write(&rbd_dev->header_rwsem);
5664 		goto err_out_rbd_dev;
5665 	}
5666 
5667 	/* If we are mapping a snapshot it must be marked read-only */
5668 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5669 		rbd_dev->opts->read_only = true;
5670 
5671 	rc = rbd_dev_device_setup(rbd_dev);
5672 	if (rc)
5673 		goto err_out_image_probe;
5674 
5675 	if (rbd_dev->opts->exclusive) {
5676 		rc = rbd_add_acquire_lock(rbd_dev);
5677 		if (rc)
5678 			goto err_out_device_setup;
5679 	}
5680 
5681 	/* Everything's ready.  Announce the disk to the world. */
5682 
5683 	rc = device_add(&rbd_dev->dev);
5684 	if (rc)
5685 		goto err_out_image_lock;
5686 
5687 	add_disk(rbd_dev->disk);
5688 	/* see rbd_init_disk() */
5689 	blk_put_queue(rbd_dev->disk->queue);
5690 
5691 	spin_lock(&rbd_dev_list_lock);
5692 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
5693 	spin_unlock(&rbd_dev_list_lock);
5694 
5695 	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5696 		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5697 		rbd_dev->header.features);
5698 	rc = count;
5699 out:
5700 	module_put(THIS_MODULE);
5701 	return rc;
5702 
5703 err_out_image_lock:
5704 	rbd_dev_image_unlock(rbd_dev);
5705 err_out_device_setup:
5706 	rbd_dev_device_release(rbd_dev);
5707 err_out_image_probe:
5708 	rbd_dev_image_release(rbd_dev);
5709 err_out_rbd_dev:
5710 	rbd_dev_destroy(rbd_dev);
5711 err_out_client:
5712 	rbd_put_client(rbdc);
5713 err_out_args:
5714 	rbd_spec_put(spec);
5715 	kfree(rbd_opts);
5716 	goto out;
5717 }
5718 
5719 static ssize_t rbd_add(struct bus_type *bus,
5720 		       const char *buf,
5721 		       size_t count)
5722 {
5723 	if (single_major)
5724 		return -EINVAL;
5725 
5726 	return do_rbd_add(bus, buf, count);
5727 }
5728 
5729 static ssize_t rbd_add_single_major(struct bus_type *bus,
5730 				    const char *buf,
5731 				    size_t count)
5732 {
5733 	return do_rbd_add(bus, buf, count);
5734 }
5735 
5736 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5737 {
5738 	while (rbd_dev->parent) {
5739 		struct rbd_device *first = rbd_dev;
5740 		struct rbd_device *second = first->parent;
5741 		struct rbd_device *third;
5742 
5743 		/*
5744 		 * Follow to the parent with no grandparent and
5745 		 * remove it.
5746 		 */
5747 		while (second && (third = second->parent)) {
5748 			first = second;
5749 			second = third;
5750 		}
5751 		rbd_assert(second);
5752 		rbd_dev_image_release(second);
5753 		rbd_dev_destroy(second);
5754 		first->parent = NULL;
5755 		first->parent_overlap = 0;
5756 
5757 		rbd_assert(first->parent_spec);
5758 		rbd_spec_put(first->parent_spec);
5759 		first->parent_spec = NULL;
5760 	}
5761 }
5762 
5763 static ssize_t do_rbd_remove(struct bus_type *bus,
5764 			     const char *buf,
5765 			     size_t count)
5766 {
5767 	struct rbd_device *rbd_dev = NULL;
5768 	struct list_head *tmp;
5769 	int dev_id;
5770 	char opt_buf[6];
5771 	bool already = false;
5772 	bool force = false;
5773 	int ret;
5774 
5775 	dev_id = -1;
5776 	opt_buf[0] = '\0';
5777 	sscanf(buf, "%d %5s", &dev_id, opt_buf);
5778 	if (dev_id < 0) {
5779 		pr_err("dev_id out of range\n");
5780 		return -EINVAL;
5781 	}
5782 	if (opt_buf[0] != '\0') {
5783 		if (!strcmp(opt_buf, "force")) {
5784 			force = true;
5785 		} else {
5786 			pr_err("bad remove option at '%s'\n", opt_buf);
5787 			return -EINVAL;
5788 		}
5789 	}
5790 
5791 	ret = -ENOENT;
5792 	spin_lock(&rbd_dev_list_lock);
5793 	list_for_each(tmp, &rbd_dev_list) {
5794 		rbd_dev = list_entry(tmp, struct rbd_device, node);
5795 		if (rbd_dev->dev_id == dev_id) {
5796 			ret = 0;
5797 			break;
5798 		}
5799 	}
5800 	if (!ret) {
5801 		spin_lock_irq(&rbd_dev->lock);
5802 		if (rbd_dev->open_count && !force)
5803 			ret = -EBUSY;
5804 		else
5805 			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5806 							&rbd_dev->flags);
5807 		spin_unlock_irq(&rbd_dev->lock);
5808 	}
5809 	spin_unlock(&rbd_dev_list_lock);
5810 	if (ret < 0 || already)
5811 		return ret;
5812 
5813 	if (force) {
5814 		/*
5815 		 * Prevent new IO from being queued and wait for existing
5816 		 * IO to complete/fail.
5817 		 */
5818 		blk_mq_freeze_queue(rbd_dev->disk->queue);
5819 		blk_set_queue_dying(rbd_dev->disk->queue);
5820 	}
5821 
5822 	del_gendisk(rbd_dev->disk);
5823 	spin_lock(&rbd_dev_list_lock);
5824 	list_del_init(&rbd_dev->node);
5825 	spin_unlock(&rbd_dev_list_lock);
5826 	device_del(&rbd_dev->dev);
5827 
5828 	rbd_dev_image_unlock(rbd_dev);
5829 	rbd_dev_device_release(rbd_dev);
5830 	rbd_dev_image_release(rbd_dev);
5831 	rbd_dev_destroy(rbd_dev);
5832 	return count;
5833 }
5834 
5835 static ssize_t rbd_remove(struct bus_type *bus,
5836 			  const char *buf,
5837 			  size_t count)
5838 {
5839 	if (single_major)
5840 		return -EINVAL;
5841 
5842 	return do_rbd_remove(bus, buf, count);
5843 }
5844 
5845 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5846 				       const char *buf,
5847 				       size_t count)
5848 {
5849 	return do_rbd_remove(bus, buf, count);
5850 }
5851 
5852 /*
5853  * create control files in sysfs
5854  * /sys/bus/rbd/...
5855  */
5856 static int rbd_sysfs_init(void)
5857 {
5858 	int ret;
5859 
5860 	ret = device_register(&rbd_root_dev);
5861 	if (ret < 0)
5862 		return ret;
5863 
5864 	ret = bus_register(&rbd_bus_type);
5865 	if (ret < 0)
5866 		device_unregister(&rbd_root_dev);
5867 
5868 	return ret;
5869 }
5870 
5871 static void rbd_sysfs_cleanup(void)
5872 {
5873 	bus_unregister(&rbd_bus_type);
5874 	device_unregister(&rbd_root_dev);
5875 }
5876 
5877 static int rbd_slab_init(void)
5878 {
5879 	rbd_assert(!rbd_img_request_cache);
5880 	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
5881 	if (!rbd_img_request_cache)
5882 		return -ENOMEM;
5883 
5884 	rbd_assert(!rbd_obj_request_cache);
5885 	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
5886 	if (!rbd_obj_request_cache)
5887 		goto out_err;
5888 
5889 	return 0;
5890 
5891 out_err:
5892 	kmem_cache_destroy(rbd_img_request_cache);
5893 	rbd_img_request_cache = NULL;
5894 	return -ENOMEM;
5895 }
5896 
5897 static void rbd_slab_exit(void)
5898 {
5899 	rbd_assert(rbd_obj_request_cache);
5900 	kmem_cache_destroy(rbd_obj_request_cache);
5901 	rbd_obj_request_cache = NULL;
5902 
5903 	rbd_assert(rbd_img_request_cache);
5904 	kmem_cache_destroy(rbd_img_request_cache);
5905 	rbd_img_request_cache = NULL;
5906 }
5907 
5908 static int __init rbd_init(void)
5909 {
5910 	int rc;
5911 
5912 	if (!libceph_compatible(NULL)) {
5913 		rbd_warn(NULL, "libceph incompatibility (quitting)");
5914 		return -EINVAL;
5915 	}
5916 
5917 	rc = rbd_slab_init();
5918 	if (rc)
5919 		return rc;
5920 
5921 	/*
5922 	 * The number of active work items is limited by the number of
5923 	 * rbd devices * queue depth, so leave @max_active at default.
5924 	 */
5925 	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5926 	if (!rbd_wq) {
5927 		rc = -ENOMEM;
5928 		goto err_out_slab;
5929 	}
5930 
5931 	if (single_major) {
5932 		rbd_major = register_blkdev(0, RBD_DRV_NAME);
5933 		if (rbd_major < 0) {
5934 			rc = rbd_major;
5935 			goto err_out_wq;
5936 		}
5937 	}
5938 
5939 	rc = rbd_sysfs_init();
5940 	if (rc)
5941 		goto err_out_blkdev;
5942 
5943 	if (single_major)
5944 		pr_info("loaded (major %d)\n", rbd_major);
5945 	else
5946 		pr_info("loaded\n");
5947 
5948 	return 0;
5949 
5950 err_out_blkdev:
5951 	if (single_major)
5952 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5953 err_out_wq:
5954 	destroy_workqueue(rbd_wq);
5955 err_out_slab:
5956 	rbd_slab_exit();
5957 	return rc;
5958 }
5959 
5960 static void __exit rbd_exit(void)
5961 {
5962 	ida_destroy(&rbd_dev_id_ida);
5963 	rbd_sysfs_cleanup();
5964 	if (single_major)
5965 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5966 	destroy_workqueue(rbd_wq);
5967 	rbd_slab_exit();
5968 }
5969 
/* Hook rbd_init()/rbd_exit() up as the module's init and exit routines. */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module authorship. */
MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

/* Module description and license. */
MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");
5981