xref: /openbmc/linux/drivers/block/rbd.c (revision 48c926cd)
1 
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4 
5 
6    based on drivers/block/osdblk.c:
7 
8    Copyright 2009 Red Hat, Inc.
9 
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22 
23 
24 
25    For usage instructions, please refer to:
26 
27                  Documentation/ABI/testing/sysfs-bus-rbd
28 
29  */
30 
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38 
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48 
49 #include "rbd_types.h"
50 
51 #define RBD_DEBUG	/* Activate rbd_assert() calls */
52 
53 /*
54  * The basic unit of block I/O is a sector.  It is interpreted in a
55  * number of contexts in Linux (blk, bio, genhd), but the default is
56  * universally 512 bytes.  These symbols are just slightly more
57  * meaningful than the bare numbers they represent.
58  */
59 #define	SECTOR_SHIFT	9
60 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
61 
62 /*
63  * Increment the given counter and return its updated value.
64  * If the counter is already 0 it will not be incremented.
65  * If the counter is already at its maximum value returns
66  * -EINVAL without updating it.
67  */
68 static int atomic_inc_return_safe(atomic_t *v)
69 {
70 	unsigned int counter;
71 
72 	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73 	if (counter <= (unsigned int)INT_MAX)
74 		return (int)counter;
75 
76 	atomic_dec(v);
77 
78 	return -EINVAL;
79 }
80 
81 /* Decrement the counter.  Return the resulting value, or -EINVAL */
82 static int atomic_dec_return_safe(atomic_t *v)
83 {
84 	int counter;
85 
86 	counter = atomic_dec_return(v);
87 	if (counter >= 0)
88 		return counter;
89 
90 	atomic_inc(v);
91 
92 	return -EINVAL;
93 }
94 
95 #define RBD_DRV_NAME "rbd"
96 
97 #define RBD_MINORS_PER_MAJOR		256
98 #define RBD_SINGLE_MAJOR_PART_SHIFT	4
99 
100 #define RBD_MAX_PARENT_CHAIN_LEN	16
101 
102 #define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
103 #define RBD_MAX_SNAP_NAME_LEN	\
104 			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
105 
106 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
107 
108 #define RBD_SNAP_HEAD_NAME	"-"
109 
110 #define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
111 
112 /* This allows a single page to hold an image name sent by OSD */
113 #define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
114 #define RBD_IMAGE_ID_LEN_MAX	64
115 
116 #define RBD_OBJ_PREFIX_LEN_MAX	64
117 
118 #define RBD_NOTIFY_TIMEOUT	5	/* seconds */
119 #define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
120 
121 /* Feature bits */
122 
123 #define RBD_FEATURE_LAYERING		(1ULL<<0)
124 #define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
125 #define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
126 #define RBD_FEATURE_DATA_POOL		(1ULL<<7)
127 
128 #define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
129 				 RBD_FEATURE_STRIPINGV2 |	\
130 				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
131 				 RBD_FEATURE_DATA_POOL)
132 
133 /* Features supported by this (client software) implementation. */
134 
135 #define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
136 
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN		32
142 
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147 	/* These six fields never change for a given rbd image */
148 	char *object_prefix;
149 	__u8 obj_order;
150 	u64 stripe_unit;
151 	u64 stripe_count;
152 	s64 data_pool_id;
153 	u64 features;		/* Might be changeable someday? */
154 
155 	/* The remaining fields need to be updated occasionally */
156 	u64 image_size;
157 	struct ceph_snap_context *snapc;
158 	char *snap_names;	/* format 1 only */
159 	u64 *snap_sizes;	/* format 1 only */
160 };
161 
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188 	u64		pool_id;
189 	const char	*pool_name;
190 
191 	const char	*image_id;
192 	const char	*image_name;
193 
194 	u64		snap_id;
195 	const char	*snap_name;
196 
197 	struct kref	kref;
198 };
199 
200 /*
201  * an instance of the client.  multiple devices may share an rbd client.
202  */
203 struct rbd_client {
204 	struct ceph_client	*client;
205 	struct kref		kref;
206 	struct list_head	node;
207 };
208 
209 struct rbd_img_request;
210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
211 
212 #define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
213 
214 struct rbd_obj_request;
215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
216 
217 enum obj_request_type {
218 	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
219 };
220 
221 enum obj_operation_type {
222 	OBJ_OP_WRITE,
223 	OBJ_OP_READ,
224 	OBJ_OP_DISCARD,
225 };
226 
227 enum obj_req_flags {
228 	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
229 	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
230 	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
231 	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
232 };
233 
234 struct rbd_obj_request {
235 	u64			object_no;
236 	u64			offset;		/* object start byte */
237 	u64			length;		/* bytes from offset */
238 	unsigned long		flags;
239 
240 	/*
241 	 * An object request associated with an image will have its
242 	 * img_data flag set; a standalone object request will not.
243 	 *
244 	 * A standalone object request will have which == BAD_WHICH
245 	 * and a null obj_request pointer.
246 	 *
247 	 * An object request initiated in support of a layered image
248 	 * object (to check for its existence before a write) will
249 	 * have which == BAD_WHICH and a non-null obj_request pointer.
250 	 *
251 	 * Finally, an object request for rbd image data will have
252 	 * which != BAD_WHICH, and will have a non-null img_request
253 	 * pointer.  The value of which will be in the range
254 	 * 0..(img_request->obj_request_count-1).
255 	 */
256 	union {
257 		struct rbd_obj_request	*obj_request;	/* STAT op */
258 		struct {
259 			struct rbd_img_request	*img_request;
260 			u64			img_offset;
261 			/* links for img_request->obj_requests list */
262 			struct list_head	links;
263 		};
264 	};
265 	u32			which;		/* posn image request list */
266 
267 	enum obj_request_type	type;
268 	union {
269 		struct bio	*bio_list;
270 		struct {
271 			struct page	**pages;
272 			u32		page_count;
273 		};
274 	};
275 	struct page		**copyup_pages;
276 	u32			copyup_page_count;
277 
278 	struct ceph_osd_request	*osd_req;
279 
280 	u64			xferred;	/* bytes transferred */
281 	int			result;
282 
283 	rbd_obj_callback_t	callback;
284 	struct completion	completion;
285 
286 	struct kref		kref;
287 };
288 
289 enum img_req_flags {
290 	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
291 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
292 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
293 	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
294 };
295 
296 struct rbd_img_request {
297 	struct rbd_device	*rbd_dev;
298 	u64			offset;	/* starting image byte offset */
299 	u64			length;	/* byte count from offset */
300 	unsigned long		flags;
301 	union {
302 		u64			snap_id;	/* for reads */
303 		struct ceph_snap_context *snapc;	/* for writes */
304 	};
305 	union {
306 		struct request		*rq;		/* block request */
307 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
308 	};
309 	struct page		**copyup_pages;
310 	u32			copyup_page_count;
311 	spinlock_t		completion_lock;/* protects next_completion */
312 	u32			next_completion;
313 	rbd_img_callback_t	callback;
314 	u64			xferred;/* aggregate bytes transferred */
315 	int			result;	/* first nonzero obj_request result */
316 
317 	u32			obj_request_count;
318 	struct list_head	obj_requests;	/* rbd_obj_request structs */
319 
320 	struct kref		kref;
321 };
322 
323 #define for_each_obj_request(ireq, oreq) \
324 	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
325 #define for_each_obj_request_from(ireq, oreq) \
326 	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
327 #define for_each_obj_request_safe(ireq, oreq, n) \
328 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
329 
330 enum rbd_watch_state {
331 	RBD_WATCH_STATE_UNREGISTERED,
332 	RBD_WATCH_STATE_REGISTERED,
333 	RBD_WATCH_STATE_ERROR,
334 };
335 
336 enum rbd_lock_state {
337 	RBD_LOCK_STATE_UNLOCKED,
338 	RBD_LOCK_STATE_LOCKED,
339 	RBD_LOCK_STATE_RELEASING,
340 };
341 
342 /* WatchNotify::ClientId */
343 struct rbd_client_id {
344 	u64 gid;
345 	u64 handle;
346 };
347 
348 struct rbd_mapping {
349 	u64                     size;
350 	u64                     features;
351 	bool			read_only;
352 };
353 
354 /*
355  * a single device
356  */
357 struct rbd_device {
358 	int			dev_id;		/* blkdev unique id */
359 
360 	int			major;		/* blkdev assigned major */
361 	int			minor;
362 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
363 
364 	u32			image_format;	/* Either 1 or 2 */
365 	struct rbd_client	*rbd_client;
366 
367 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
368 
369 	spinlock_t		lock;		/* queue, flags, open_count */
370 
371 	struct rbd_image_header	header;
372 	unsigned long		flags;		/* possibly lock protected */
373 	struct rbd_spec		*spec;
374 	struct rbd_options	*opts;
375 	char			*config_info;	/* add{,_single_major} string */
376 
377 	struct ceph_object_id	header_oid;
378 	struct ceph_object_locator header_oloc;
379 
380 	struct ceph_file_layout	layout;		/* used for all rbd requests */
381 
382 	struct mutex		watch_mutex;
383 	enum rbd_watch_state	watch_state;
384 	struct ceph_osd_linger_request *watch_handle;
385 	u64			watch_cookie;
386 	struct delayed_work	watch_dwork;
387 
388 	struct rw_semaphore	lock_rwsem;
389 	enum rbd_lock_state	lock_state;
390 	char			lock_cookie[32];
391 	struct rbd_client_id	owner_cid;
392 	struct work_struct	acquired_lock_work;
393 	struct work_struct	released_lock_work;
394 	struct delayed_work	lock_dwork;
395 	struct work_struct	unlock_work;
396 	wait_queue_head_t	lock_waitq;
397 
398 	struct workqueue_struct	*task_wq;
399 
400 	struct rbd_spec		*parent_spec;
401 	u64			parent_overlap;
402 	atomic_t		parent_ref;
403 	struct rbd_device	*parent;
404 
405 	/* Block layer tags. */
406 	struct blk_mq_tag_set	tag_set;
407 
408 	/* protects updating the header */
409 	struct rw_semaphore     header_rwsem;
410 
411 	struct rbd_mapping	mapping;
412 
413 	struct list_head	node;
414 
415 	/* sysfs related */
416 	struct device		dev;
417 	unsigned long		open_count;	/* protected by lock */
418 };
419 
420 /*
421  * Flag bits for rbd_dev->flags:
422  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
423  *   by rbd_dev->lock
424  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
425  */
426 enum rbd_dev_flags {
427 	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
428 	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
429 	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
430 };
431 
432 static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
433 
434 static LIST_HEAD(rbd_dev_list);    /* devices */
435 static DEFINE_SPINLOCK(rbd_dev_list_lock);
436 
437 static LIST_HEAD(rbd_client_list);		/* clients */
438 static DEFINE_SPINLOCK(rbd_client_list_lock);
439 
440 /* Slab caches for frequently-allocated structures */
441 
442 static struct kmem_cache	*rbd_img_request_cache;
443 static struct kmem_cache	*rbd_obj_request_cache;
444 
445 static struct bio_set		*rbd_bio_clone;
446 
447 static int rbd_major;
448 static DEFINE_IDA(rbd_dev_id_ida);
449 
450 static struct workqueue_struct *rbd_wq;
451 
452 /*
453  * Default to false for now, as single-major requires >= 0.75 version of
454  * userspace rbd utility.
455  */
456 static bool single_major = false;
457 module_param(single_major, bool, S_IRUGO);
458 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
459 
460 static int rbd_img_request_submit(struct rbd_img_request *img_request);
461 
462 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
463 		       size_t count);
464 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
465 			  size_t count);
466 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
467 				    size_t count);
468 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
469 				       size_t count);
470 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
471 static void rbd_spec_put(struct rbd_spec *spec);
472 
473 static int rbd_dev_id_to_minor(int dev_id)
474 {
475 	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
476 }
477 
478 static int minor_to_rbd_dev_id(int minor)
479 {
480 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
481 }
482 
483 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
484 {
485 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
486 	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
487 }
488 
489 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
490 {
491 	bool is_lock_owner;
492 
493 	down_read(&rbd_dev->lock_rwsem);
494 	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
495 	up_read(&rbd_dev->lock_rwsem);
496 	return is_lock_owner;
497 }
498 
499 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
500 {
501 	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
502 }
503 
504 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
505 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
506 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
507 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
508 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
509 
510 static struct attribute *rbd_bus_attrs[] = {
511 	&bus_attr_add.attr,
512 	&bus_attr_remove.attr,
513 	&bus_attr_add_single_major.attr,
514 	&bus_attr_remove_single_major.attr,
515 	&bus_attr_supported_features.attr,
516 	NULL,
517 };
518 
519 static umode_t rbd_bus_is_visible(struct kobject *kobj,
520 				  struct attribute *attr, int index)
521 {
522 	if (!single_major &&
523 	    (attr == &bus_attr_add_single_major.attr ||
524 	     attr == &bus_attr_remove_single_major.attr))
525 		return 0;
526 
527 	return attr->mode;
528 }
529 
530 static const struct attribute_group rbd_bus_group = {
531 	.attrs = rbd_bus_attrs,
532 	.is_visible = rbd_bus_is_visible,
533 };
534 __ATTRIBUTE_GROUPS(rbd_bus);
535 
536 static struct bus_type rbd_bus_type = {
537 	.name		= "rbd",
538 	.bus_groups	= rbd_bus_groups,
539 };
540 
541 static void rbd_root_dev_release(struct device *dev)
542 {
543 }
544 
545 static struct device rbd_root_dev = {
546 	.init_name =    "rbd",
547 	.release =      rbd_root_dev_release,
548 };
549 
550 static __printf(2, 3)
551 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
552 {
553 	struct va_format vaf;
554 	va_list args;
555 
556 	va_start(args, fmt);
557 	vaf.fmt = fmt;
558 	vaf.va = &args;
559 
560 	if (!rbd_dev)
561 		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
562 	else if (rbd_dev->disk)
563 		printk(KERN_WARNING "%s: %s: %pV\n",
564 			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
565 	else if (rbd_dev->spec && rbd_dev->spec->image_name)
566 		printk(KERN_WARNING "%s: image %s: %pV\n",
567 			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
568 	else if (rbd_dev->spec && rbd_dev->spec->image_id)
569 		printk(KERN_WARNING "%s: id %s: %pV\n",
570 			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
571 	else	/* punt */
572 		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
573 			RBD_DRV_NAME, rbd_dev, &vaf);
574 	va_end(args);
575 }
576 
577 #ifdef RBD_DEBUG
578 #define rbd_assert(expr)						\
579 		if (unlikely(!(expr))) {				\
580 			printk(KERN_ERR "\nAssertion failure in %s() "	\
581 						"at line %d:\n\n"	\
582 					"\trbd_assert(%s);\n\n",	\
583 					__func__, __LINE__, #expr);	\
584 			BUG();						\
585 		}
586 #else /* !RBD_DEBUG */
587 #  define rbd_assert(expr)	((void) 0)
588 #endif /* !RBD_DEBUG */
589 
590 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
591 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
592 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
593 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
594 
595 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
596 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
597 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
598 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
599 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
600 					u64 snap_id);
601 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
602 				u8 *order, u64 *snap_size);
603 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
604 		u64 *snap_features);
605 
606 static int rbd_open(struct block_device *bdev, fmode_t mode)
607 {
608 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
609 	bool removing = false;
610 
611 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
612 		return -EROFS;
613 
614 	spin_lock_irq(&rbd_dev->lock);
615 	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
616 		removing = true;
617 	else
618 		rbd_dev->open_count++;
619 	spin_unlock_irq(&rbd_dev->lock);
620 	if (removing)
621 		return -ENOENT;
622 
623 	(void) get_device(&rbd_dev->dev);
624 
625 	return 0;
626 }
627 
628 static void rbd_release(struct gendisk *disk, fmode_t mode)
629 {
630 	struct rbd_device *rbd_dev = disk->private_data;
631 	unsigned long open_count_before;
632 
633 	spin_lock_irq(&rbd_dev->lock);
634 	open_count_before = rbd_dev->open_count--;
635 	spin_unlock_irq(&rbd_dev->lock);
636 	rbd_assert(open_count_before > 0);
637 
638 	put_device(&rbd_dev->dev);
639 }
640 
641 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
642 {
643 	int ret = 0;
644 	int val;
645 	bool ro;
646 	bool ro_changed = false;
647 
648 	/* get_user() may sleep, so call it before taking rbd_dev->lock */
649 	if (get_user(val, (int __user *)(arg)))
650 		return -EFAULT;
651 
652 	ro = val ? true : false;
653 	/* Snapshot doesn't allow to write*/
654 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
655 		return -EROFS;
656 
657 	spin_lock_irq(&rbd_dev->lock);
658 	/* prevent others open this device */
659 	if (rbd_dev->open_count > 1) {
660 		ret = -EBUSY;
661 		goto out;
662 	}
663 
664 	if (rbd_dev->mapping.read_only != ro) {
665 		rbd_dev->mapping.read_only = ro;
666 		ro_changed = true;
667 	}
668 
669 out:
670 	spin_unlock_irq(&rbd_dev->lock);
671 	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
672 	if (ret == 0 && ro_changed)
673 		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
674 
675 	return ret;
676 }
677 
678 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
679 			unsigned int cmd, unsigned long arg)
680 {
681 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
682 	int ret = 0;
683 
684 	switch (cmd) {
685 	case BLKROSET:
686 		ret = rbd_ioctl_set_ro(rbd_dev, arg);
687 		break;
688 	default:
689 		ret = -ENOTTY;
690 	}
691 
692 	return ret;
693 }
694 
695 #ifdef CONFIG_COMPAT
696 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
697 				unsigned int cmd, unsigned long arg)
698 {
699 	return rbd_ioctl(bdev, mode, cmd, arg);
700 }
701 #endif /* CONFIG_COMPAT */
702 
703 static const struct block_device_operations rbd_bd_ops = {
704 	.owner			= THIS_MODULE,
705 	.open			= rbd_open,
706 	.release		= rbd_release,
707 	.ioctl			= rbd_ioctl,
708 #ifdef CONFIG_COMPAT
709 	.compat_ioctl		= rbd_compat_ioctl,
710 #endif
711 };
712 
713 /*
714  * Initialize an rbd client instance.  Success or not, this function
715  * consumes ceph_opts.  Caller holds client_mutex.
716  */
717 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
718 {
719 	struct rbd_client *rbdc;
720 	int ret = -ENOMEM;
721 
722 	dout("%s:\n", __func__);
723 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
724 	if (!rbdc)
725 		goto out_opt;
726 
727 	kref_init(&rbdc->kref);
728 	INIT_LIST_HEAD(&rbdc->node);
729 
730 	rbdc->client = ceph_create_client(ceph_opts, rbdc);
731 	if (IS_ERR(rbdc->client))
732 		goto out_rbdc;
733 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
734 
735 	ret = ceph_open_session(rbdc->client);
736 	if (ret < 0)
737 		goto out_client;
738 
739 	spin_lock(&rbd_client_list_lock);
740 	list_add_tail(&rbdc->node, &rbd_client_list);
741 	spin_unlock(&rbd_client_list_lock);
742 
743 	dout("%s: rbdc %p\n", __func__, rbdc);
744 
745 	return rbdc;
746 out_client:
747 	ceph_destroy_client(rbdc->client);
748 out_rbdc:
749 	kfree(rbdc);
750 out_opt:
751 	if (ceph_opts)
752 		ceph_destroy_options(ceph_opts);
753 	dout("%s: error %d\n", __func__, ret);
754 
755 	return ERR_PTR(ret);
756 }
757 
758 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
759 {
760 	kref_get(&rbdc->kref);
761 
762 	return rbdc;
763 }
764 
765 /*
766  * Find a ceph client with specific addr and configuration.  If
767  * found, bump its reference count.
768  */
769 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
770 {
771 	struct rbd_client *client_node;
772 	bool found = false;
773 
774 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
775 		return NULL;
776 
777 	spin_lock(&rbd_client_list_lock);
778 	list_for_each_entry(client_node, &rbd_client_list, node) {
779 		if (!ceph_compare_options(ceph_opts, client_node->client)) {
780 			__rbd_get_client(client_node);
781 
782 			found = true;
783 			break;
784 		}
785 	}
786 	spin_unlock(&rbd_client_list_lock);
787 
788 	return found ? client_node : NULL;
789 }
790 
791 /*
792  * (Per device) rbd map options
793  */
794 enum {
795 	Opt_queue_depth,
796 	Opt_last_int,
797 	/* int args above */
798 	Opt_last_string,
799 	/* string args above */
800 	Opt_read_only,
801 	Opt_read_write,
802 	Opt_lock_on_read,
803 	Opt_exclusive,
804 	Opt_err
805 };
806 
807 static match_table_t rbd_opts_tokens = {
808 	{Opt_queue_depth, "queue_depth=%d"},
809 	/* int args above */
810 	/* string args above */
811 	{Opt_read_only, "read_only"},
812 	{Opt_read_only, "ro"},		/* Alternate spelling */
813 	{Opt_read_write, "read_write"},
814 	{Opt_read_write, "rw"},		/* Alternate spelling */
815 	{Opt_lock_on_read, "lock_on_read"},
816 	{Opt_exclusive, "exclusive"},
817 	{Opt_err, NULL}
818 };
819 
820 struct rbd_options {
821 	int	queue_depth;
822 	bool	read_only;
823 	bool	lock_on_read;
824 	bool	exclusive;
825 };
826 
827 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
828 #define RBD_READ_ONLY_DEFAULT	false
829 #define RBD_LOCK_ON_READ_DEFAULT false
830 #define RBD_EXCLUSIVE_DEFAULT	false
831 
832 static int parse_rbd_opts_token(char *c, void *private)
833 {
834 	struct rbd_options *rbd_opts = private;
835 	substring_t argstr[MAX_OPT_ARGS];
836 	int token, intval, ret;
837 
838 	token = match_token(c, rbd_opts_tokens, argstr);
839 	if (token < Opt_last_int) {
840 		ret = match_int(&argstr[0], &intval);
841 		if (ret < 0) {
842 			pr_err("bad mount option arg (not int) at '%s'\n", c);
843 			return ret;
844 		}
845 		dout("got int token %d val %d\n", token, intval);
846 	} else if (token > Opt_last_int && token < Opt_last_string) {
847 		dout("got string token %d val %s\n", token, argstr[0].from);
848 	} else {
849 		dout("got token %d\n", token);
850 	}
851 
852 	switch (token) {
853 	case Opt_queue_depth:
854 		if (intval < 1) {
855 			pr_err("queue_depth out of range\n");
856 			return -EINVAL;
857 		}
858 		rbd_opts->queue_depth = intval;
859 		break;
860 	case Opt_read_only:
861 		rbd_opts->read_only = true;
862 		break;
863 	case Opt_read_write:
864 		rbd_opts->read_only = false;
865 		break;
866 	case Opt_lock_on_read:
867 		rbd_opts->lock_on_read = true;
868 		break;
869 	case Opt_exclusive:
870 		rbd_opts->exclusive = true;
871 		break;
872 	default:
873 		/* libceph prints "bad option" msg */
874 		return -EINVAL;
875 	}
876 
877 	return 0;
878 }
879 
880 static char* obj_op_name(enum obj_operation_type op_type)
881 {
882 	switch (op_type) {
883 	case OBJ_OP_READ:
884 		return "read";
885 	case OBJ_OP_WRITE:
886 		return "write";
887 	case OBJ_OP_DISCARD:
888 		return "discard";
889 	default:
890 		return "???";
891 	}
892 }
893 
894 /*
895  * Get a ceph client with specific addr and configuration, if one does
896  * not exist create it.  Either way, ceph_opts is consumed by this
897  * function.
898  */
899 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
900 {
901 	struct rbd_client *rbdc;
902 
903 	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
904 	rbdc = rbd_client_find(ceph_opts);
905 	if (rbdc)	/* using an existing client */
906 		ceph_destroy_options(ceph_opts);
907 	else
908 		rbdc = rbd_client_create(ceph_opts);
909 	mutex_unlock(&client_mutex);
910 
911 	return rbdc;
912 }
913 
914 /*
915  * Destroy ceph client
916  *
917  * Caller must hold rbd_client_list_lock.
918  */
919 static void rbd_client_release(struct kref *kref)
920 {
921 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
922 
923 	dout("%s: rbdc %p\n", __func__, rbdc);
924 	spin_lock(&rbd_client_list_lock);
925 	list_del(&rbdc->node);
926 	spin_unlock(&rbd_client_list_lock);
927 
928 	ceph_destroy_client(rbdc->client);
929 	kfree(rbdc);
930 }
931 
932 /*
933  * Drop reference to ceph client node. If it's not referenced anymore, release
934  * it.
935  */
936 static void rbd_put_client(struct rbd_client *rbdc)
937 {
938 	if (rbdc)
939 		kref_put(&rbdc->kref, rbd_client_release);
940 }
941 
942 static bool rbd_image_format_valid(u32 image_format)
943 {
944 	return image_format == 1 || image_format == 2;
945 }
946 
947 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
948 {
949 	size_t size;
950 	u32 snap_count;
951 
952 	/* The header has to start with the magic rbd header text */
953 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
954 		return false;
955 
956 	/* The bio layer requires at least sector-sized I/O */
957 
958 	if (ondisk->options.order < SECTOR_SHIFT)
959 		return false;
960 
961 	/* If we use u64 in a few spots we may be able to loosen this */
962 
963 	if (ondisk->options.order > 8 * sizeof (int) - 1)
964 		return false;
965 
966 	/*
967 	 * The size of a snapshot header has to fit in a size_t, and
968 	 * that limits the number of snapshots.
969 	 */
970 	snap_count = le32_to_cpu(ondisk->snap_count);
971 	size = SIZE_MAX - sizeof (struct ceph_snap_context);
972 	if (snap_count > size / sizeof (__le64))
973 		return false;
974 
975 	/*
976 	 * Not only that, but the size of the entire the snapshot
977 	 * header must also be representable in a size_t.
978 	 */
979 	size -= snap_count * sizeof (__le64);
980 	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
981 		return false;
982 
983 	return true;
984 }
985 
986 /*
987  * returns the size of an object in the image
988  */
989 static u32 rbd_obj_bytes(struct rbd_image_header *header)
990 {
991 	return 1U << header->obj_order;
992 }
993 
994 static void rbd_init_layout(struct rbd_device *rbd_dev)
995 {
996 	if (rbd_dev->header.stripe_unit == 0 ||
997 	    rbd_dev->header.stripe_count == 0) {
998 		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
999 		rbd_dev->header.stripe_count = 1;
1000 	}
1001 
1002 	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1003 	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1004 	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1005 	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1006 			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1007 	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1008 }
1009 
1010 /*
1011  * Fill an rbd image header with information from the given format 1
1012  * on-disk header.
1013  */
1014 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1015 				 struct rbd_image_header_ondisk *ondisk)
1016 {
1017 	struct rbd_image_header *header = &rbd_dev->header;
1018 	bool first_time = header->object_prefix == NULL;
1019 	struct ceph_snap_context *snapc;
1020 	char *object_prefix = NULL;
1021 	char *snap_names = NULL;
1022 	u64 *snap_sizes = NULL;
1023 	u32 snap_count;
1024 	int ret = -ENOMEM;
1025 	u32 i;
1026 
1027 	/* Allocate this now to avoid having to handle failure below */
1028 
1029 	if (first_time) {
1030 		object_prefix = kstrndup(ondisk->object_prefix,
1031 					 sizeof(ondisk->object_prefix),
1032 					 GFP_KERNEL);
1033 		if (!object_prefix)
1034 			return -ENOMEM;
1035 	}
1036 
1037 	/* Allocate the snapshot context and fill it in */
1038 
1039 	snap_count = le32_to_cpu(ondisk->snap_count);
1040 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1041 	if (!snapc)
1042 		goto out_err;
1043 	snapc->seq = le64_to_cpu(ondisk->snap_seq);
1044 	if (snap_count) {
1045 		struct rbd_image_snap_ondisk *snaps;
1046 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1047 
1048 		/* We'll keep a copy of the snapshot names... */
1049 
1050 		if (snap_names_len > (u64)SIZE_MAX)
1051 			goto out_2big;
1052 		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1053 		if (!snap_names)
1054 			goto out_err;
1055 
1056 		/* ...as well as the array of their sizes. */
1057 		snap_sizes = kmalloc_array(snap_count,
1058 					   sizeof(*header->snap_sizes),
1059 					   GFP_KERNEL);
1060 		if (!snap_sizes)
1061 			goto out_err;
1062 
1063 		/*
1064 		 * Copy the names, and fill in each snapshot's id
1065 		 * and size.
1066 		 *
1067 		 * Note that rbd_dev_v1_header_info() guarantees the
1068 		 * ondisk buffer we're working with has
1069 		 * snap_names_len bytes beyond the end of the
1070 		 * snapshot id array, this memcpy() is safe.
1071 		 */
1072 		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1073 		snaps = ondisk->snaps;
1074 		for (i = 0; i < snap_count; i++) {
1075 			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1076 			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1077 		}
1078 	}
1079 
1080 	/* We won't fail any more, fill in the header */
1081 
1082 	if (first_time) {
1083 		header->object_prefix = object_prefix;
1084 		header->obj_order = ondisk->options.order;
1085 		rbd_init_layout(rbd_dev);
1086 	} else {
1087 		ceph_put_snap_context(header->snapc);
1088 		kfree(header->snap_names);
1089 		kfree(header->snap_sizes);
1090 	}
1091 
1092 	/* The remaining fields always get updated (when we refresh) */
1093 
1094 	header->image_size = le64_to_cpu(ondisk->image_size);
1095 	header->snapc = snapc;
1096 	header->snap_names = snap_names;
1097 	header->snap_sizes = snap_sizes;
1098 
1099 	return 0;
1100 out_2big:
1101 	ret = -EIO;
1102 out_err:
1103 	kfree(snap_sizes);
1104 	kfree(snap_names);
1105 	ceph_put_snap_context(snapc);
1106 	kfree(object_prefix);
1107 
1108 	return ret;
1109 }
1110 
1111 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1112 {
1113 	const char *snap_name;
1114 
1115 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1116 
1117 	/* Skip over names until we find the one we are looking for */
1118 
1119 	snap_name = rbd_dev->header.snap_names;
1120 	while (which--)
1121 		snap_name += strlen(snap_name) + 1;
1122 
1123 	return kstrdup(snap_name, GFP_KERNEL);
1124 }
1125 
1126 /*
1127  * Snapshot id comparison function for use with qsort()/bsearch().
1128  * Note that result is for snapshots in *descending* order.
1129  */
1130 static int snapid_compare_reverse(const void *s1, const void *s2)
1131 {
1132 	u64 snap_id1 = *(u64 *)s1;
1133 	u64 snap_id2 = *(u64 *)s2;
1134 
1135 	if (snap_id1 < snap_id2)
1136 		return 1;
1137 	return snap_id1 == snap_id2 ? 0 : -1;
1138 }
1139 
1140 /*
1141  * Search a snapshot context to see if the given snapshot id is
1142  * present.
1143  *
1144  * Returns the position of the snapshot id in the array if it's found,
1145  * or BAD_SNAP_INDEX otherwise.
1146  *
1147  * Note: The snapshot array is in kept sorted (by the osd) in
1148  * reverse order, highest snapshot id first.
1149  */
1150 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1151 {
1152 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1153 	u64 *found;
1154 
1155 	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1156 				sizeof (snap_id), snapid_compare_reverse);
1157 
1158 	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1159 }
1160 
1161 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1162 					u64 snap_id)
1163 {
1164 	u32 which;
1165 	const char *snap_name;
1166 
1167 	which = rbd_dev_snap_index(rbd_dev, snap_id);
1168 	if (which == BAD_SNAP_INDEX)
1169 		return ERR_PTR(-ENOENT);
1170 
1171 	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1172 	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1173 }
1174 
1175 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1176 {
1177 	if (snap_id == CEPH_NOSNAP)
1178 		return RBD_SNAP_HEAD_NAME;
1179 
1180 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1181 	if (rbd_dev->image_format == 1)
1182 		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1183 
1184 	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1185 }
1186 
1187 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1188 				u64 *snap_size)
1189 {
1190 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1191 	if (snap_id == CEPH_NOSNAP) {
1192 		*snap_size = rbd_dev->header.image_size;
1193 	} else if (rbd_dev->image_format == 1) {
1194 		u32 which;
1195 
1196 		which = rbd_dev_snap_index(rbd_dev, snap_id);
1197 		if (which == BAD_SNAP_INDEX)
1198 			return -ENOENT;
1199 
1200 		*snap_size = rbd_dev->header.snap_sizes[which];
1201 	} else {
1202 		u64 size = 0;
1203 		int ret;
1204 
1205 		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1206 		if (ret)
1207 			return ret;
1208 
1209 		*snap_size = size;
1210 	}
1211 	return 0;
1212 }
1213 
1214 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1215 			u64 *snap_features)
1216 {
1217 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1218 	if (snap_id == CEPH_NOSNAP) {
1219 		*snap_features = rbd_dev->header.features;
1220 	} else if (rbd_dev->image_format == 1) {
1221 		*snap_features = 0;	/* No features for format 1 */
1222 	} else {
1223 		u64 features = 0;
1224 		int ret;
1225 
1226 		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1227 		if (ret)
1228 			return ret;
1229 
1230 		*snap_features = features;
1231 	}
1232 	return 0;
1233 }
1234 
1235 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1236 {
1237 	u64 snap_id = rbd_dev->spec->snap_id;
1238 	u64 size = 0;
1239 	u64 features = 0;
1240 	int ret;
1241 
1242 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1243 	if (ret)
1244 		return ret;
1245 	ret = rbd_snap_features(rbd_dev, snap_id, &features);
1246 	if (ret)
1247 		return ret;
1248 
1249 	rbd_dev->mapping.size = size;
1250 	rbd_dev->mapping.features = features;
1251 
1252 	return 0;
1253 }
1254 
1255 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1256 {
1257 	rbd_dev->mapping.size = 0;
1258 	rbd_dev->mapping.features = 0;
1259 }
1260 
1261 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1262 {
1263 	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1264 
1265 	return offset & (segment_size - 1);
1266 }
1267 
1268 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1269 				u64 offset, u64 length)
1270 {
1271 	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1272 
1273 	offset &= segment_size - 1;
1274 
1275 	rbd_assert(length <= U64_MAX - offset);
1276 	if (offset + length > segment_size)
1277 		length = segment_size - offset;
1278 
1279 	return length;
1280 }
1281 
1282 /*
1283  * bio helpers
1284  */
1285 
1286 static void bio_chain_put(struct bio *chain)
1287 {
1288 	struct bio *tmp;
1289 
1290 	while (chain) {
1291 		tmp = chain;
1292 		chain = chain->bi_next;
1293 		bio_put(tmp);
1294 	}
1295 }
1296 
1297 /*
1298  * zeros a bio chain, starting at specific offset
1299  */
1300 static void zero_bio_chain(struct bio *chain, int start_ofs)
1301 {
1302 	struct bio_vec bv;
1303 	struct bvec_iter iter;
1304 	unsigned long flags;
1305 	void *buf;
1306 	int pos = 0;
1307 
1308 	while (chain) {
1309 		bio_for_each_segment(bv, chain, iter) {
1310 			if (pos + bv.bv_len > start_ofs) {
1311 				int remainder = max(start_ofs - pos, 0);
1312 				buf = bvec_kmap_irq(&bv, &flags);
1313 				memset(buf + remainder, 0,
1314 				       bv.bv_len - remainder);
1315 				flush_dcache_page(bv.bv_page);
1316 				bvec_kunmap_irq(buf, &flags);
1317 			}
1318 			pos += bv.bv_len;
1319 		}
1320 
1321 		chain = chain->bi_next;
1322 	}
1323 }
1324 
1325 /*
1326  * similar to zero_bio_chain(), zeros data defined by a page array,
1327  * starting at the given byte offset from the start of the array and
1328  * continuing up to the given end offset.  The pages array is
1329  * assumed to be big enough to hold all bytes up to the end.
1330  */
1331 static void zero_pages(struct page **pages, u64 offset, u64 end)
1332 {
1333 	struct page **page = &pages[offset >> PAGE_SHIFT];
1334 
1335 	rbd_assert(end > offset);
1336 	rbd_assert(end - offset <= (u64)SIZE_MAX);
1337 	while (offset < end) {
1338 		size_t page_offset;
1339 		size_t length;
1340 		unsigned long flags;
1341 		void *kaddr;
1342 
1343 		page_offset = offset & ~PAGE_MASK;
1344 		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1345 		local_irq_save(flags);
1346 		kaddr = kmap_atomic(*page);
1347 		memset(kaddr + page_offset, 0, length);
1348 		flush_dcache_page(*page);
1349 		kunmap_atomic(kaddr);
1350 		local_irq_restore(flags);
1351 
1352 		offset += length;
1353 		page++;
1354 	}
1355 }
1356 
1357 /*
1358  * Clone a portion of a bio, starting at the given byte offset
1359  * and continuing for the number of bytes indicated.
1360  */
1361 static struct bio *bio_clone_range(struct bio *bio_src,
1362 					unsigned int offset,
1363 					unsigned int len,
1364 					gfp_t gfpmask)
1365 {
1366 	struct bio *bio;
1367 
1368 	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1369 	if (!bio)
1370 		return NULL;	/* ENOMEM */
1371 
1372 	bio_advance(bio, offset);
1373 	bio->bi_iter.bi_size = len;
1374 
1375 	return bio;
1376 }
1377 
1378 /*
1379  * Clone a portion of a bio chain, starting at the given byte offset
1380  * into the first bio in the source chain and continuing for the
1381  * number of bytes indicated.  The result is another bio chain of
1382  * exactly the given length, or a null pointer on error.
1383  *
1384  * The bio_src and offset parameters are both in-out.  On entry they
1385  * refer to the first source bio and the offset into that bio where
1386  * the start of data to be cloned is located.
1387  *
1388  * On return, bio_src is updated to refer to the bio in the source
1389  * chain that contains first un-cloned byte, and *offset will
1390  * contain the offset of that byte within that bio.
1391  */
1392 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1393 					unsigned int *offset,
1394 					unsigned int len,
1395 					gfp_t gfpmask)
1396 {
1397 	struct bio *bi = *bio_src;
1398 	unsigned int off = *offset;
1399 	struct bio *chain = NULL;
1400 	struct bio **end;
1401 
1402 	/* Build up a chain of clone bios up to the limit */
1403 
1404 	if (!bi || off >= bi->bi_iter.bi_size || !len)
1405 		return NULL;		/* Nothing to clone */
1406 
1407 	end = &chain;
1408 	while (len) {
1409 		unsigned int bi_size;
1410 		struct bio *bio;
1411 
1412 		if (!bi) {
1413 			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1414 			goto out_err;	/* EINVAL; ran out of bio's */
1415 		}
1416 		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1417 		bio = bio_clone_range(bi, off, bi_size, gfpmask);
1418 		if (!bio)
1419 			goto out_err;	/* ENOMEM */
1420 
1421 		*end = bio;
1422 		end = &bio->bi_next;
1423 
1424 		off += bi_size;
1425 		if (off == bi->bi_iter.bi_size) {
1426 			bi = bi->bi_next;
1427 			off = 0;
1428 		}
1429 		len -= bi_size;
1430 	}
1431 	*bio_src = bi;
1432 	*offset = off;
1433 
1434 	return chain;
1435 out_err:
1436 	bio_chain_put(chain);
1437 
1438 	return NULL;
1439 }
1440 
1441 /*
1442  * The default/initial value for all object request flags is 0.  For
1443  * each flag, once its value is set to 1 it is never reset to 0
1444  * again.
1445  */
1446 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1447 {
1448 	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1449 		struct rbd_device *rbd_dev;
1450 
1451 		rbd_dev = obj_request->img_request->rbd_dev;
1452 		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1453 			obj_request);
1454 	}
1455 }
1456 
1457 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1458 {
1459 	smp_mb();
1460 	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1461 }
1462 
1463 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1464 {
1465 	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1466 		struct rbd_device *rbd_dev = NULL;
1467 
1468 		if (obj_request_img_data_test(obj_request))
1469 			rbd_dev = obj_request->img_request->rbd_dev;
1470 		rbd_warn(rbd_dev, "obj_request %p already marked done",
1471 			obj_request);
1472 	}
1473 }
1474 
1475 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1476 {
1477 	smp_mb();
1478 	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1479 }
1480 
1481 /*
1482  * This sets the KNOWN flag after (possibly) setting the EXISTS
1483  * flag.  The latter is set based on the "exists" value provided.
1484  *
1485  * Note that for our purposes once an object exists it never goes
1486  * away again.  It's possible that the response from two existence
1487  * checks are separated by the creation of the target object, and
1488  * the first ("doesn't exist") response arrives *after* the second
1489  * ("does exist").  In that case we ignore the second one.
1490  */
1491 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1492 				bool exists)
1493 {
1494 	if (exists)
1495 		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1496 	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1497 	smp_mb();
1498 }
1499 
1500 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1501 {
1502 	smp_mb();
1503 	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1504 }
1505 
1506 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1507 {
1508 	smp_mb();
1509 	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1510 }
1511 
1512 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1513 {
1514 	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1515 
1516 	return obj_request->img_offset <
1517 	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1518 }
1519 
1520 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1521 {
1522 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1523 		kref_read(&obj_request->kref));
1524 	kref_get(&obj_request->kref);
1525 }
1526 
1527 static void rbd_obj_request_destroy(struct kref *kref);
1528 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1529 {
1530 	rbd_assert(obj_request != NULL);
1531 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1532 		kref_read(&obj_request->kref));
1533 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1534 }
1535 
1536 static void rbd_img_request_get(struct rbd_img_request *img_request)
1537 {
1538 	dout("%s: img %p (was %d)\n", __func__, img_request,
1539 	     kref_read(&img_request->kref));
1540 	kref_get(&img_request->kref);
1541 }
1542 
1543 static bool img_request_child_test(struct rbd_img_request *img_request);
1544 static void rbd_parent_request_destroy(struct kref *kref);
1545 static void rbd_img_request_destroy(struct kref *kref);
1546 static void rbd_img_request_put(struct rbd_img_request *img_request)
1547 {
1548 	rbd_assert(img_request != NULL);
1549 	dout("%s: img %p (was %d)\n", __func__, img_request,
1550 		kref_read(&img_request->kref));
1551 	if (img_request_child_test(img_request))
1552 		kref_put(&img_request->kref, rbd_parent_request_destroy);
1553 	else
1554 		kref_put(&img_request->kref, rbd_img_request_destroy);
1555 }
1556 
1557 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1558 					struct rbd_obj_request *obj_request)
1559 {
1560 	rbd_assert(obj_request->img_request == NULL);
1561 
1562 	/* Image request now owns object's original reference */
1563 	obj_request->img_request = img_request;
1564 	obj_request->which = img_request->obj_request_count;
1565 	rbd_assert(!obj_request_img_data_test(obj_request));
1566 	obj_request_img_data_set(obj_request);
1567 	rbd_assert(obj_request->which != BAD_WHICH);
1568 	img_request->obj_request_count++;
1569 	list_add_tail(&obj_request->links, &img_request->obj_requests);
1570 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1571 		obj_request->which);
1572 }
1573 
1574 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1575 					struct rbd_obj_request *obj_request)
1576 {
1577 	rbd_assert(obj_request->which != BAD_WHICH);
1578 
1579 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1580 		obj_request->which);
1581 	list_del(&obj_request->links);
1582 	rbd_assert(img_request->obj_request_count > 0);
1583 	img_request->obj_request_count--;
1584 	rbd_assert(obj_request->which == img_request->obj_request_count);
1585 	obj_request->which = BAD_WHICH;
1586 	rbd_assert(obj_request_img_data_test(obj_request));
1587 	rbd_assert(obj_request->img_request == img_request);
1588 	obj_request->img_request = NULL;
1589 	obj_request->callback = NULL;
1590 	rbd_obj_request_put(obj_request);
1591 }
1592 
1593 static bool obj_request_type_valid(enum obj_request_type type)
1594 {
1595 	switch (type) {
1596 	case OBJ_REQUEST_NODATA:
1597 	case OBJ_REQUEST_BIO:
1598 	case OBJ_REQUEST_PAGES:
1599 		return true;
1600 	default:
1601 		return false;
1602 	}
1603 }
1604 
1605 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1606 
1607 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1608 {
1609 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1610 
1611 	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1612 	     obj_request, obj_request->object_no, obj_request->offset,
1613 	     obj_request->length, osd_req);
1614 	if (obj_request_img_data_test(obj_request)) {
1615 		WARN_ON(obj_request->callback != rbd_img_obj_callback);
1616 		rbd_img_request_get(obj_request->img_request);
1617 	}
1618 	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1619 }
1620 
1621 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1622 {
1623 
1624 	dout("%s: img %p\n", __func__, img_request);
1625 
1626 	/*
1627 	 * If no error occurred, compute the aggregate transfer
1628 	 * count for the image request.  We could instead use
1629 	 * atomic64_cmpxchg() to update it as each object request
1630 	 * completes; not clear which way is better off hand.
1631 	 */
1632 	if (!img_request->result) {
1633 		struct rbd_obj_request *obj_request;
1634 		u64 xferred = 0;
1635 
1636 		for_each_obj_request(img_request, obj_request)
1637 			xferred += obj_request->xferred;
1638 		img_request->xferred = xferred;
1639 	}
1640 
1641 	if (img_request->callback)
1642 		img_request->callback(img_request);
1643 	else
1644 		rbd_img_request_put(img_request);
1645 }
1646 
1647 /*
1648  * The default/initial value for all image request flags is 0.  Each
1649  * is conditionally set to 1 at image request initialization time
1650  * and currently never change thereafter.
1651  */
1652 static void img_request_write_set(struct rbd_img_request *img_request)
1653 {
1654 	set_bit(IMG_REQ_WRITE, &img_request->flags);
1655 	smp_mb();
1656 }
1657 
1658 static bool img_request_write_test(struct rbd_img_request *img_request)
1659 {
1660 	smp_mb();
1661 	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1662 }
1663 
1664 /*
1665  * Set the discard flag when the img_request is an discard request
1666  */
1667 static void img_request_discard_set(struct rbd_img_request *img_request)
1668 {
1669 	set_bit(IMG_REQ_DISCARD, &img_request->flags);
1670 	smp_mb();
1671 }
1672 
1673 static bool img_request_discard_test(struct rbd_img_request *img_request)
1674 {
1675 	smp_mb();
1676 	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1677 }
1678 
1679 static void img_request_child_set(struct rbd_img_request *img_request)
1680 {
1681 	set_bit(IMG_REQ_CHILD, &img_request->flags);
1682 	smp_mb();
1683 }
1684 
1685 static void img_request_child_clear(struct rbd_img_request *img_request)
1686 {
1687 	clear_bit(IMG_REQ_CHILD, &img_request->flags);
1688 	smp_mb();
1689 }
1690 
1691 static bool img_request_child_test(struct rbd_img_request *img_request)
1692 {
1693 	smp_mb();
1694 	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1695 }
1696 
1697 static void img_request_layered_set(struct rbd_img_request *img_request)
1698 {
1699 	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1700 	smp_mb();
1701 }
1702 
1703 static void img_request_layered_clear(struct rbd_img_request *img_request)
1704 {
1705 	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1706 	smp_mb();
1707 }
1708 
1709 static bool img_request_layered_test(struct rbd_img_request *img_request)
1710 {
1711 	smp_mb();
1712 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1713 }
1714 
1715 static enum obj_operation_type
1716 rbd_img_request_op_type(struct rbd_img_request *img_request)
1717 {
1718 	if (img_request_write_test(img_request))
1719 		return OBJ_OP_WRITE;
1720 	else if (img_request_discard_test(img_request))
1721 		return OBJ_OP_DISCARD;
1722 	else
1723 		return OBJ_OP_READ;
1724 }
1725 
1726 static void
1727 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1728 {
1729 	u64 xferred = obj_request->xferred;
1730 	u64 length = obj_request->length;
1731 
1732 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1733 		obj_request, obj_request->img_request, obj_request->result,
1734 		xferred, length);
1735 	/*
1736 	 * ENOENT means a hole in the image.  We zero-fill the entire
1737 	 * length of the request.  A short read also implies zero-fill
1738 	 * to the end of the request.  An error requires the whole
1739 	 * length of the request to be reported finished with an error
1740 	 * to the block layer.  In each case we update the xferred
1741 	 * count to indicate the whole request was satisfied.
1742 	 */
1743 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1744 	if (obj_request->result == -ENOENT) {
1745 		if (obj_request->type == OBJ_REQUEST_BIO)
1746 			zero_bio_chain(obj_request->bio_list, 0);
1747 		else
1748 			zero_pages(obj_request->pages, 0, length);
1749 		obj_request->result = 0;
1750 	} else if (xferred < length && !obj_request->result) {
1751 		if (obj_request->type == OBJ_REQUEST_BIO)
1752 			zero_bio_chain(obj_request->bio_list, xferred);
1753 		else
1754 			zero_pages(obj_request->pages, xferred, length);
1755 	}
1756 	obj_request->xferred = length;
1757 	obj_request_done_set(obj_request);
1758 }
1759 
1760 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1761 {
1762 	dout("%s: obj %p cb %p\n", __func__, obj_request,
1763 		obj_request->callback);
1764 	if (obj_request->callback)
1765 		obj_request->callback(obj_request);
1766 	else
1767 		complete_all(&obj_request->completion);
1768 }
1769 
1770 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1771 {
1772 	obj_request->result = err;
1773 	obj_request->xferred = 0;
1774 	/*
1775 	 * kludge - mirror rbd_obj_request_submit() to match a put in
1776 	 * rbd_img_obj_callback()
1777 	 */
1778 	if (obj_request_img_data_test(obj_request)) {
1779 		WARN_ON(obj_request->callback != rbd_img_obj_callback);
1780 		rbd_img_request_get(obj_request->img_request);
1781 	}
1782 	obj_request_done_set(obj_request);
1783 	rbd_obj_request_complete(obj_request);
1784 }
1785 
1786 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1787 {
1788 	struct rbd_img_request *img_request = NULL;
1789 	struct rbd_device *rbd_dev = NULL;
1790 	bool layered = false;
1791 
1792 	if (obj_request_img_data_test(obj_request)) {
1793 		img_request = obj_request->img_request;
1794 		layered = img_request && img_request_layered_test(img_request);
1795 		rbd_dev = img_request->rbd_dev;
1796 	}
1797 
1798 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1799 		obj_request, img_request, obj_request->result,
1800 		obj_request->xferred, obj_request->length);
1801 	if (layered && obj_request->result == -ENOENT &&
1802 			obj_request->img_offset < rbd_dev->parent_overlap)
1803 		rbd_img_parent_read(obj_request);
1804 	else if (img_request)
1805 		rbd_img_obj_request_read_callback(obj_request);
1806 	else
1807 		obj_request_done_set(obj_request);
1808 }
1809 
1810 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1811 {
1812 	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1813 		obj_request->result, obj_request->length);
1814 	/*
1815 	 * There is no such thing as a successful short write.  Set
1816 	 * it to our originally-requested length.
1817 	 */
1818 	obj_request->xferred = obj_request->length;
1819 	obj_request_done_set(obj_request);
1820 }
1821 
1822 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1823 {
1824 	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1825 		obj_request->result, obj_request->length);
1826 	/*
1827 	 * There is no such thing as a successful short discard.  Set
1828 	 * it to our originally-requested length.
1829 	 */
1830 	obj_request->xferred = obj_request->length;
1831 	/* discarding a non-existent object is not a problem */
1832 	if (obj_request->result == -ENOENT)
1833 		obj_request->result = 0;
1834 	obj_request_done_set(obj_request);
1835 }
1836 
1837 /*
1838  * For a simple stat call there's nothing to do.  We'll do more if
1839  * this is part of a write sequence for a layered image.
1840  */
1841 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1842 {
1843 	dout("%s: obj %p\n", __func__, obj_request);
1844 	obj_request_done_set(obj_request);
1845 }
1846 
1847 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1848 {
1849 	dout("%s: obj %p\n", __func__, obj_request);
1850 
1851 	if (obj_request_img_data_test(obj_request))
1852 		rbd_osd_copyup_callback(obj_request);
1853 	else
1854 		obj_request_done_set(obj_request);
1855 }
1856 
1857 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1858 {
1859 	struct rbd_obj_request *obj_request = osd_req->r_priv;
1860 	u16 opcode;
1861 
1862 	dout("%s: osd_req %p\n", __func__, osd_req);
1863 	rbd_assert(osd_req == obj_request->osd_req);
1864 	if (obj_request_img_data_test(obj_request)) {
1865 		rbd_assert(obj_request->img_request);
1866 		rbd_assert(obj_request->which != BAD_WHICH);
1867 	} else {
1868 		rbd_assert(obj_request->which == BAD_WHICH);
1869 	}
1870 
1871 	if (osd_req->r_result < 0)
1872 		obj_request->result = osd_req->r_result;
1873 
1874 	/*
1875 	 * We support a 64-bit length, but ultimately it has to be
1876 	 * passed to the block layer, which just supports a 32-bit
1877 	 * length field.
1878 	 */
1879 	obj_request->xferred = osd_req->r_ops[0].outdata_len;
1880 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1881 
1882 	opcode = osd_req->r_ops[0].op;
1883 	switch (opcode) {
1884 	case CEPH_OSD_OP_READ:
1885 		rbd_osd_read_callback(obj_request);
1886 		break;
1887 	case CEPH_OSD_OP_SETALLOCHINT:
1888 		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1889 			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1890 		/* fall through */
1891 	case CEPH_OSD_OP_WRITE:
1892 	case CEPH_OSD_OP_WRITEFULL:
1893 		rbd_osd_write_callback(obj_request);
1894 		break;
1895 	case CEPH_OSD_OP_STAT:
1896 		rbd_osd_stat_callback(obj_request);
1897 		break;
1898 	case CEPH_OSD_OP_DELETE:
1899 	case CEPH_OSD_OP_TRUNCATE:
1900 	case CEPH_OSD_OP_ZERO:
1901 		rbd_osd_discard_callback(obj_request);
1902 		break;
1903 	case CEPH_OSD_OP_CALL:
1904 		rbd_osd_call_callback(obj_request);
1905 		break;
1906 	default:
1907 		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1908 			 obj_request->object_no, opcode);
1909 		break;
1910 	}
1911 
1912 	if (obj_request_done_test(obj_request))
1913 		rbd_obj_request_complete(obj_request);
1914 }
1915 
1916 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1917 {
1918 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1919 
1920 	rbd_assert(obj_request_img_data_test(obj_request));
1921 	osd_req->r_snapid = obj_request->img_request->snap_id;
1922 }
1923 
1924 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1925 {
1926 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1927 
1928 	ktime_get_real_ts(&osd_req->r_mtime);
1929 	osd_req->r_data_offset = obj_request->offset;
1930 }
1931 
1932 static struct ceph_osd_request *
1933 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1934 		     struct ceph_snap_context *snapc,
1935 		     int num_ops, unsigned int flags,
1936 		     struct rbd_obj_request *obj_request)
1937 {
1938 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1939 	struct ceph_osd_request *req;
1940 	const char *name_format = rbd_dev->image_format == 1 ?
1941 				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1942 
1943 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1944 	if (!req)
1945 		return NULL;
1946 
1947 	req->r_flags = flags;
1948 	req->r_callback = rbd_osd_req_callback;
1949 	req->r_priv = obj_request;
1950 
1951 	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1952 	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1953 			rbd_dev->header.object_prefix, obj_request->object_no))
1954 		goto err_req;
1955 
1956 	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1957 		goto err_req;
1958 
1959 	return req;
1960 
1961 err_req:
1962 	ceph_osdc_put_request(req);
1963 	return NULL;
1964 }
1965 
1966 /*
1967  * Create an osd request.  A read request has one osd op (read).
1968  * A write request has either one (watch) or two (hint+write) osd ops.
1969  * (All rbd data writes are prefixed with an allocation hint op, but
1970  * technically osd watch is a write request, hence this distinction.)
1971  */
1972 static struct ceph_osd_request *rbd_osd_req_create(
1973 					struct rbd_device *rbd_dev,
1974 					enum obj_operation_type op_type,
1975 					unsigned int num_ops,
1976 					struct rbd_obj_request *obj_request)
1977 {
1978 	struct ceph_snap_context *snapc = NULL;
1979 
1980 	if (obj_request_img_data_test(obj_request) &&
1981 		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1982 		struct rbd_img_request *img_request = obj_request->img_request;
1983 		if (op_type == OBJ_OP_WRITE) {
1984 			rbd_assert(img_request_write_test(img_request));
1985 		} else {
1986 			rbd_assert(img_request_discard_test(img_request));
1987 		}
1988 		snapc = img_request->snapc;
1989 	}
1990 
1991 	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1992 
1993 	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1994 	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1995 	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1996 }
1997 
1998 /*
1999  * Create a copyup osd request based on the information in the object
2000  * request supplied.  A copyup request has two or three osd ops, a
2001  * copyup method call, potentially a hint op, and a write or truncate
2002  * or zero op.
2003  */
2004 static struct ceph_osd_request *
2005 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2006 {
2007 	struct rbd_img_request *img_request;
2008 	int num_osd_ops = 3;
2009 
2010 	rbd_assert(obj_request_img_data_test(obj_request));
2011 	img_request = obj_request->img_request;
2012 	rbd_assert(img_request);
2013 	rbd_assert(img_request_write_test(img_request) ||
2014 			img_request_discard_test(img_request));
2015 
2016 	if (img_request_discard_test(img_request))
2017 		num_osd_ops = 2;
2018 
2019 	return __rbd_osd_req_create(img_request->rbd_dev,
2020 				    img_request->snapc, num_osd_ops,
2021 				    CEPH_OSD_FLAG_WRITE, obj_request);
2022 }
2023 
2024 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2025 {
2026 	ceph_osdc_put_request(osd_req);
2027 }
2028 
2029 static struct rbd_obj_request *
2030 rbd_obj_request_create(enum obj_request_type type)
2031 {
2032 	struct rbd_obj_request *obj_request;
2033 
2034 	rbd_assert(obj_request_type_valid(type));
2035 
2036 	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2037 	if (!obj_request)
2038 		return NULL;
2039 
2040 	obj_request->which = BAD_WHICH;
2041 	obj_request->type = type;
2042 	INIT_LIST_HEAD(&obj_request->links);
2043 	init_completion(&obj_request->completion);
2044 	kref_init(&obj_request->kref);
2045 
2046 	dout("%s %p\n", __func__, obj_request);
2047 	return obj_request;
2048 }
2049 
2050 static void rbd_obj_request_destroy(struct kref *kref)
2051 {
2052 	struct rbd_obj_request *obj_request;
2053 
2054 	obj_request = container_of(kref, struct rbd_obj_request, kref);
2055 
2056 	dout("%s: obj %p\n", __func__, obj_request);
2057 
2058 	rbd_assert(obj_request->img_request == NULL);
2059 	rbd_assert(obj_request->which == BAD_WHICH);
2060 
2061 	if (obj_request->osd_req)
2062 		rbd_osd_req_destroy(obj_request->osd_req);
2063 
2064 	rbd_assert(obj_request_type_valid(obj_request->type));
2065 	switch (obj_request->type) {
2066 	case OBJ_REQUEST_NODATA:
2067 		break;		/* Nothing to do */
2068 	case OBJ_REQUEST_BIO:
2069 		if (obj_request->bio_list)
2070 			bio_chain_put(obj_request->bio_list);
2071 		break;
2072 	case OBJ_REQUEST_PAGES:
2073 		/* img_data requests don't own their page array */
2074 		if (obj_request->pages &&
2075 		    !obj_request_img_data_test(obj_request))
2076 			ceph_release_page_vector(obj_request->pages,
2077 						obj_request->page_count);
2078 		break;
2079 	}
2080 
2081 	kmem_cache_free(rbd_obj_request_cache, obj_request);
2082 }
2083 
2084 /* It's OK to call this for a device with no parent */
2085 
2086 static void rbd_spec_put(struct rbd_spec *spec);
2087 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2088 {
2089 	rbd_dev_remove_parent(rbd_dev);
2090 	rbd_spec_put(rbd_dev->parent_spec);
2091 	rbd_dev->parent_spec = NULL;
2092 	rbd_dev->parent_overlap = 0;
2093 }
2094 
2095 /*
2096  * Parent image reference counting is used to determine when an
2097  * image's parent fields can be safely torn down--after there are no
2098  * more in-flight requests to the parent image.  When the last
2099  * reference is dropped, cleaning them up is safe.
2100  */
2101 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2102 {
2103 	int counter;
2104 
2105 	if (!rbd_dev->parent_spec)
2106 		return;
2107 
2108 	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2109 	if (counter > 0)
2110 		return;
2111 
2112 	/* Last reference; clean up parent data structures */
2113 
2114 	if (!counter)
2115 		rbd_dev_unparent(rbd_dev);
2116 	else
2117 		rbd_warn(rbd_dev, "parent reference underflow");
2118 }
2119 
2120 /*
2121  * If an image has a non-zero parent overlap, get a reference to its
2122  * parent.
2123  *
2124  * Returns true if the rbd device has a parent with a non-zero
2125  * overlap and a reference for it was successfully taken, or
2126  * false otherwise.
2127  */
2128 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2129 {
2130 	int counter = 0;
2131 
2132 	if (!rbd_dev->parent_spec)
2133 		return false;
2134 
2135 	down_read(&rbd_dev->header_rwsem);
2136 	if (rbd_dev->parent_overlap)
2137 		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2138 	up_read(&rbd_dev->header_rwsem);
2139 
2140 	if (counter < 0)
2141 		rbd_warn(rbd_dev, "parent reference overflow");
2142 
2143 	return counter > 0;
2144 }
2145 
2146 /*
2147  * Caller is responsible for filling in the list of object requests
2148  * that comprises the image request, and the Linux request pointer
2149  * (if there is one).
2150  */
2151 static struct rbd_img_request *rbd_img_request_create(
2152 					struct rbd_device *rbd_dev,
2153 					u64 offset, u64 length,
2154 					enum obj_operation_type op_type,
2155 					struct ceph_snap_context *snapc)
2156 {
2157 	struct rbd_img_request *img_request;
2158 
2159 	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2160 	if (!img_request)
2161 		return NULL;
2162 
2163 	img_request->rq = NULL;
2164 	img_request->rbd_dev = rbd_dev;
2165 	img_request->offset = offset;
2166 	img_request->length = length;
2167 	img_request->flags = 0;
2168 	if (op_type == OBJ_OP_DISCARD) {
2169 		img_request_discard_set(img_request);
2170 		img_request->snapc = snapc;
2171 	} else if (op_type == OBJ_OP_WRITE) {
2172 		img_request_write_set(img_request);
2173 		img_request->snapc = snapc;
2174 	} else {
2175 		img_request->snap_id = rbd_dev->spec->snap_id;
2176 	}
2177 	if (rbd_dev_parent_get(rbd_dev))
2178 		img_request_layered_set(img_request);
2179 	spin_lock_init(&img_request->completion_lock);
2180 	img_request->next_completion = 0;
2181 	img_request->callback = NULL;
2182 	img_request->result = 0;
2183 	img_request->obj_request_count = 0;
2184 	INIT_LIST_HEAD(&img_request->obj_requests);
2185 	kref_init(&img_request->kref);
2186 
2187 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2188 		obj_op_name(op_type), offset, length, img_request);
2189 
2190 	return img_request;
2191 }
2192 
2193 static void rbd_img_request_destroy(struct kref *kref)
2194 {
2195 	struct rbd_img_request *img_request;
2196 	struct rbd_obj_request *obj_request;
2197 	struct rbd_obj_request *next_obj_request;
2198 
2199 	img_request = container_of(kref, struct rbd_img_request, kref);
2200 
2201 	dout("%s: img %p\n", __func__, img_request);
2202 
2203 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2204 		rbd_img_obj_request_del(img_request, obj_request);
2205 	rbd_assert(img_request->obj_request_count == 0);
2206 
2207 	if (img_request_layered_test(img_request)) {
2208 		img_request_layered_clear(img_request);
2209 		rbd_dev_parent_put(img_request->rbd_dev);
2210 	}
2211 
2212 	if (img_request_write_test(img_request) ||
2213 		img_request_discard_test(img_request))
2214 		ceph_put_snap_context(img_request->snapc);
2215 
2216 	kmem_cache_free(rbd_img_request_cache, img_request);
2217 }
2218 
2219 static struct rbd_img_request *rbd_parent_request_create(
2220 					struct rbd_obj_request *obj_request,
2221 					u64 img_offset, u64 length)
2222 {
2223 	struct rbd_img_request *parent_request;
2224 	struct rbd_device *rbd_dev;
2225 
2226 	rbd_assert(obj_request->img_request);
2227 	rbd_dev = obj_request->img_request->rbd_dev;
2228 
2229 	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2230 						length, OBJ_OP_READ, NULL);
2231 	if (!parent_request)
2232 		return NULL;
2233 
2234 	img_request_child_set(parent_request);
2235 	rbd_obj_request_get(obj_request);
2236 	parent_request->obj_request = obj_request;
2237 
2238 	return parent_request;
2239 }
2240 
2241 static void rbd_parent_request_destroy(struct kref *kref)
2242 {
2243 	struct rbd_img_request *parent_request;
2244 	struct rbd_obj_request *orig_request;
2245 
2246 	parent_request = container_of(kref, struct rbd_img_request, kref);
2247 	orig_request = parent_request->obj_request;
2248 
2249 	parent_request->obj_request = NULL;
2250 	rbd_obj_request_put(orig_request);
2251 	img_request_child_clear(parent_request);
2252 
2253 	rbd_img_request_destroy(kref);
2254 }
2255 
2256 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2257 {
2258 	struct rbd_img_request *img_request;
2259 	unsigned int xferred;
2260 	int result;
2261 	bool more;
2262 
2263 	rbd_assert(obj_request_img_data_test(obj_request));
2264 	img_request = obj_request->img_request;
2265 
2266 	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2267 	xferred = (unsigned int)obj_request->xferred;
2268 	result = obj_request->result;
2269 	if (result) {
2270 		struct rbd_device *rbd_dev = img_request->rbd_dev;
2271 		enum obj_operation_type op_type;
2272 
2273 		if (img_request_discard_test(img_request))
2274 			op_type = OBJ_OP_DISCARD;
2275 		else if (img_request_write_test(img_request))
2276 			op_type = OBJ_OP_WRITE;
2277 		else
2278 			op_type = OBJ_OP_READ;
2279 
2280 		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2281 			obj_op_name(op_type), obj_request->length,
2282 			obj_request->img_offset, obj_request->offset);
2283 		rbd_warn(rbd_dev, "  result %d xferred %x",
2284 			result, xferred);
2285 		if (!img_request->result)
2286 			img_request->result = result;
2287 		/*
2288 		 * Need to end I/O on the entire obj_request worth of
2289 		 * bytes in case of error.
2290 		 */
2291 		xferred = obj_request->length;
2292 	}
2293 
2294 	if (img_request_child_test(img_request)) {
2295 		rbd_assert(img_request->obj_request != NULL);
2296 		more = obj_request->which < img_request->obj_request_count - 1;
2297 	} else {
2298 		blk_status_t status = errno_to_blk_status(result);
2299 
2300 		rbd_assert(img_request->rq != NULL);
2301 
2302 		more = blk_update_request(img_request->rq, status, xferred);
2303 		if (!more)
2304 			__blk_mq_end_request(img_request->rq, status);
2305 	}
2306 
2307 	return more;
2308 }
2309 
2310 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2311 {
2312 	struct rbd_img_request *img_request;
2313 	u32 which = obj_request->which;
2314 	bool more = true;
2315 
2316 	rbd_assert(obj_request_img_data_test(obj_request));
2317 	img_request = obj_request->img_request;
2318 
2319 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2320 	rbd_assert(img_request != NULL);
2321 	rbd_assert(img_request->obj_request_count > 0);
2322 	rbd_assert(which != BAD_WHICH);
2323 	rbd_assert(which < img_request->obj_request_count);
2324 
2325 	spin_lock_irq(&img_request->completion_lock);
2326 	if (which != img_request->next_completion)
2327 		goto out;
2328 
2329 	for_each_obj_request_from(img_request, obj_request) {
2330 		rbd_assert(more);
2331 		rbd_assert(which < img_request->obj_request_count);
2332 
2333 		if (!obj_request_done_test(obj_request))
2334 			break;
2335 		more = rbd_img_obj_end_request(obj_request);
2336 		which++;
2337 	}
2338 
2339 	rbd_assert(more ^ (which == img_request->obj_request_count));
2340 	img_request->next_completion = which;
2341 out:
2342 	spin_unlock_irq(&img_request->completion_lock);
2343 	rbd_img_request_put(img_request);
2344 
2345 	if (!more)
2346 		rbd_img_request_complete(img_request);
2347 }
2348 
2349 /*
2350  * Add individual osd ops to the given ceph_osd_request and prepare
2351  * them for submission. num_ops is the current number of
2352  * osd operations already to the object request.
2353  */
2354 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2355 				struct ceph_osd_request *osd_request,
2356 				enum obj_operation_type op_type,
2357 				unsigned int num_ops)
2358 {
2359 	struct rbd_img_request *img_request = obj_request->img_request;
2360 	struct rbd_device *rbd_dev = img_request->rbd_dev;
2361 	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2362 	u64 offset = obj_request->offset;
2363 	u64 length = obj_request->length;
2364 	u64 img_end;
2365 	u16 opcode;
2366 
2367 	if (op_type == OBJ_OP_DISCARD) {
2368 		if (!offset && length == object_size &&
2369 		    (!img_request_layered_test(img_request) ||
2370 		     !obj_request_overlaps_parent(obj_request))) {
2371 			opcode = CEPH_OSD_OP_DELETE;
2372 		} else if ((offset + length == object_size)) {
2373 			opcode = CEPH_OSD_OP_TRUNCATE;
2374 		} else {
2375 			down_read(&rbd_dev->header_rwsem);
2376 			img_end = rbd_dev->header.image_size;
2377 			up_read(&rbd_dev->header_rwsem);
2378 
2379 			if (obj_request->img_offset + length == img_end)
2380 				opcode = CEPH_OSD_OP_TRUNCATE;
2381 			else
2382 				opcode = CEPH_OSD_OP_ZERO;
2383 		}
2384 	} else if (op_type == OBJ_OP_WRITE) {
2385 		if (!offset && length == object_size)
2386 			opcode = CEPH_OSD_OP_WRITEFULL;
2387 		else
2388 			opcode = CEPH_OSD_OP_WRITE;
2389 		osd_req_op_alloc_hint_init(osd_request, num_ops,
2390 					object_size, object_size);
2391 		num_ops++;
2392 	} else {
2393 		opcode = CEPH_OSD_OP_READ;
2394 	}
2395 
2396 	if (opcode == CEPH_OSD_OP_DELETE)
2397 		osd_req_op_init(osd_request, num_ops, opcode, 0);
2398 	else
2399 		osd_req_op_extent_init(osd_request, num_ops, opcode,
2400 				       offset, length, 0, 0);
2401 
2402 	if (obj_request->type == OBJ_REQUEST_BIO)
2403 		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2404 					obj_request->bio_list, length);
2405 	else if (obj_request->type == OBJ_REQUEST_PAGES)
2406 		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2407 					obj_request->pages, length,
2408 					offset & ~PAGE_MASK, false, false);
2409 
2410 	/* Discards are also writes */
2411 	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2412 		rbd_osd_req_format_write(obj_request);
2413 	else
2414 		rbd_osd_req_format_read(obj_request);
2415 }
2416 
2417 /*
2418  * Split up an image request into one or more object requests, each
2419  * to a different object.  The "type" parameter indicates whether
2420  * "data_desc" is the pointer to the head of a list of bio
2421  * structures, or the base of a page array.  In either case this
2422  * function assumes data_desc describes memory sufficient to hold
2423  * all data described by the image request.
2424  */
2425 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2426 					enum obj_request_type type,
2427 					void *data_desc)
2428 {
2429 	struct rbd_device *rbd_dev = img_request->rbd_dev;
2430 	struct rbd_obj_request *obj_request = NULL;
2431 	struct rbd_obj_request *next_obj_request;
2432 	struct bio *bio_list = NULL;
2433 	unsigned int bio_offset = 0;
2434 	struct page **pages = NULL;
2435 	enum obj_operation_type op_type;
2436 	u64 img_offset;
2437 	u64 resid;
2438 
2439 	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2440 		(int)type, data_desc);
2441 
2442 	img_offset = img_request->offset;
2443 	resid = img_request->length;
2444 	rbd_assert(resid > 0);
2445 	op_type = rbd_img_request_op_type(img_request);
2446 
2447 	if (type == OBJ_REQUEST_BIO) {
2448 		bio_list = data_desc;
2449 		rbd_assert(img_offset ==
2450 			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2451 	} else if (type == OBJ_REQUEST_PAGES) {
2452 		pages = data_desc;
2453 	}
2454 
2455 	while (resid) {
2456 		struct ceph_osd_request *osd_req;
2457 		u64 object_no = img_offset >> rbd_dev->header.obj_order;
2458 		u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2459 		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2460 
2461 		obj_request = rbd_obj_request_create(type);
2462 		if (!obj_request)
2463 			goto out_unwind;
2464 
2465 		obj_request->object_no = object_no;
2466 		obj_request->offset = offset;
2467 		obj_request->length = length;
2468 
2469 		/*
2470 		 * set obj_request->img_request before creating the
2471 		 * osd_request so that it gets the right snapc
2472 		 */
2473 		rbd_img_obj_request_add(img_request, obj_request);
2474 
2475 		if (type == OBJ_REQUEST_BIO) {
2476 			unsigned int clone_size;
2477 
2478 			rbd_assert(length <= (u64)UINT_MAX);
2479 			clone_size = (unsigned int)length;
2480 			obj_request->bio_list =
2481 					bio_chain_clone_range(&bio_list,
2482 								&bio_offset,
2483 								clone_size,
2484 								GFP_NOIO);
2485 			if (!obj_request->bio_list)
2486 				goto out_unwind;
2487 		} else if (type == OBJ_REQUEST_PAGES) {
2488 			unsigned int page_count;
2489 
2490 			obj_request->pages = pages;
2491 			page_count = (u32)calc_pages_for(offset, length);
2492 			obj_request->page_count = page_count;
2493 			if ((offset + length) & ~PAGE_MASK)
2494 				page_count--;	/* more on last page */
2495 			pages += page_count;
2496 		}
2497 
2498 		osd_req = rbd_osd_req_create(rbd_dev, op_type,
2499 					(op_type == OBJ_OP_WRITE) ? 2 : 1,
2500 					obj_request);
2501 		if (!osd_req)
2502 			goto out_unwind;
2503 
2504 		obj_request->osd_req = osd_req;
2505 		obj_request->callback = rbd_img_obj_callback;
2506 		obj_request->img_offset = img_offset;
2507 
2508 		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2509 
2510 		img_offset += length;
2511 		resid -= length;
2512 	}
2513 
2514 	return 0;
2515 
2516 out_unwind:
2517 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2518 		rbd_img_obj_request_del(img_request, obj_request);
2519 
2520 	return -ENOMEM;
2521 }
2522 
2523 static void
2524 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2525 {
2526 	struct rbd_img_request *img_request;
2527 	struct rbd_device *rbd_dev;
2528 	struct page **pages;
2529 	u32 page_count;
2530 
2531 	dout("%s: obj %p\n", __func__, obj_request);
2532 
2533 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2534 		obj_request->type == OBJ_REQUEST_NODATA);
2535 	rbd_assert(obj_request_img_data_test(obj_request));
2536 	img_request = obj_request->img_request;
2537 	rbd_assert(img_request);
2538 
2539 	rbd_dev = img_request->rbd_dev;
2540 	rbd_assert(rbd_dev);
2541 
2542 	pages = obj_request->copyup_pages;
2543 	rbd_assert(pages != NULL);
2544 	obj_request->copyup_pages = NULL;
2545 	page_count = obj_request->copyup_page_count;
2546 	rbd_assert(page_count);
2547 	obj_request->copyup_page_count = 0;
2548 	ceph_release_page_vector(pages, page_count);
2549 
2550 	/*
2551 	 * We want the transfer count to reflect the size of the
2552 	 * original write request.  There is no such thing as a
2553 	 * successful short write, so if the request was successful
2554 	 * we can just set it to the originally-requested length.
2555 	 */
2556 	if (!obj_request->result)
2557 		obj_request->xferred = obj_request->length;
2558 
2559 	obj_request_done_set(obj_request);
2560 }
2561 
2562 static void
2563 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2564 {
2565 	struct rbd_obj_request *orig_request;
2566 	struct ceph_osd_request *osd_req;
2567 	struct rbd_device *rbd_dev;
2568 	struct page **pages;
2569 	enum obj_operation_type op_type;
2570 	u32 page_count;
2571 	int img_result;
2572 	u64 parent_length;
2573 
2574 	rbd_assert(img_request_child_test(img_request));
2575 
2576 	/* First get what we need from the image request */
2577 
2578 	pages = img_request->copyup_pages;
2579 	rbd_assert(pages != NULL);
2580 	img_request->copyup_pages = NULL;
2581 	page_count = img_request->copyup_page_count;
2582 	rbd_assert(page_count);
2583 	img_request->copyup_page_count = 0;
2584 
2585 	orig_request = img_request->obj_request;
2586 	rbd_assert(orig_request != NULL);
2587 	rbd_assert(obj_request_type_valid(orig_request->type));
2588 	img_result = img_request->result;
2589 	parent_length = img_request->length;
2590 	rbd_assert(img_result || parent_length == img_request->xferred);
2591 	rbd_img_request_put(img_request);
2592 
2593 	rbd_assert(orig_request->img_request);
2594 	rbd_dev = orig_request->img_request->rbd_dev;
2595 	rbd_assert(rbd_dev);
2596 
2597 	/*
2598 	 * If the overlap has become 0 (most likely because the
2599 	 * image has been flattened) we need to free the pages
2600 	 * and re-submit the original write request.
2601 	 */
2602 	if (!rbd_dev->parent_overlap) {
2603 		ceph_release_page_vector(pages, page_count);
2604 		rbd_obj_request_submit(orig_request);
2605 		return;
2606 	}
2607 
2608 	if (img_result)
2609 		goto out_err;
2610 
2611 	/*
2612 	 * The original osd request is of no use to use any more.
2613 	 * We need a new one that can hold the three ops in a copyup
2614 	 * request.  Allocate the new copyup osd request for the
2615 	 * original request, and release the old one.
2616 	 */
2617 	img_result = -ENOMEM;
2618 	osd_req = rbd_osd_req_create_copyup(orig_request);
2619 	if (!osd_req)
2620 		goto out_err;
2621 	rbd_osd_req_destroy(orig_request->osd_req);
2622 	orig_request->osd_req = osd_req;
2623 	orig_request->copyup_pages = pages;
2624 	orig_request->copyup_page_count = page_count;
2625 
2626 	/* Initialize the copyup op */
2627 
2628 	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2629 	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2630 						false, false);
2631 
2632 	/* Add the other op(s) */
2633 
2634 	op_type = rbd_img_request_op_type(orig_request->img_request);
2635 	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2636 
2637 	/* All set, send it off. */
2638 
2639 	rbd_obj_request_submit(orig_request);
2640 	return;
2641 
2642 out_err:
2643 	ceph_release_page_vector(pages, page_count);
2644 	rbd_obj_request_error(orig_request, img_result);
2645 }
2646 
2647 /*
2648  * Read from the parent image the range of data that covers the
2649  * entire target of the given object request.  This is used for
2650  * satisfying a layered image write request when the target of an
2651  * object request from the image request does not exist.
2652  *
2653  * A page array big enough to hold the returned data is allocated
2654  * and supplied to rbd_img_request_fill() as the "data descriptor."
2655  * When the read completes, this page array will be transferred to
2656  * the original object request for the copyup operation.
2657  *
2658  * If an error occurs, it is recorded as the result of the original
2659  * object request in rbd_img_obj_exists_callback().
2660  */
2661 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2662 {
2663 	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2664 	struct rbd_img_request *parent_request = NULL;
2665 	u64 img_offset;
2666 	u64 length;
2667 	struct page **pages = NULL;
2668 	u32 page_count;
2669 	int result;
2670 
2671 	rbd_assert(rbd_dev->parent != NULL);
2672 
2673 	/*
2674 	 * Determine the byte range covered by the object in the
2675 	 * child image to which the original request was to be sent.
2676 	 */
2677 	img_offset = obj_request->img_offset - obj_request->offset;
2678 	length = rbd_obj_bytes(&rbd_dev->header);
2679 
2680 	/*
2681 	 * There is no defined parent data beyond the parent
2682 	 * overlap, so limit what we read at that boundary if
2683 	 * necessary.
2684 	 */
2685 	if (img_offset + length > rbd_dev->parent_overlap) {
2686 		rbd_assert(img_offset < rbd_dev->parent_overlap);
2687 		length = rbd_dev->parent_overlap - img_offset;
2688 	}
2689 
2690 	/*
2691 	 * Allocate a page array big enough to receive the data read
2692 	 * from the parent.
2693 	 */
2694 	page_count = (u32)calc_pages_for(0, length);
2695 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2696 	if (IS_ERR(pages)) {
2697 		result = PTR_ERR(pages);
2698 		pages = NULL;
2699 		goto out_err;
2700 	}
2701 
2702 	result = -ENOMEM;
2703 	parent_request = rbd_parent_request_create(obj_request,
2704 						img_offset, length);
2705 	if (!parent_request)
2706 		goto out_err;
2707 
2708 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2709 	if (result)
2710 		goto out_err;
2711 
2712 	parent_request->copyup_pages = pages;
2713 	parent_request->copyup_page_count = page_count;
2714 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2715 
2716 	result = rbd_img_request_submit(parent_request);
2717 	if (!result)
2718 		return 0;
2719 
2720 	parent_request->copyup_pages = NULL;
2721 	parent_request->copyup_page_count = 0;
2722 	parent_request->obj_request = NULL;
2723 	rbd_obj_request_put(obj_request);
2724 out_err:
2725 	if (pages)
2726 		ceph_release_page_vector(pages, page_count);
2727 	if (parent_request)
2728 		rbd_img_request_put(parent_request);
2729 	return result;
2730 }
2731 
2732 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2733 {
2734 	struct rbd_obj_request *orig_request;
2735 	struct rbd_device *rbd_dev;
2736 	int result;
2737 
2738 	rbd_assert(!obj_request_img_data_test(obj_request));
2739 
2740 	/*
2741 	 * All we need from the object request is the original
2742 	 * request and the result of the STAT op.  Grab those, then
2743 	 * we're done with the request.
2744 	 */
2745 	orig_request = obj_request->obj_request;
2746 	obj_request->obj_request = NULL;
2747 	rbd_obj_request_put(orig_request);
2748 	rbd_assert(orig_request);
2749 	rbd_assert(orig_request->img_request);
2750 
2751 	result = obj_request->result;
2752 	obj_request->result = 0;
2753 
2754 	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2755 		obj_request, orig_request, result,
2756 		obj_request->xferred, obj_request->length);
2757 	rbd_obj_request_put(obj_request);
2758 
2759 	/*
2760 	 * If the overlap has become 0 (most likely because the
2761 	 * image has been flattened) we need to re-submit the
2762 	 * original request.
2763 	 */
2764 	rbd_dev = orig_request->img_request->rbd_dev;
2765 	if (!rbd_dev->parent_overlap) {
2766 		rbd_obj_request_submit(orig_request);
2767 		return;
2768 	}
2769 
2770 	/*
2771 	 * Our only purpose here is to determine whether the object
2772 	 * exists, and we don't want to treat the non-existence as
2773 	 * an error.  If something else comes back, transfer the
2774 	 * error to the original request and complete it now.
2775 	 */
2776 	if (!result) {
2777 		obj_request_existence_set(orig_request, true);
2778 	} else if (result == -ENOENT) {
2779 		obj_request_existence_set(orig_request, false);
2780 	} else {
2781 		goto fail_orig_request;
2782 	}
2783 
2784 	/*
2785 	 * Resubmit the original request now that we have recorded
2786 	 * whether the target object exists.
2787 	 */
2788 	result = rbd_img_obj_request_submit(orig_request);
2789 	if (result)
2790 		goto fail_orig_request;
2791 
2792 	return;
2793 
2794 fail_orig_request:
2795 	rbd_obj_request_error(orig_request, result);
2796 }
2797 
2798 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2799 {
2800 	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2801 	struct rbd_obj_request *stat_request;
2802 	struct page **pages;
2803 	u32 page_count;
2804 	size_t size;
2805 	int ret;
2806 
2807 	stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2808 	if (!stat_request)
2809 		return -ENOMEM;
2810 
2811 	stat_request->object_no = obj_request->object_no;
2812 
2813 	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2814 						   stat_request);
2815 	if (!stat_request->osd_req) {
2816 		ret = -ENOMEM;
2817 		goto fail_stat_request;
2818 	}
2819 
2820 	/*
2821 	 * The response data for a STAT call consists of:
2822 	 *     le64 length;
2823 	 *     struct {
2824 	 *         le32 tv_sec;
2825 	 *         le32 tv_nsec;
2826 	 *     } mtime;
2827 	 */
2828 	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2829 	page_count = (u32)calc_pages_for(0, size);
2830 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2831 	if (IS_ERR(pages)) {
2832 		ret = PTR_ERR(pages);
2833 		goto fail_stat_request;
2834 	}
2835 
2836 	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2837 	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2838 				     false, false);
2839 
2840 	rbd_obj_request_get(obj_request);
2841 	stat_request->obj_request = obj_request;
2842 	stat_request->pages = pages;
2843 	stat_request->page_count = page_count;
2844 	stat_request->callback = rbd_img_obj_exists_callback;
2845 
2846 	rbd_obj_request_submit(stat_request);
2847 	return 0;
2848 
2849 fail_stat_request:
2850 	rbd_obj_request_put(stat_request);
2851 	return ret;
2852 }
2853 
2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2855 {
2856 	struct rbd_img_request *img_request = obj_request->img_request;
2857 	struct rbd_device *rbd_dev = img_request->rbd_dev;
2858 
2859 	/* Reads */
2860 	if (!img_request_write_test(img_request) &&
2861 	    !img_request_discard_test(img_request))
2862 		return true;
2863 
2864 	/* Non-layered writes */
2865 	if (!img_request_layered_test(img_request))
2866 		return true;
2867 
2868 	/*
2869 	 * Layered writes outside of the parent overlap range don't
2870 	 * share any data with the parent.
2871 	 */
2872 	if (!obj_request_overlaps_parent(obj_request))
2873 		return true;
2874 
2875 	/*
2876 	 * Entire-object layered writes - we will overwrite whatever
2877 	 * parent data there is anyway.
2878 	 */
2879 	if (!obj_request->offset &&
2880 	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2881 		return true;
2882 
2883 	/*
2884 	 * If the object is known to already exist, its parent data has
2885 	 * already been copied.
2886 	 */
2887 	if (obj_request_known_test(obj_request) &&
2888 	    obj_request_exists_test(obj_request))
2889 		return true;
2890 
2891 	return false;
2892 }
2893 
2894 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2895 {
2896 	rbd_assert(obj_request_img_data_test(obj_request));
2897 	rbd_assert(obj_request_type_valid(obj_request->type));
2898 	rbd_assert(obj_request->img_request);
2899 
2900 	if (img_obj_request_simple(obj_request)) {
2901 		rbd_obj_request_submit(obj_request);
2902 		return 0;
2903 	}
2904 
2905 	/*
2906 	 * It's a layered write.  The target object might exist but
2907 	 * we may not know that yet.  If we know it doesn't exist,
2908 	 * start by reading the data for the full target object from
2909 	 * the parent so we can use it for a copyup to the target.
2910 	 */
2911 	if (obj_request_known_test(obj_request))
2912 		return rbd_img_obj_parent_read_full(obj_request);
2913 
2914 	/* We don't know whether the target exists.  Go find out. */
2915 
2916 	return rbd_img_obj_exists_submit(obj_request);
2917 }
2918 
2919 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2920 {
2921 	struct rbd_obj_request *obj_request;
2922 	struct rbd_obj_request *next_obj_request;
2923 	int ret = 0;
2924 
2925 	dout("%s: img %p\n", __func__, img_request);
2926 
2927 	rbd_img_request_get(img_request);
2928 	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2929 		ret = rbd_img_obj_request_submit(obj_request);
2930 		if (ret)
2931 			goto out_put_ireq;
2932 	}
2933 
2934 out_put_ireq:
2935 	rbd_img_request_put(img_request);
2936 	return ret;
2937 }
2938 
2939 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2940 {
2941 	struct rbd_obj_request *obj_request;
2942 	struct rbd_device *rbd_dev;
2943 	u64 obj_end;
2944 	u64 img_xferred;
2945 	int img_result;
2946 
2947 	rbd_assert(img_request_child_test(img_request));
2948 
2949 	/* First get what we need from the image request and release it */
2950 
2951 	obj_request = img_request->obj_request;
2952 	img_xferred = img_request->xferred;
2953 	img_result = img_request->result;
2954 	rbd_img_request_put(img_request);
2955 
2956 	/*
2957 	 * If the overlap has become 0 (most likely because the
2958 	 * image has been flattened) we need to re-submit the
2959 	 * original request.
2960 	 */
2961 	rbd_assert(obj_request);
2962 	rbd_assert(obj_request->img_request);
2963 	rbd_dev = obj_request->img_request->rbd_dev;
2964 	if (!rbd_dev->parent_overlap) {
2965 		rbd_obj_request_submit(obj_request);
2966 		return;
2967 	}
2968 
2969 	obj_request->result = img_result;
2970 	if (obj_request->result)
2971 		goto out;
2972 
2973 	/*
2974 	 * We need to zero anything beyond the parent overlap
2975 	 * boundary.  Since rbd_img_obj_request_read_callback()
2976 	 * will zero anything beyond the end of a short read, an
2977 	 * easy way to do this is to pretend the data from the
2978 	 * parent came up short--ending at the overlap boundary.
2979 	 */
2980 	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2981 	obj_end = obj_request->img_offset + obj_request->length;
2982 	if (obj_end > rbd_dev->parent_overlap) {
2983 		u64 xferred = 0;
2984 
2985 		if (obj_request->img_offset < rbd_dev->parent_overlap)
2986 			xferred = rbd_dev->parent_overlap -
2987 					obj_request->img_offset;
2988 
2989 		obj_request->xferred = min(img_xferred, xferred);
2990 	} else {
2991 		obj_request->xferred = img_xferred;
2992 	}
2993 out:
2994 	rbd_img_obj_request_read_callback(obj_request);
2995 	rbd_obj_request_complete(obj_request);
2996 }
2997 
2998 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2999 {
3000 	struct rbd_img_request *img_request;
3001 	int result;
3002 
3003 	rbd_assert(obj_request_img_data_test(obj_request));
3004 	rbd_assert(obj_request->img_request != NULL);
3005 	rbd_assert(obj_request->result == (s32) -ENOENT);
3006 	rbd_assert(obj_request_type_valid(obj_request->type));
3007 
3008 	/* rbd_read_finish(obj_request, obj_request->length); */
3009 	img_request = rbd_parent_request_create(obj_request,
3010 						obj_request->img_offset,
3011 						obj_request->length);
3012 	result = -ENOMEM;
3013 	if (!img_request)
3014 		goto out_err;
3015 
3016 	if (obj_request->type == OBJ_REQUEST_BIO)
3017 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3018 						obj_request->bio_list);
3019 	else
3020 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3021 						obj_request->pages);
3022 	if (result)
3023 		goto out_err;
3024 
3025 	img_request->callback = rbd_img_parent_read_callback;
3026 	result = rbd_img_request_submit(img_request);
3027 	if (result)
3028 		goto out_err;
3029 
3030 	return;
3031 out_err:
3032 	if (img_request)
3033 		rbd_img_request_put(img_request);
3034 	obj_request->result = result;
3035 	obj_request->xferred = 0;
3036 	obj_request_done_set(obj_request);
3037 }
3038 
3039 static const struct rbd_client_id rbd_empty_cid;
3040 
3041 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3042 			  const struct rbd_client_id *rhs)
3043 {
3044 	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3045 }
3046 
3047 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3048 {
3049 	struct rbd_client_id cid;
3050 
3051 	mutex_lock(&rbd_dev->watch_mutex);
3052 	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3053 	cid.handle = rbd_dev->watch_cookie;
3054 	mutex_unlock(&rbd_dev->watch_mutex);
3055 	return cid;
3056 }
3057 
3058 /*
3059  * lock_rwsem must be held for write
3060  */
3061 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3062 			      const struct rbd_client_id *cid)
3063 {
3064 	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3065 	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3066 	     cid->gid, cid->handle);
3067 	rbd_dev->owner_cid = *cid; /* struct */
3068 }
3069 
3070 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3071 {
3072 	mutex_lock(&rbd_dev->watch_mutex);
3073 	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3074 	mutex_unlock(&rbd_dev->watch_mutex);
3075 }
3076 
3077 /*
3078  * lock_rwsem must be held for write
3079  */
3080 static int rbd_lock(struct rbd_device *rbd_dev)
3081 {
3082 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3083 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3084 	char cookie[32];
3085 	int ret;
3086 
3087 	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3088 		rbd_dev->lock_cookie[0] != '\0');
3089 
3090 	format_lock_cookie(rbd_dev, cookie);
3091 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3092 			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3093 			    RBD_LOCK_TAG, "", 0);
3094 	if (ret)
3095 		return ret;
3096 
3097 	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3098 	strcpy(rbd_dev->lock_cookie, cookie);
3099 	rbd_set_owner_cid(rbd_dev, &cid);
3100 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3101 	return 0;
3102 }
3103 
3104 /*
3105  * lock_rwsem must be held for write
3106  */
3107 static void rbd_unlock(struct rbd_device *rbd_dev)
3108 {
3109 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3110 	int ret;
3111 
3112 	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3113 		rbd_dev->lock_cookie[0] == '\0');
3114 
3115 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3116 			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
3117 	if (ret && ret != -ENOENT)
3118 		rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3119 
3120 	/* treat errors as the image is unlocked */
3121 	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3122 	rbd_dev->lock_cookie[0] = '\0';
3123 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3124 	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3125 }
3126 
3127 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3128 				enum rbd_notify_op notify_op,
3129 				struct page ***preply_pages,
3130 				size_t *preply_len)
3131 {
3132 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3133 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3134 	int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3135 	char buf[buf_size];
3136 	void *p = buf;
3137 
3138 	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3139 
3140 	/* encode *LockPayload NotifyMessage (op + ClientId) */
3141 	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3142 	ceph_encode_32(&p, notify_op);
3143 	ceph_encode_64(&p, cid.gid);
3144 	ceph_encode_64(&p, cid.handle);
3145 
3146 	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3147 				&rbd_dev->header_oloc, buf, buf_size,
3148 				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3149 }
3150 
3151 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3152 			       enum rbd_notify_op notify_op)
3153 {
3154 	struct page **reply_pages;
3155 	size_t reply_len;
3156 
3157 	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3158 	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3159 }
3160 
3161 static void rbd_notify_acquired_lock(struct work_struct *work)
3162 {
3163 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3164 						  acquired_lock_work);
3165 
3166 	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3167 }
3168 
3169 static void rbd_notify_released_lock(struct work_struct *work)
3170 {
3171 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3172 						  released_lock_work);
3173 
3174 	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3175 }
3176 
3177 static int rbd_request_lock(struct rbd_device *rbd_dev)
3178 {
3179 	struct page **reply_pages;
3180 	size_t reply_len;
3181 	bool lock_owner_responded = false;
3182 	int ret;
3183 
3184 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3185 
3186 	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3187 				   &reply_pages, &reply_len);
3188 	if (ret && ret != -ETIMEDOUT) {
3189 		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3190 		goto out;
3191 	}
3192 
3193 	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3194 		void *p = page_address(reply_pages[0]);
3195 		void *const end = p + reply_len;
3196 		u32 n;
3197 
3198 		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3199 		while (n--) {
3200 			u8 struct_v;
3201 			u32 len;
3202 
3203 			ceph_decode_need(&p, end, 8 + 8, e_inval);
3204 			p += 8 + 8; /* skip gid and cookie */
3205 
3206 			ceph_decode_32_safe(&p, end, len, e_inval);
3207 			if (!len)
3208 				continue;
3209 
3210 			if (lock_owner_responded) {
3211 				rbd_warn(rbd_dev,
3212 					 "duplicate lock owners detected");
3213 				ret = -EIO;
3214 				goto out;
3215 			}
3216 
3217 			lock_owner_responded = true;
3218 			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3219 						  &struct_v, &len);
3220 			if (ret) {
3221 				rbd_warn(rbd_dev,
3222 					 "failed to decode ResponseMessage: %d",
3223 					 ret);
3224 				goto e_inval;
3225 			}
3226 
3227 			ret = ceph_decode_32(&p);
3228 		}
3229 	}
3230 
3231 	if (!lock_owner_responded) {
3232 		rbd_warn(rbd_dev, "no lock owners detected");
3233 		ret = -ETIMEDOUT;
3234 	}
3235 
3236 out:
3237 	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3238 	return ret;
3239 
3240 e_inval:
3241 	ret = -EINVAL;
3242 	goto out;
3243 }
3244 
3245 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3246 {
3247 	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3248 
3249 	cancel_delayed_work(&rbd_dev->lock_dwork);
3250 	if (wake_all)
3251 		wake_up_all(&rbd_dev->lock_waitq);
3252 	else
3253 		wake_up(&rbd_dev->lock_waitq);
3254 }
3255 
3256 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3257 			       struct ceph_locker **lockers, u32 *num_lockers)
3258 {
3259 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3260 	u8 lock_type;
3261 	char *lock_tag;
3262 	int ret;
3263 
3264 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3265 
3266 	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3267 				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3268 				 &lock_type, &lock_tag, lockers, num_lockers);
3269 	if (ret)
3270 		return ret;
3271 
3272 	if (*num_lockers == 0) {
3273 		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3274 		goto out;
3275 	}
3276 
3277 	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3278 		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3279 			 lock_tag);
3280 		ret = -EBUSY;
3281 		goto out;
3282 	}
3283 
3284 	if (lock_type == CEPH_CLS_LOCK_SHARED) {
3285 		rbd_warn(rbd_dev, "shared lock type detected");
3286 		ret = -EBUSY;
3287 		goto out;
3288 	}
3289 
3290 	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3291 		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
3292 		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3293 			 (*lockers)[0].id.cookie);
3294 		ret = -EBUSY;
3295 		goto out;
3296 	}
3297 
3298 out:
3299 	kfree(lock_tag);
3300 	return ret;
3301 }
3302 
3303 static int find_watcher(struct rbd_device *rbd_dev,
3304 			const struct ceph_locker *locker)
3305 {
3306 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3307 	struct ceph_watch_item *watchers;
3308 	u32 num_watchers;
3309 	u64 cookie;
3310 	int i;
3311 	int ret;
3312 
3313 	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3314 				      &rbd_dev->header_oloc, &watchers,
3315 				      &num_watchers);
3316 	if (ret)
3317 		return ret;
3318 
3319 	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3320 	for (i = 0; i < num_watchers; i++) {
3321 		if (!memcmp(&watchers[i].addr, &locker->info.addr,
3322 			    sizeof(locker->info.addr)) &&
3323 		    watchers[i].cookie == cookie) {
3324 			struct rbd_client_id cid = {
3325 				.gid = le64_to_cpu(watchers[i].name.num),
3326 				.handle = cookie,
3327 			};
3328 
3329 			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3330 			     rbd_dev, cid.gid, cid.handle);
3331 			rbd_set_owner_cid(rbd_dev, &cid);
3332 			ret = 1;
3333 			goto out;
3334 		}
3335 	}
3336 
3337 	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3338 	ret = 0;
3339 out:
3340 	kfree(watchers);
3341 	return ret;
3342 }
3343 
3344 /*
3345  * lock_rwsem must be held for write
3346  */
3347 static int rbd_try_lock(struct rbd_device *rbd_dev)
3348 {
3349 	struct ceph_client *client = rbd_dev->rbd_client->client;
3350 	struct ceph_locker *lockers;
3351 	u32 num_lockers;
3352 	int ret;
3353 
3354 	for (;;) {
3355 		ret = rbd_lock(rbd_dev);
3356 		if (ret != -EBUSY)
3357 			return ret;
3358 
3359 		/* determine if the current lock holder is still alive */
3360 		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3361 		if (ret)
3362 			return ret;
3363 
3364 		if (num_lockers == 0)
3365 			goto again;
3366 
3367 		ret = find_watcher(rbd_dev, lockers);
3368 		if (ret) {
3369 			if (ret > 0)
3370 				ret = 0; /* have to request lock */
3371 			goto out;
3372 		}
3373 
3374 		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3375 			 ENTITY_NAME(lockers[0].id.name));
3376 
3377 		ret = ceph_monc_blacklist_add(&client->monc,
3378 					      &lockers[0].info.addr);
3379 		if (ret) {
3380 			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3381 				 ENTITY_NAME(lockers[0].id.name), ret);
3382 			goto out;
3383 		}
3384 
3385 		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3386 					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3387 					  lockers[0].id.cookie,
3388 					  &lockers[0].id.name);
3389 		if (ret && ret != -ENOENT)
3390 			goto out;
3391 
3392 again:
3393 		ceph_free_lockers(lockers, num_lockers);
3394 	}
3395 
3396 out:
3397 	ceph_free_lockers(lockers, num_lockers);
3398 	return ret;
3399 }
3400 
3401 /*
3402  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3403  */
3404 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3405 						int *pret)
3406 {
3407 	enum rbd_lock_state lock_state;
3408 
3409 	down_read(&rbd_dev->lock_rwsem);
3410 	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3411 	     rbd_dev->lock_state);
3412 	if (__rbd_is_lock_owner(rbd_dev)) {
3413 		lock_state = rbd_dev->lock_state;
3414 		up_read(&rbd_dev->lock_rwsem);
3415 		return lock_state;
3416 	}
3417 
3418 	up_read(&rbd_dev->lock_rwsem);
3419 	down_write(&rbd_dev->lock_rwsem);
3420 	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3421 	     rbd_dev->lock_state);
3422 	if (!__rbd_is_lock_owner(rbd_dev)) {
3423 		*pret = rbd_try_lock(rbd_dev);
3424 		if (*pret)
3425 			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3426 	}
3427 
3428 	lock_state = rbd_dev->lock_state;
3429 	up_write(&rbd_dev->lock_rwsem);
3430 	return lock_state;
3431 }
3432 
3433 static void rbd_acquire_lock(struct work_struct *work)
3434 {
3435 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 					    struct rbd_device, lock_dwork);
3437 	enum rbd_lock_state lock_state;
3438 	int ret = 0;
3439 
3440 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441 again:
3442 	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3443 	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3444 		if (lock_state == RBD_LOCK_STATE_LOCKED)
3445 			wake_requests(rbd_dev, true);
3446 		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3447 		     rbd_dev, lock_state, ret);
3448 		return;
3449 	}
3450 
3451 	ret = rbd_request_lock(rbd_dev);
3452 	if (ret == -ETIMEDOUT) {
3453 		goto again; /* treat this as a dead client */
3454 	} else if (ret == -EROFS) {
3455 		rbd_warn(rbd_dev, "peer will not release lock");
3456 		/*
3457 		 * If this is rbd_add_acquire_lock(), we want to fail
3458 		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
3459 		 * want to block.
3460 		 */
3461 		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3462 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3463 			/* wake "rbd map --exclusive" process */
3464 			wake_requests(rbd_dev, false);
3465 		}
3466 	} else if (ret < 0) {
3467 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3468 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3469 				 RBD_RETRY_DELAY);
3470 	} else {
3471 		/*
3472 		 * lock owner acked, but resend if we don't see them
3473 		 * release the lock
3474 		 */
3475 		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3476 		     rbd_dev);
3477 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3478 		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3479 	}
3480 }
3481 
3482 /*
3483  * lock_rwsem must be held for write
3484  */
3485 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3486 {
3487 	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3488 	     rbd_dev->lock_state);
3489 	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3490 		return false;
3491 
3492 	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3493 	downgrade_write(&rbd_dev->lock_rwsem);
3494 	/*
3495 	 * Ensure that all in-flight IO is flushed.
3496 	 *
3497 	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3498 	 * may be shared with other devices.
3499 	 */
3500 	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3501 	up_read(&rbd_dev->lock_rwsem);
3502 
3503 	down_write(&rbd_dev->lock_rwsem);
3504 	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3505 	     rbd_dev->lock_state);
3506 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3507 		return false;
3508 
3509 	rbd_unlock(rbd_dev);
3510 	/*
3511 	 * Give others a chance to grab the lock - we would re-acquire
3512 	 * almost immediately if we got new IO during ceph_osdc_sync()
3513 	 * otherwise.  We need to ack our own notifications, so this
3514 	 * lock_dwork will be requeued from rbd_wait_state_locked()
3515 	 * after wake_requests() in rbd_handle_released_lock().
3516 	 */
3517 	cancel_delayed_work(&rbd_dev->lock_dwork);
3518 	return true;
3519 }
3520 
3521 static void rbd_release_lock_work(struct work_struct *work)
3522 {
3523 	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3524 						  unlock_work);
3525 
3526 	down_write(&rbd_dev->lock_rwsem);
3527 	rbd_release_lock(rbd_dev);
3528 	up_write(&rbd_dev->lock_rwsem);
3529 }
3530 
3531 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3532 				     void **p)
3533 {
3534 	struct rbd_client_id cid = { 0 };
3535 
3536 	if (struct_v >= 2) {
3537 		cid.gid = ceph_decode_64(p);
3538 		cid.handle = ceph_decode_64(p);
3539 	}
3540 
3541 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3542 	     cid.handle);
3543 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3544 		down_write(&rbd_dev->lock_rwsem);
3545 		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3546 			/*
3547 			 * we already know that the remote client is
3548 			 * the owner
3549 			 */
3550 			up_write(&rbd_dev->lock_rwsem);
3551 			return;
3552 		}
3553 
3554 		rbd_set_owner_cid(rbd_dev, &cid);
3555 		downgrade_write(&rbd_dev->lock_rwsem);
3556 	} else {
3557 		down_read(&rbd_dev->lock_rwsem);
3558 	}
3559 
3560 	if (!__rbd_is_lock_owner(rbd_dev))
3561 		wake_requests(rbd_dev, false);
3562 	up_read(&rbd_dev->lock_rwsem);
3563 }
3564 
3565 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3566 				     void **p)
3567 {
3568 	struct rbd_client_id cid = { 0 };
3569 
3570 	if (struct_v >= 2) {
3571 		cid.gid = ceph_decode_64(p);
3572 		cid.handle = ceph_decode_64(p);
3573 	}
3574 
3575 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3576 	     cid.handle);
3577 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3578 		down_write(&rbd_dev->lock_rwsem);
3579 		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3580 			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3581 			     __func__, rbd_dev, cid.gid, cid.handle,
3582 			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3583 			up_write(&rbd_dev->lock_rwsem);
3584 			return;
3585 		}
3586 
3587 		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3588 		downgrade_write(&rbd_dev->lock_rwsem);
3589 	} else {
3590 		down_read(&rbd_dev->lock_rwsem);
3591 	}
3592 
3593 	if (!__rbd_is_lock_owner(rbd_dev))
3594 		wake_requests(rbd_dev, false);
3595 	up_read(&rbd_dev->lock_rwsem);
3596 }
3597 
3598 /*
3599  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3600  * ResponseMessage is needed.
3601  */
3602 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3603 				   void **p)
3604 {
3605 	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3606 	struct rbd_client_id cid = { 0 };
3607 	int result = 1;
3608 
3609 	if (struct_v >= 2) {
3610 		cid.gid = ceph_decode_64(p);
3611 		cid.handle = ceph_decode_64(p);
3612 	}
3613 
3614 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3615 	     cid.handle);
3616 	if (rbd_cid_equal(&cid, &my_cid))
3617 		return result;
3618 
3619 	down_read(&rbd_dev->lock_rwsem);
3620 	if (__rbd_is_lock_owner(rbd_dev)) {
3621 		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3622 		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3623 			goto out_unlock;
3624 
3625 		/*
3626 		 * encode ResponseMessage(0) so the peer can detect
3627 		 * a missing owner
3628 		 */
3629 		result = 0;
3630 
3631 		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3632 			if (!rbd_dev->opts->exclusive) {
3633 				dout("%s rbd_dev %p queueing unlock_work\n",
3634 				     __func__, rbd_dev);
3635 				queue_work(rbd_dev->task_wq,
3636 					   &rbd_dev->unlock_work);
3637 			} else {
3638 				/* refuse to release the lock */
3639 				result = -EROFS;
3640 			}
3641 		}
3642 	}
3643 
3644 out_unlock:
3645 	up_read(&rbd_dev->lock_rwsem);
3646 	return result;
3647 }
3648 
3649 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3650 				     u64 notify_id, u64 cookie, s32 *result)
3651 {
3652 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3653 	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3654 	char buf[buf_size];
3655 	int ret;
3656 
3657 	if (result) {
3658 		void *p = buf;
3659 
3660 		/* encode ResponseMessage */
3661 		ceph_start_encoding(&p, 1, 1,
3662 				    buf_size - CEPH_ENCODING_START_BLK_LEN);
3663 		ceph_encode_32(&p, *result);
3664 	} else {
3665 		buf_size = 0;
3666 	}
3667 
3668 	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3669 				   &rbd_dev->header_oloc, notify_id, cookie,
3670 				   buf, buf_size);
3671 	if (ret)
3672 		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3673 }
3674 
3675 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3676 				   u64 cookie)
3677 {
3678 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3679 	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3680 }
3681 
3682 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3683 					  u64 notify_id, u64 cookie, s32 result)
3684 {
3685 	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3686 	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3687 }
3688 
3689 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3690 			 u64 notifier_id, void *data, size_t data_len)
3691 {
3692 	struct rbd_device *rbd_dev = arg;
3693 	void *p = data;
3694 	void *const end = p + data_len;
3695 	u8 struct_v = 0;
3696 	u32 len;
3697 	u32 notify_op;
3698 	int ret;
3699 
3700 	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3701 	     __func__, rbd_dev, cookie, notify_id, data_len);
3702 	if (data_len) {
3703 		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3704 					  &struct_v, &len);
3705 		if (ret) {
3706 			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3707 				 ret);
3708 			return;
3709 		}
3710 
3711 		notify_op = ceph_decode_32(&p);
3712 	} else {
3713 		/* legacy notification for header updates */
3714 		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3715 		len = 0;
3716 	}
3717 
3718 	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3719 	switch (notify_op) {
3720 	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3721 		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3722 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723 		break;
3724 	case RBD_NOTIFY_OP_RELEASED_LOCK:
3725 		rbd_handle_released_lock(rbd_dev, struct_v, &p);
3726 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3727 		break;
3728 	case RBD_NOTIFY_OP_REQUEST_LOCK:
3729 		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3730 		if (ret <= 0)
3731 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
3732 						      cookie, ret);
3733 		else
3734 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3735 		break;
3736 	case RBD_NOTIFY_OP_HEADER_UPDATE:
3737 		ret = rbd_dev_refresh(rbd_dev);
3738 		if (ret)
3739 			rbd_warn(rbd_dev, "refresh failed: %d", ret);
3740 
3741 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3742 		break;
3743 	default:
3744 		if (rbd_is_lock_owner(rbd_dev))
3745 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
3746 						      cookie, -EOPNOTSUPP);
3747 		else
3748 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3749 		break;
3750 	}
3751 }
3752 
3753 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3754 
3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3756 {
3757 	struct rbd_device *rbd_dev = arg;
3758 
3759 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
3760 
3761 	down_write(&rbd_dev->lock_rwsem);
3762 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3763 	up_write(&rbd_dev->lock_rwsem);
3764 
3765 	mutex_lock(&rbd_dev->watch_mutex);
3766 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3767 		__rbd_unregister_watch(rbd_dev);
3768 		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3769 
3770 		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3771 	}
3772 	mutex_unlock(&rbd_dev->watch_mutex);
3773 }
3774 
3775 /*
3776  * watch_mutex must be locked
3777  */
3778 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3779 {
3780 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781 	struct ceph_osd_linger_request *handle;
3782 
3783 	rbd_assert(!rbd_dev->watch_handle);
3784 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785 
3786 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3787 				 &rbd_dev->header_oloc, rbd_watch_cb,
3788 				 rbd_watch_errcb, rbd_dev);
3789 	if (IS_ERR(handle))
3790 		return PTR_ERR(handle);
3791 
3792 	rbd_dev->watch_handle = handle;
3793 	return 0;
3794 }
3795 
3796 /*
3797  * watch_mutex must be locked
3798  */
3799 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3800 {
3801 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802 	int ret;
3803 
3804 	rbd_assert(rbd_dev->watch_handle);
3805 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806 
3807 	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808 	if (ret)
3809 		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3810 
3811 	rbd_dev->watch_handle = NULL;
3812 }
3813 
3814 static int rbd_register_watch(struct rbd_device *rbd_dev)
3815 {
3816 	int ret;
3817 
3818 	mutex_lock(&rbd_dev->watch_mutex);
3819 	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3820 	ret = __rbd_register_watch(rbd_dev);
3821 	if (ret)
3822 		goto out;
3823 
3824 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3825 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3826 
3827 out:
3828 	mutex_unlock(&rbd_dev->watch_mutex);
3829 	return ret;
3830 }
3831 
3832 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3833 {
3834 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3835 
3836 	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3837 	cancel_work_sync(&rbd_dev->acquired_lock_work);
3838 	cancel_work_sync(&rbd_dev->released_lock_work);
3839 	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3840 	cancel_work_sync(&rbd_dev->unlock_work);
3841 }
3842 
3843 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3844 {
3845 	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3846 	cancel_tasks_sync(rbd_dev);
3847 
3848 	mutex_lock(&rbd_dev->watch_mutex);
3849 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3850 		__rbd_unregister_watch(rbd_dev);
3851 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3852 	mutex_unlock(&rbd_dev->watch_mutex);
3853 
3854 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3855 }
3856 
3857 /*
3858  * lock_rwsem must be held for write
3859  */
3860 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3861 {
3862 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863 	char cookie[32];
3864 	int ret;
3865 
3866 	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3867 
3868 	format_lock_cookie(rbd_dev, cookie);
3869 	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3870 				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3871 				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3872 				  RBD_LOCK_TAG, cookie);
3873 	if (ret) {
3874 		if (ret != -EOPNOTSUPP)
3875 			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3876 				 ret);
3877 
3878 		/*
3879 		 * Lock cookie cannot be updated on older OSDs, so do
3880 		 * a manual release and queue an acquire.
3881 		 */
3882 		if (rbd_release_lock(rbd_dev))
3883 			queue_delayed_work(rbd_dev->task_wq,
3884 					   &rbd_dev->lock_dwork, 0);
3885 	} else {
3886 		strcpy(rbd_dev->lock_cookie, cookie);
3887 	}
3888 }
3889 
3890 static void rbd_reregister_watch(struct work_struct *work)
3891 {
3892 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3893 					    struct rbd_device, watch_dwork);
3894 	int ret;
3895 
3896 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
3897 
3898 	mutex_lock(&rbd_dev->watch_mutex);
3899 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3900 		mutex_unlock(&rbd_dev->watch_mutex);
3901 		return;
3902 	}
3903 
3904 	ret = __rbd_register_watch(rbd_dev);
3905 	if (ret) {
3906 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3907 		if (ret == -EBLACKLISTED || ret == -ENOENT) {
3908 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3909 			wake_requests(rbd_dev, true);
3910 		} else {
3911 			queue_delayed_work(rbd_dev->task_wq,
3912 					   &rbd_dev->watch_dwork,
3913 					   RBD_RETRY_DELAY);
3914 		}
3915 		mutex_unlock(&rbd_dev->watch_mutex);
3916 		return;
3917 	}
3918 
3919 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3920 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3921 	mutex_unlock(&rbd_dev->watch_mutex);
3922 
3923 	down_write(&rbd_dev->lock_rwsem);
3924 	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3925 		rbd_reacquire_lock(rbd_dev);
3926 	up_write(&rbd_dev->lock_rwsem);
3927 
3928 	ret = rbd_dev_refresh(rbd_dev);
3929 	if (ret)
3930 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3931 }
3932 
3933 /*
3934  * Synchronous osd object method call.  Returns the number of bytes
3935  * returned in the outbound buffer, or a negative error code.
3936  */
3937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3938 			     struct ceph_object_id *oid,
3939 			     struct ceph_object_locator *oloc,
3940 			     const char *method_name,
3941 			     const void *outbound,
3942 			     size_t outbound_size,
3943 			     void *inbound,
3944 			     size_t inbound_size)
3945 {
3946 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3947 	struct page *req_page = NULL;
3948 	struct page *reply_page;
3949 	int ret;
3950 
3951 	/*
3952 	 * Method calls are ultimately read operations.  The result
3953 	 * should placed into the inbound buffer provided.  They
3954 	 * also supply outbound data--parameters for the object
3955 	 * method.  Currently if this is present it will be a
3956 	 * snapshot id.
3957 	 */
3958 	if (outbound) {
3959 		if (outbound_size > PAGE_SIZE)
3960 			return -E2BIG;
3961 
3962 		req_page = alloc_page(GFP_KERNEL);
3963 		if (!req_page)
3964 			return -ENOMEM;
3965 
3966 		memcpy(page_address(req_page), outbound, outbound_size);
3967 	}
3968 
3969 	reply_page = alloc_page(GFP_KERNEL);
3970 	if (!reply_page) {
3971 		if (req_page)
3972 			__free_page(req_page);
3973 		return -ENOMEM;
3974 	}
3975 
3976 	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3977 			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
3978 			     reply_page, &inbound_size);
3979 	if (!ret) {
3980 		memcpy(inbound, page_address(reply_page), inbound_size);
3981 		ret = inbound_size;
3982 	}
3983 
3984 	if (req_page)
3985 		__free_page(req_page);
3986 	__free_page(reply_page);
3987 	return ret;
3988 }
3989 
3990 /*
3991  * lock_rwsem must be held for read
3992  */
3993 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3994 {
3995 	DEFINE_WAIT(wait);
3996 
3997 	do {
3998 		/*
3999 		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4000 		 * and cancel_delayed_work() in wake_requests().
4001 		 */
4002 		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4003 		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4004 		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4005 					  TASK_UNINTERRUPTIBLE);
4006 		up_read(&rbd_dev->lock_rwsem);
4007 		schedule();
4008 		down_read(&rbd_dev->lock_rwsem);
4009 	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4010 		 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4011 
4012 	finish_wait(&rbd_dev->lock_waitq, &wait);
4013 }
4014 
4015 static void rbd_queue_workfn(struct work_struct *work)
4016 {
4017 	struct request *rq = blk_mq_rq_from_pdu(work);
4018 	struct rbd_device *rbd_dev = rq->q->queuedata;
4019 	struct rbd_img_request *img_request;
4020 	struct ceph_snap_context *snapc = NULL;
4021 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4022 	u64 length = blk_rq_bytes(rq);
4023 	enum obj_operation_type op_type;
4024 	u64 mapping_size;
4025 	bool must_be_locked;
4026 	int result;
4027 
4028 	switch (req_op(rq)) {
4029 	case REQ_OP_DISCARD:
4030 	case REQ_OP_WRITE_ZEROES:
4031 		op_type = OBJ_OP_DISCARD;
4032 		break;
4033 	case REQ_OP_WRITE:
4034 		op_type = OBJ_OP_WRITE;
4035 		break;
4036 	case REQ_OP_READ:
4037 		op_type = OBJ_OP_READ;
4038 		break;
4039 	default:
4040 		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4041 		result = -EIO;
4042 		goto err;
4043 	}
4044 
4045 	/* Ignore/skip any zero-length requests */
4046 
4047 	if (!length) {
4048 		dout("%s: zero-length request\n", __func__);
4049 		result = 0;
4050 		goto err_rq;
4051 	}
4052 
4053 	/* Only reads are allowed to a read-only device */
4054 
4055 	if (op_type != OBJ_OP_READ) {
4056 		if (rbd_dev->mapping.read_only) {
4057 			result = -EROFS;
4058 			goto err_rq;
4059 		}
4060 		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4061 	}
4062 
4063 	/*
4064 	 * Quit early if the mapped snapshot no longer exists.  It's
4065 	 * still possible the snapshot will have disappeared by the
4066 	 * time our request arrives at the osd, but there's no sense in
4067 	 * sending it if we already know.
4068 	 */
4069 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4070 		dout("request for non-existent snapshot");
4071 		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4072 		result = -ENXIO;
4073 		goto err_rq;
4074 	}
4075 
4076 	if (offset && length > U64_MAX - offset + 1) {
4077 		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4078 			 length);
4079 		result = -EINVAL;
4080 		goto err_rq;	/* Shouldn't happen */
4081 	}
4082 
4083 	blk_mq_start_request(rq);
4084 
4085 	down_read(&rbd_dev->header_rwsem);
4086 	mapping_size = rbd_dev->mapping.size;
4087 	if (op_type != OBJ_OP_READ) {
4088 		snapc = rbd_dev->header.snapc;
4089 		ceph_get_snap_context(snapc);
4090 	}
4091 	up_read(&rbd_dev->header_rwsem);
4092 
4093 	if (offset + length > mapping_size) {
4094 		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4095 			 length, mapping_size);
4096 		result = -EIO;
4097 		goto err_rq;
4098 	}
4099 
4100 	must_be_locked =
4101 	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4102 	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4103 	if (must_be_locked) {
4104 		down_read(&rbd_dev->lock_rwsem);
4105 		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4106 		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4107 			if (rbd_dev->opts->exclusive) {
4108 				rbd_warn(rbd_dev, "exclusive lock required");
4109 				result = -EROFS;
4110 				goto err_unlock;
4111 			}
4112 			rbd_wait_state_locked(rbd_dev);
4113 		}
4114 		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4115 			result = -EBLACKLISTED;
4116 			goto err_unlock;
4117 		}
4118 	}
4119 
4120 	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4121 					     snapc);
4122 	if (!img_request) {
4123 		result = -ENOMEM;
4124 		goto err_unlock;
4125 	}
4126 	img_request->rq = rq;
4127 	snapc = NULL; /* img_request consumes a ref */
4128 
4129 	if (op_type == OBJ_OP_DISCARD)
4130 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4131 					      NULL);
4132 	else
4133 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4134 					      rq->bio);
4135 	if (result)
4136 		goto err_img_request;
4137 
4138 	result = rbd_img_request_submit(img_request);
4139 	if (result)
4140 		goto err_img_request;
4141 
4142 	if (must_be_locked)
4143 		up_read(&rbd_dev->lock_rwsem);
4144 	return;
4145 
4146 err_img_request:
4147 	rbd_img_request_put(img_request);
4148 err_unlock:
4149 	if (must_be_locked)
4150 		up_read(&rbd_dev->lock_rwsem);
4151 err_rq:
4152 	if (result)
4153 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4154 			 obj_op_name(op_type), length, offset, result);
4155 	ceph_put_snap_context(snapc);
4156 err:
4157 	blk_mq_end_request(rq, errno_to_blk_status(result));
4158 }
4159 
4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4161 		const struct blk_mq_queue_data *bd)
4162 {
4163 	struct request *rq = bd->rq;
4164 	struct work_struct *work = blk_mq_rq_to_pdu(rq);
4165 
4166 	queue_work(rbd_wq, work);
4167 	return BLK_STS_OK;
4168 }
4169 
4170 static void rbd_free_disk(struct rbd_device *rbd_dev)
4171 {
4172 	blk_cleanup_queue(rbd_dev->disk->queue);
4173 	blk_mq_free_tag_set(&rbd_dev->tag_set);
4174 	put_disk(rbd_dev->disk);
4175 	rbd_dev->disk = NULL;
4176 }
4177 
4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4179 			     struct ceph_object_id *oid,
4180 			     struct ceph_object_locator *oloc,
4181 			     void *buf, int buf_len)
4182 
4183 {
4184 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4185 	struct ceph_osd_request *req;
4186 	struct page **pages;
4187 	int num_pages = calc_pages_for(0, buf_len);
4188 	int ret;
4189 
4190 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4191 	if (!req)
4192 		return -ENOMEM;
4193 
4194 	ceph_oid_copy(&req->r_base_oid, oid);
4195 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4196 	req->r_flags = CEPH_OSD_FLAG_READ;
4197 
4198 	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4199 	if (ret)
4200 		goto out_req;
4201 
4202 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4203 	if (IS_ERR(pages)) {
4204 		ret = PTR_ERR(pages);
4205 		goto out_req;
4206 	}
4207 
4208 	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4209 	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4210 					 true);
4211 
4212 	ceph_osdc_start_request(osdc, req, false);
4213 	ret = ceph_osdc_wait_request(osdc, req);
4214 	if (ret >= 0)
4215 		ceph_copy_from_page_vector(pages, buf, 0, ret);
4216 
4217 out_req:
4218 	ceph_osdc_put_request(req);
4219 	return ret;
4220 }
4221 
4222 /*
4223  * Read the complete header for the given rbd device.  On successful
4224  * return, the rbd_dev->header field will contain up-to-date
4225  * information about the image.
4226  */
4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4228 {
4229 	struct rbd_image_header_ondisk *ondisk = NULL;
4230 	u32 snap_count = 0;
4231 	u64 names_size = 0;
4232 	u32 want_count;
4233 	int ret;
4234 
4235 	/*
4236 	 * The complete header will include an array of its 64-bit
4237 	 * snapshot ids, followed by the names of those snapshots as
4238 	 * a contiguous block of NUL-terminated strings.  Note that
4239 	 * the number of snapshots could change by the time we read
4240 	 * it in, in which case we re-read it.
4241 	 */
4242 	do {
4243 		size_t size;
4244 
4245 		kfree(ondisk);
4246 
4247 		size = sizeof (*ondisk);
4248 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4249 		size += names_size;
4250 		ondisk = kmalloc(size, GFP_KERNEL);
4251 		if (!ondisk)
4252 			return -ENOMEM;
4253 
4254 		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4255 					&rbd_dev->header_oloc, ondisk, size);
4256 		if (ret < 0)
4257 			goto out;
4258 		if ((size_t)ret < size) {
4259 			ret = -ENXIO;
4260 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4261 				size, ret);
4262 			goto out;
4263 		}
4264 		if (!rbd_dev_ondisk_valid(ondisk)) {
4265 			ret = -ENXIO;
4266 			rbd_warn(rbd_dev, "invalid header");
4267 			goto out;
4268 		}
4269 
4270 		names_size = le64_to_cpu(ondisk->snap_names_len);
4271 		want_count = snap_count;
4272 		snap_count = le32_to_cpu(ondisk->snap_count);
4273 	} while (snap_count != want_count);
4274 
4275 	ret = rbd_header_from_disk(rbd_dev, ondisk);
4276 out:
4277 	kfree(ondisk);
4278 
4279 	return ret;
4280 }
4281 
4282 /*
4283  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4284  * has disappeared from the (just updated) snapshot context.
4285  */
4286 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4287 {
4288 	u64 snap_id;
4289 
4290 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4291 		return;
4292 
4293 	snap_id = rbd_dev->spec->snap_id;
4294 	if (snap_id == CEPH_NOSNAP)
4295 		return;
4296 
4297 	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4298 		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4299 }
4300 
4301 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4302 {
4303 	sector_t size;
4304 
4305 	/*
4306 	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4307 	 * try to update its size.  If REMOVING is set, updating size
4308 	 * is just useless work since the device can't be opened.
4309 	 */
4310 	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4311 	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4312 		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4313 		dout("setting size to %llu sectors", (unsigned long long)size);
4314 		set_capacity(rbd_dev->disk, size);
4315 		revalidate_disk(rbd_dev->disk);
4316 	}
4317 }
4318 
4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4320 {
4321 	u64 mapping_size;
4322 	int ret;
4323 
4324 	down_write(&rbd_dev->header_rwsem);
4325 	mapping_size = rbd_dev->mapping.size;
4326 
4327 	ret = rbd_dev_header_info(rbd_dev);
4328 	if (ret)
4329 		goto out;
4330 
4331 	/*
4332 	 * If there is a parent, see if it has disappeared due to the
4333 	 * mapped image getting flattened.
4334 	 */
4335 	if (rbd_dev->parent) {
4336 		ret = rbd_dev_v2_parent_info(rbd_dev);
4337 		if (ret)
4338 			goto out;
4339 	}
4340 
4341 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4342 		rbd_dev->mapping.size = rbd_dev->header.image_size;
4343 	} else {
4344 		/* validate mapped snapshot's EXISTS flag */
4345 		rbd_exists_validate(rbd_dev);
4346 	}
4347 
4348 out:
4349 	up_write(&rbd_dev->header_rwsem);
4350 	if (!ret && mapping_size != rbd_dev->mapping.size)
4351 		rbd_dev_update_size(rbd_dev);
4352 
4353 	return ret;
4354 }
4355 
4356 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4357 		unsigned int hctx_idx, unsigned int numa_node)
4358 {
4359 	struct work_struct *work = blk_mq_rq_to_pdu(rq);
4360 
4361 	INIT_WORK(work, rbd_queue_workfn);
4362 	return 0;
4363 }
4364 
4365 static const struct blk_mq_ops rbd_mq_ops = {
4366 	.queue_rq	= rbd_queue_rq,
4367 	.init_request	= rbd_init_request,
4368 };
4369 
4370 static int rbd_init_disk(struct rbd_device *rbd_dev)
4371 {
4372 	struct gendisk *disk;
4373 	struct request_queue *q;
4374 	u64 segment_size;
4375 	int err;
4376 
4377 	/* create gendisk info */
4378 	disk = alloc_disk(single_major ?
4379 			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4380 			  RBD_MINORS_PER_MAJOR);
4381 	if (!disk)
4382 		return -ENOMEM;
4383 
4384 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4385 		 rbd_dev->dev_id);
4386 	disk->major = rbd_dev->major;
4387 	disk->first_minor = rbd_dev->minor;
4388 	if (single_major)
4389 		disk->flags |= GENHD_FL_EXT_DEVT;
4390 	disk->fops = &rbd_bd_ops;
4391 	disk->private_data = rbd_dev;
4392 
4393 	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4394 	rbd_dev->tag_set.ops = &rbd_mq_ops;
4395 	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4396 	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4397 	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4398 	rbd_dev->tag_set.nr_hw_queues = 1;
4399 	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4400 
4401 	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4402 	if (err)
4403 		goto out_disk;
4404 
4405 	q = blk_mq_init_queue(&rbd_dev->tag_set);
4406 	if (IS_ERR(q)) {
4407 		err = PTR_ERR(q);
4408 		goto out_tag_set;
4409 	}
4410 
4411 	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4412 	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4413 
4414 	/* set io sizes to object size */
4415 	segment_size = rbd_obj_bytes(&rbd_dev->header);
4416 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4417 	q->limits.max_sectors = queue_max_hw_sectors(q);
4418 	blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4419 	blk_queue_max_segment_size(q, segment_size);
4420 	blk_queue_io_min(q, segment_size);
4421 	blk_queue_io_opt(q, segment_size);
4422 
4423 	/* enable the discard support */
4424 	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4425 	q->limits.discard_granularity = segment_size;
4426 	q->limits.discard_alignment = segment_size;
4427 	blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4428 	blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4429 
4430 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4431 		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4432 
4433 	/*
4434 	 * disk_release() expects a queue ref from add_disk() and will
4435 	 * put it.  Hold an extra ref until add_disk() is called.
4436 	 */
4437 	WARN_ON(!blk_get_queue(q));
4438 	disk->queue = q;
4439 	q->queuedata = rbd_dev;
4440 
4441 	rbd_dev->disk = disk;
4442 
4443 	return 0;
4444 out_tag_set:
4445 	blk_mq_free_tag_set(&rbd_dev->tag_set);
4446 out_disk:
4447 	put_disk(disk);
4448 	return err;
4449 }
4450 
4451 /*
4452   sysfs
4453 */
4454 
4455 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4456 {
4457 	return container_of(dev, struct rbd_device, dev);
4458 }
4459 
4460 static ssize_t rbd_size_show(struct device *dev,
4461 			     struct device_attribute *attr, char *buf)
4462 {
4463 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4464 
4465 	return sprintf(buf, "%llu\n",
4466 		(unsigned long long)rbd_dev->mapping.size);
4467 }
4468 
4469 /*
4470  * Note this shows the features for whatever's mapped, which is not
4471  * necessarily the base image.
4472  */
4473 static ssize_t rbd_features_show(struct device *dev,
4474 			     struct device_attribute *attr, char *buf)
4475 {
4476 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4477 
4478 	return sprintf(buf, "0x%016llx\n",
4479 			(unsigned long long)rbd_dev->mapping.features);
4480 }
4481 
4482 static ssize_t rbd_major_show(struct device *dev,
4483 			      struct device_attribute *attr, char *buf)
4484 {
4485 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4486 
4487 	if (rbd_dev->major)
4488 		return sprintf(buf, "%d\n", rbd_dev->major);
4489 
4490 	return sprintf(buf, "(none)\n");
4491 }
4492 
4493 static ssize_t rbd_minor_show(struct device *dev,
4494 			      struct device_attribute *attr, char *buf)
4495 {
4496 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4497 
4498 	return sprintf(buf, "%d\n", rbd_dev->minor);
4499 }
4500 
4501 static ssize_t rbd_client_addr_show(struct device *dev,
4502 				    struct device_attribute *attr, char *buf)
4503 {
4504 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4505 	struct ceph_entity_addr *client_addr =
4506 	    ceph_client_addr(rbd_dev->rbd_client->client);
4507 
4508 	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4509 		       le32_to_cpu(client_addr->nonce));
4510 }
4511 
4512 static ssize_t rbd_client_id_show(struct device *dev,
4513 				  struct device_attribute *attr, char *buf)
4514 {
4515 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4516 
4517 	return sprintf(buf, "client%lld\n",
4518 		       ceph_client_gid(rbd_dev->rbd_client->client));
4519 }
4520 
4521 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4522 				     struct device_attribute *attr, char *buf)
4523 {
4524 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4525 
4526 	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4527 }
4528 
4529 static ssize_t rbd_config_info_show(struct device *dev,
4530 				    struct device_attribute *attr, char *buf)
4531 {
4532 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4533 
4534 	return sprintf(buf, "%s\n", rbd_dev->config_info);
4535 }
4536 
4537 static ssize_t rbd_pool_show(struct device *dev,
4538 			     struct device_attribute *attr, char *buf)
4539 {
4540 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4541 
4542 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4543 }
4544 
4545 static ssize_t rbd_pool_id_show(struct device *dev,
4546 			     struct device_attribute *attr, char *buf)
4547 {
4548 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4549 
4550 	return sprintf(buf, "%llu\n",
4551 			(unsigned long long) rbd_dev->spec->pool_id);
4552 }
4553 
4554 static ssize_t rbd_name_show(struct device *dev,
4555 			     struct device_attribute *attr, char *buf)
4556 {
4557 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4558 
4559 	if (rbd_dev->spec->image_name)
4560 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4561 
4562 	return sprintf(buf, "(unknown)\n");
4563 }
4564 
4565 static ssize_t rbd_image_id_show(struct device *dev,
4566 			     struct device_attribute *attr, char *buf)
4567 {
4568 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4569 
4570 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4571 }
4572 
4573 /*
4574  * Shows the name of the currently-mapped snapshot (or
4575  * RBD_SNAP_HEAD_NAME for the base image).
4576  */
4577 static ssize_t rbd_snap_show(struct device *dev,
4578 			     struct device_attribute *attr,
4579 			     char *buf)
4580 {
4581 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4582 
4583 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4584 }
4585 
4586 static ssize_t rbd_snap_id_show(struct device *dev,
4587 				struct device_attribute *attr, char *buf)
4588 {
4589 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4590 
4591 	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4592 }
4593 
4594 /*
4595  * For a v2 image, shows the chain of parent images, separated by empty
4596  * lines.  For v1 images or if there is no parent, shows "(no parent
4597  * image)".
4598  */
4599 static ssize_t rbd_parent_show(struct device *dev,
4600 			       struct device_attribute *attr,
4601 			       char *buf)
4602 {
4603 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604 	ssize_t count = 0;
4605 
4606 	if (!rbd_dev->parent)
4607 		return sprintf(buf, "(no parent image)\n");
4608 
4609 	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4610 		struct rbd_spec *spec = rbd_dev->parent_spec;
4611 
4612 		count += sprintf(&buf[count], "%s"
4613 			    "pool_id %llu\npool_name %s\n"
4614 			    "image_id %s\nimage_name %s\n"
4615 			    "snap_id %llu\nsnap_name %s\n"
4616 			    "overlap %llu\n",
4617 			    !count ? "" : "\n", /* first? */
4618 			    spec->pool_id, spec->pool_name,
4619 			    spec->image_id, spec->image_name ?: "(unknown)",
4620 			    spec->snap_id, spec->snap_name,
4621 			    rbd_dev->parent_overlap);
4622 	}
4623 
4624 	return count;
4625 }
4626 
4627 static ssize_t rbd_image_refresh(struct device *dev,
4628 				 struct device_attribute *attr,
4629 				 const char *buf,
4630 				 size_t size)
4631 {
4632 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4633 	int ret;
4634 
4635 	ret = rbd_dev_refresh(rbd_dev);
4636 	if (ret)
4637 		return ret;
4638 
4639 	return size;
4640 }
4641 
4642 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4643 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4644 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4645 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4646 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4647 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4648 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4649 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4650 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4651 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4652 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4653 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4654 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4655 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4656 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4657 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4658 
4659 static struct attribute *rbd_attrs[] = {
4660 	&dev_attr_size.attr,
4661 	&dev_attr_features.attr,
4662 	&dev_attr_major.attr,
4663 	&dev_attr_minor.attr,
4664 	&dev_attr_client_addr.attr,
4665 	&dev_attr_client_id.attr,
4666 	&dev_attr_cluster_fsid.attr,
4667 	&dev_attr_config_info.attr,
4668 	&dev_attr_pool.attr,
4669 	&dev_attr_pool_id.attr,
4670 	&dev_attr_name.attr,
4671 	&dev_attr_image_id.attr,
4672 	&dev_attr_current_snap.attr,
4673 	&dev_attr_snap_id.attr,
4674 	&dev_attr_parent.attr,
4675 	&dev_attr_refresh.attr,
4676 	NULL
4677 };
4678 
4679 static struct attribute_group rbd_attr_group = {
4680 	.attrs = rbd_attrs,
4681 };
4682 
4683 static const struct attribute_group *rbd_attr_groups[] = {
4684 	&rbd_attr_group,
4685 	NULL
4686 };
4687 
4688 static void rbd_dev_release(struct device *dev);
4689 
4690 static const struct device_type rbd_device_type = {
4691 	.name		= "rbd",
4692 	.groups		= rbd_attr_groups,
4693 	.release	= rbd_dev_release,
4694 };
4695 
4696 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4697 {
4698 	kref_get(&spec->kref);
4699 
4700 	return spec;
4701 }
4702 
4703 static void rbd_spec_free(struct kref *kref);
4704 static void rbd_spec_put(struct rbd_spec *spec)
4705 {
4706 	if (spec)
4707 		kref_put(&spec->kref, rbd_spec_free);
4708 }
4709 
4710 static struct rbd_spec *rbd_spec_alloc(void)
4711 {
4712 	struct rbd_spec *spec;
4713 
4714 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4715 	if (!spec)
4716 		return NULL;
4717 
4718 	spec->pool_id = CEPH_NOPOOL;
4719 	spec->snap_id = CEPH_NOSNAP;
4720 	kref_init(&spec->kref);
4721 
4722 	return spec;
4723 }
4724 
4725 static void rbd_spec_free(struct kref *kref)
4726 {
4727 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4728 
4729 	kfree(spec->pool_name);
4730 	kfree(spec->image_id);
4731 	kfree(spec->image_name);
4732 	kfree(spec->snap_name);
4733 	kfree(spec);
4734 }
4735 
4736 static void rbd_dev_free(struct rbd_device *rbd_dev)
4737 {
4738 	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4739 	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4740 
4741 	ceph_oid_destroy(&rbd_dev->header_oid);
4742 	ceph_oloc_destroy(&rbd_dev->header_oloc);
4743 	kfree(rbd_dev->config_info);
4744 
4745 	rbd_put_client(rbd_dev->rbd_client);
4746 	rbd_spec_put(rbd_dev->spec);
4747 	kfree(rbd_dev->opts);
4748 	kfree(rbd_dev);
4749 }
4750 
4751 static void rbd_dev_release(struct device *dev)
4752 {
4753 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4754 	bool need_put = !!rbd_dev->opts;
4755 
4756 	if (need_put) {
4757 		destroy_workqueue(rbd_dev->task_wq);
4758 		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4759 	}
4760 
4761 	rbd_dev_free(rbd_dev);
4762 
4763 	/*
4764 	 * This is racy, but way better than putting module outside of
4765 	 * the release callback.  The race window is pretty small, so
4766 	 * doing something similar to dm (dm-builtin.c) is overkill.
4767 	 */
4768 	if (need_put)
4769 		module_put(THIS_MODULE);
4770 }
4771 
4772 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4773 					   struct rbd_spec *spec)
4774 {
4775 	struct rbd_device *rbd_dev;
4776 
4777 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4778 	if (!rbd_dev)
4779 		return NULL;
4780 
4781 	spin_lock_init(&rbd_dev->lock);
4782 	INIT_LIST_HEAD(&rbd_dev->node);
4783 	init_rwsem(&rbd_dev->header_rwsem);
4784 
4785 	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4786 	ceph_oid_init(&rbd_dev->header_oid);
4787 	rbd_dev->header_oloc.pool = spec->pool_id;
4788 
4789 	mutex_init(&rbd_dev->watch_mutex);
4790 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4791 	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4792 
4793 	init_rwsem(&rbd_dev->lock_rwsem);
4794 	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4795 	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4796 	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4797 	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4798 	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4799 	init_waitqueue_head(&rbd_dev->lock_waitq);
4800 
4801 	rbd_dev->dev.bus = &rbd_bus_type;
4802 	rbd_dev->dev.type = &rbd_device_type;
4803 	rbd_dev->dev.parent = &rbd_root_dev;
4804 	device_initialize(&rbd_dev->dev);
4805 
4806 	rbd_dev->rbd_client = rbdc;
4807 	rbd_dev->spec = spec;
4808 
4809 	return rbd_dev;
4810 }
4811 
4812 /*
4813  * Create a mapping rbd_dev.
4814  */
4815 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4816 					 struct rbd_spec *spec,
4817 					 struct rbd_options *opts)
4818 {
4819 	struct rbd_device *rbd_dev;
4820 
4821 	rbd_dev = __rbd_dev_create(rbdc, spec);
4822 	if (!rbd_dev)
4823 		return NULL;
4824 
4825 	rbd_dev->opts = opts;
4826 
4827 	/* get an id and fill in device name */
4828 	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4829 					 minor_to_rbd_dev_id(1 << MINORBITS),
4830 					 GFP_KERNEL);
4831 	if (rbd_dev->dev_id < 0)
4832 		goto fail_rbd_dev;
4833 
4834 	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4835 	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4836 						   rbd_dev->name);
4837 	if (!rbd_dev->task_wq)
4838 		goto fail_dev_id;
4839 
4840 	/* we have a ref from do_rbd_add() */
4841 	__module_get(THIS_MODULE);
4842 
4843 	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4844 	return rbd_dev;
4845 
4846 fail_dev_id:
4847 	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848 fail_rbd_dev:
4849 	rbd_dev_free(rbd_dev);
4850 	return NULL;
4851 }
4852 
4853 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4854 {
4855 	if (rbd_dev)
4856 		put_device(&rbd_dev->dev);
4857 }
4858 
4859 /*
4860  * Get the size and object order for an image snapshot, or if
4861  * snap_id is CEPH_NOSNAP, gets this information for the base
4862  * image.
4863  */
4864 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4865 				u8 *order, u64 *snap_size)
4866 {
4867 	__le64 snapid = cpu_to_le64(snap_id);
4868 	int ret;
4869 	struct {
4870 		u8 order;
4871 		__le64 size;
4872 	} __attribute__ ((packed)) size_buf = { 0 };
4873 
4874 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4875 				  &rbd_dev->header_oloc, "get_size",
4876 				  &snapid, sizeof(snapid),
4877 				  &size_buf, sizeof(size_buf));
4878 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4879 	if (ret < 0)
4880 		return ret;
4881 	if (ret < sizeof (size_buf))
4882 		return -ERANGE;
4883 
4884 	if (order) {
4885 		*order = size_buf.order;
4886 		dout("  order %u", (unsigned int)*order);
4887 	}
4888 	*snap_size = le64_to_cpu(size_buf.size);
4889 
4890 	dout("  snap_id 0x%016llx snap_size = %llu\n",
4891 		(unsigned long long)snap_id,
4892 		(unsigned long long)*snap_size);
4893 
4894 	return 0;
4895 }
4896 
4897 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4898 {
4899 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4900 					&rbd_dev->header.obj_order,
4901 					&rbd_dev->header.image_size);
4902 }
4903 
4904 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4905 {
4906 	void *reply_buf;
4907 	int ret;
4908 	void *p;
4909 
4910 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4911 	if (!reply_buf)
4912 		return -ENOMEM;
4913 
4914 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4915 				  &rbd_dev->header_oloc, "get_object_prefix",
4916 				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4917 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4918 	if (ret < 0)
4919 		goto out;
4920 
4921 	p = reply_buf;
4922 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4923 						p + ret, NULL, GFP_NOIO);
4924 	ret = 0;
4925 
4926 	if (IS_ERR(rbd_dev->header.object_prefix)) {
4927 		ret = PTR_ERR(rbd_dev->header.object_prefix);
4928 		rbd_dev->header.object_prefix = NULL;
4929 	} else {
4930 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4931 	}
4932 out:
4933 	kfree(reply_buf);
4934 
4935 	return ret;
4936 }
4937 
4938 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4939 		u64 *snap_features)
4940 {
4941 	__le64 snapid = cpu_to_le64(snap_id);
4942 	struct {
4943 		__le64 features;
4944 		__le64 incompat;
4945 	} __attribute__ ((packed)) features_buf = { 0 };
4946 	u64 unsup;
4947 	int ret;
4948 
4949 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4950 				  &rbd_dev->header_oloc, "get_features",
4951 				  &snapid, sizeof(snapid),
4952 				  &features_buf, sizeof(features_buf));
4953 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954 	if (ret < 0)
4955 		return ret;
4956 	if (ret < sizeof (features_buf))
4957 		return -ERANGE;
4958 
4959 	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4960 	if (unsup) {
4961 		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4962 			 unsup);
4963 		return -ENXIO;
4964 	}
4965 
4966 	*snap_features = le64_to_cpu(features_buf.features);
4967 
4968 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4969 		(unsigned long long)snap_id,
4970 		(unsigned long long)*snap_features,
4971 		(unsigned long long)le64_to_cpu(features_buf.incompat));
4972 
4973 	return 0;
4974 }
4975 
4976 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4977 {
4978 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4979 						&rbd_dev->header.features);
4980 }
4981 
4982 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4983 {
4984 	struct rbd_spec *parent_spec;
4985 	size_t size;
4986 	void *reply_buf = NULL;
4987 	__le64 snapid;
4988 	void *p;
4989 	void *end;
4990 	u64 pool_id;
4991 	char *image_id;
4992 	u64 snap_id;
4993 	u64 overlap;
4994 	int ret;
4995 
4996 	parent_spec = rbd_spec_alloc();
4997 	if (!parent_spec)
4998 		return -ENOMEM;
4999 
5000 	size = sizeof (__le64) +				/* pool_id */
5001 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
5002 		sizeof (__le64) +				/* snap_id */
5003 		sizeof (__le64);				/* overlap */
5004 	reply_buf = kmalloc(size, GFP_KERNEL);
5005 	if (!reply_buf) {
5006 		ret = -ENOMEM;
5007 		goto out_err;
5008 	}
5009 
5010 	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5011 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5012 				  &rbd_dev->header_oloc, "get_parent",
5013 				  &snapid, sizeof(snapid), reply_buf, size);
5014 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5015 	if (ret < 0)
5016 		goto out_err;
5017 
5018 	p = reply_buf;
5019 	end = reply_buf + ret;
5020 	ret = -ERANGE;
5021 	ceph_decode_64_safe(&p, end, pool_id, out_err);
5022 	if (pool_id == CEPH_NOPOOL) {
5023 		/*
5024 		 * Either the parent never existed, or we have
5025 		 * record of it but the image got flattened so it no
5026 		 * longer has a parent.  When the parent of a
5027 		 * layered image disappears we immediately set the
5028 		 * overlap to 0.  The effect of this is that all new
5029 		 * requests will be treated as if the image had no
5030 		 * parent.
5031 		 */
5032 		if (rbd_dev->parent_overlap) {
5033 			rbd_dev->parent_overlap = 0;
5034 			rbd_dev_parent_put(rbd_dev);
5035 			pr_info("%s: clone image has been flattened\n",
5036 				rbd_dev->disk->disk_name);
5037 		}
5038 
5039 		goto out;	/* No parent?  No problem. */
5040 	}
5041 
5042 	/* The ceph file layout needs to fit pool id in 32 bits */
5043 
5044 	ret = -EIO;
5045 	if (pool_id > (u64)U32_MAX) {
5046 		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5047 			(unsigned long long)pool_id, U32_MAX);
5048 		goto out_err;
5049 	}
5050 
5051 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5052 	if (IS_ERR(image_id)) {
5053 		ret = PTR_ERR(image_id);
5054 		goto out_err;
5055 	}
5056 	ceph_decode_64_safe(&p, end, snap_id, out_err);
5057 	ceph_decode_64_safe(&p, end, overlap, out_err);
5058 
5059 	/*
5060 	 * The parent won't change (except when the clone is
5061 	 * flattened, already handled that).  So we only need to
5062 	 * record the parent spec we have not already done so.
5063 	 */
5064 	if (!rbd_dev->parent_spec) {
5065 		parent_spec->pool_id = pool_id;
5066 		parent_spec->image_id = image_id;
5067 		parent_spec->snap_id = snap_id;
5068 		rbd_dev->parent_spec = parent_spec;
5069 		parent_spec = NULL;	/* rbd_dev now owns this */
5070 	} else {
5071 		kfree(image_id);
5072 	}
5073 
5074 	/*
5075 	 * We always update the parent overlap.  If it's zero we issue
5076 	 * a warning, as we will proceed as if there was no parent.
5077 	 */
5078 	if (!overlap) {
5079 		if (parent_spec) {
5080 			/* refresh, careful to warn just once */
5081 			if (rbd_dev->parent_overlap)
5082 				rbd_warn(rbd_dev,
5083 				    "clone now standalone (overlap became 0)");
5084 		} else {
5085 			/* initial probe */
5086 			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5087 		}
5088 	}
5089 	rbd_dev->parent_overlap = overlap;
5090 
5091 out:
5092 	ret = 0;
5093 out_err:
5094 	kfree(reply_buf);
5095 	rbd_spec_put(parent_spec);
5096 
5097 	return ret;
5098 }
5099 
5100 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5101 {
5102 	struct {
5103 		__le64 stripe_unit;
5104 		__le64 stripe_count;
5105 	} __attribute__ ((packed)) striping_info_buf = { 0 };
5106 	size_t size = sizeof (striping_info_buf);
5107 	void *p;
5108 	u64 obj_size;
5109 	u64 stripe_unit;
5110 	u64 stripe_count;
5111 	int ret;
5112 
5113 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5114 				&rbd_dev->header_oloc, "get_stripe_unit_count",
5115 				NULL, 0, &striping_info_buf, size);
5116 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5117 	if (ret < 0)
5118 		return ret;
5119 	if (ret < size)
5120 		return -ERANGE;
5121 
5122 	/*
5123 	 * We don't actually support the "fancy striping" feature
5124 	 * (STRIPINGV2) yet, but if the striping sizes are the
5125 	 * defaults the behavior is the same as before.  So find
5126 	 * out, and only fail if the image has non-default values.
5127 	 */
5128 	ret = -EINVAL;
5129 	obj_size = rbd_obj_bytes(&rbd_dev->header);
5130 	p = &striping_info_buf;
5131 	stripe_unit = ceph_decode_64(&p);
5132 	if (stripe_unit != obj_size) {
5133 		rbd_warn(rbd_dev, "unsupported stripe unit "
5134 				"(got %llu want %llu)",
5135 				stripe_unit, obj_size);
5136 		return -EINVAL;
5137 	}
5138 	stripe_count = ceph_decode_64(&p);
5139 	if (stripe_count != 1) {
5140 		rbd_warn(rbd_dev, "unsupported stripe count "
5141 				"(got %llu want 1)", stripe_count);
5142 		return -EINVAL;
5143 	}
5144 	rbd_dev->header.stripe_unit = stripe_unit;
5145 	rbd_dev->header.stripe_count = stripe_count;
5146 
5147 	return 0;
5148 }
5149 
5150 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5151 {
5152 	__le64 data_pool_id;
5153 	int ret;
5154 
5155 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5156 				  &rbd_dev->header_oloc, "get_data_pool",
5157 				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
5158 	if (ret < 0)
5159 		return ret;
5160 	if (ret < sizeof(data_pool_id))
5161 		return -EBADMSG;
5162 
5163 	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5164 	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5165 	return 0;
5166 }
5167 
5168 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5169 {
5170 	CEPH_DEFINE_OID_ONSTACK(oid);
5171 	size_t image_id_size;
5172 	char *image_id;
5173 	void *p;
5174 	void *end;
5175 	size_t size;
5176 	void *reply_buf = NULL;
5177 	size_t len = 0;
5178 	char *image_name = NULL;
5179 	int ret;
5180 
5181 	rbd_assert(!rbd_dev->spec->image_name);
5182 
5183 	len = strlen(rbd_dev->spec->image_id);
5184 	image_id_size = sizeof (__le32) + len;
5185 	image_id = kmalloc(image_id_size, GFP_KERNEL);
5186 	if (!image_id)
5187 		return NULL;
5188 
5189 	p = image_id;
5190 	end = image_id + image_id_size;
5191 	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5192 
5193 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5194 	reply_buf = kmalloc(size, GFP_KERNEL);
5195 	if (!reply_buf)
5196 		goto out;
5197 
5198 	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5199 	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5200 				  "dir_get_name", image_id, image_id_size,
5201 				  reply_buf, size);
5202 	if (ret < 0)
5203 		goto out;
5204 	p = reply_buf;
5205 	end = reply_buf + ret;
5206 
5207 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5208 	if (IS_ERR(image_name))
5209 		image_name = NULL;
5210 	else
5211 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5212 out:
5213 	kfree(reply_buf);
5214 	kfree(image_id);
5215 
5216 	return image_name;
5217 }
5218 
5219 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5220 {
5221 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5222 	const char *snap_name;
5223 	u32 which = 0;
5224 
5225 	/* Skip over names until we find the one we are looking for */
5226 
5227 	snap_name = rbd_dev->header.snap_names;
5228 	while (which < snapc->num_snaps) {
5229 		if (!strcmp(name, snap_name))
5230 			return snapc->snaps[which];
5231 		snap_name += strlen(snap_name) + 1;
5232 		which++;
5233 	}
5234 	return CEPH_NOSNAP;
5235 }
5236 
5237 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5238 {
5239 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5240 	u32 which;
5241 	bool found = false;
5242 	u64 snap_id;
5243 
5244 	for (which = 0; !found && which < snapc->num_snaps; which++) {
5245 		const char *snap_name;
5246 
5247 		snap_id = snapc->snaps[which];
5248 		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5249 		if (IS_ERR(snap_name)) {
5250 			/* ignore no-longer existing snapshots */
5251 			if (PTR_ERR(snap_name) == -ENOENT)
5252 				continue;
5253 			else
5254 				break;
5255 		}
5256 		found = !strcmp(name, snap_name);
5257 		kfree(snap_name);
5258 	}
5259 	return found ? snap_id : CEPH_NOSNAP;
5260 }
5261 
5262 /*
5263  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5264  * no snapshot by that name is found, or if an error occurs.
5265  */
5266 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5267 {
5268 	if (rbd_dev->image_format == 1)
5269 		return rbd_v1_snap_id_by_name(rbd_dev, name);
5270 
5271 	return rbd_v2_snap_id_by_name(rbd_dev, name);
5272 }
5273 
5274 /*
5275  * An image being mapped will have everything but the snap id.
5276  */
5277 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5278 {
5279 	struct rbd_spec *spec = rbd_dev->spec;
5280 
5281 	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5282 	rbd_assert(spec->image_id && spec->image_name);
5283 	rbd_assert(spec->snap_name);
5284 
5285 	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5286 		u64 snap_id;
5287 
5288 		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5289 		if (snap_id == CEPH_NOSNAP)
5290 			return -ENOENT;
5291 
5292 		spec->snap_id = snap_id;
5293 	} else {
5294 		spec->snap_id = CEPH_NOSNAP;
5295 	}
5296 
5297 	return 0;
5298 }
5299 
5300 /*
5301  * A parent image will have all ids but none of the names.
5302  *
5303  * All names in an rbd spec are dynamically allocated.  It's OK if we
5304  * can't figure out the name for an image id.
5305  */
5306 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5307 {
5308 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5309 	struct rbd_spec *spec = rbd_dev->spec;
5310 	const char *pool_name;
5311 	const char *image_name;
5312 	const char *snap_name;
5313 	int ret;
5314 
5315 	rbd_assert(spec->pool_id != CEPH_NOPOOL);
5316 	rbd_assert(spec->image_id);
5317 	rbd_assert(spec->snap_id != CEPH_NOSNAP);
5318 
5319 	/* Get the pool name; we have to make our own copy of this */
5320 
5321 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5322 	if (!pool_name) {
5323 		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5324 		return -EIO;
5325 	}
5326 	pool_name = kstrdup(pool_name, GFP_KERNEL);
5327 	if (!pool_name)
5328 		return -ENOMEM;
5329 
5330 	/* Fetch the image name; tolerate failure here */
5331 
5332 	image_name = rbd_dev_image_name(rbd_dev);
5333 	if (!image_name)
5334 		rbd_warn(rbd_dev, "unable to get image name");
5335 
5336 	/* Fetch the snapshot name */
5337 
5338 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5339 	if (IS_ERR(snap_name)) {
5340 		ret = PTR_ERR(snap_name);
5341 		goto out_err;
5342 	}
5343 
5344 	spec->pool_name = pool_name;
5345 	spec->image_name = image_name;
5346 	spec->snap_name = snap_name;
5347 
5348 	return 0;
5349 
5350 out_err:
5351 	kfree(image_name);
5352 	kfree(pool_name);
5353 	return ret;
5354 }
5355 
5356 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5357 {
5358 	size_t size;
5359 	int ret;
5360 	void *reply_buf;
5361 	void *p;
5362 	void *end;
5363 	u64 seq;
5364 	u32 snap_count;
5365 	struct ceph_snap_context *snapc;
5366 	u32 i;
5367 
5368 	/*
5369 	 * We'll need room for the seq value (maximum snapshot id),
5370 	 * snapshot count, and array of that many snapshot ids.
5371 	 * For now we have a fixed upper limit on the number we're
5372 	 * prepared to receive.
5373 	 */
5374 	size = sizeof (__le64) + sizeof (__le32) +
5375 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
5376 	reply_buf = kzalloc(size, GFP_KERNEL);
5377 	if (!reply_buf)
5378 		return -ENOMEM;
5379 
5380 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5381 				  &rbd_dev->header_oloc, "get_snapcontext",
5382 				  NULL, 0, reply_buf, size);
5383 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5384 	if (ret < 0)
5385 		goto out;
5386 
5387 	p = reply_buf;
5388 	end = reply_buf + ret;
5389 	ret = -ERANGE;
5390 	ceph_decode_64_safe(&p, end, seq, out);
5391 	ceph_decode_32_safe(&p, end, snap_count, out);
5392 
5393 	/*
5394 	 * Make sure the reported number of snapshot ids wouldn't go
5395 	 * beyond the end of our buffer.  But before checking that,
5396 	 * make sure the computed size of the snapshot context we
5397 	 * allocate is representable in a size_t.
5398 	 */
5399 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5400 				 / sizeof (u64)) {
5401 		ret = -EINVAL;
5402 		goto out;
5403 	}
5404 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5405 		goto out;
5406 	ret = 0;
5407 
5408 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5409 	if (!snapc) {
5410 		ret = -ENOMEM;
5411 		goto out;
5412 	}
5413 	snapc->seq = seq;
5414 	for (i = 0; i < snap_count; i++)
5415 		snapc->snaps[i] = ceph_decode_64(&p);
5416 
5417 	ceph_put_snap_context(rbd_dev->header.snapc);
5418 	rbd_dev->header.snapc = snapc;
5419 
5420 	dout("  snap context seq = %llu, snap_count = %u\n",
5421 		(unsigned long long)seq, (unsigned int)snap_count);
5422 out:
5423 	kfree(reply_buf);
5424 
5425 	return ret;
5426 }
5427 
5428 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5429 					u64 snap_id)
5430 {
5431 	size_t size;
5432 	void *reply_buf;
5433 	__le64 snapid;
5434 	int ret;
5435 	void *p;
5436 	void *end;
5437 	char *snap_name;
5438 
5439 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5440 	reply_buf = kmalloc(size, GFP_KERNEL);
5441 	if (!reply_buf)
5442 		return ERR_PTR(-ENOMEM);
5443 
5444 	snapid = cpu_to_le64(snap_id);
5445 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5446 				  &rbd_dev->header_oloc, "get_snapshot_name",
5447 				  &snapid, sizeof(snapid), reply_buf, size);
5448 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5449 	if (ret < 0) {
5450 		snap_name = ERR_PTR(ret);
5451 		goto out;
5452 	}
5453 
5454 	p = reply_buf;
5455 	end = reply_buf + ret;
5456 	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5457 	if (IS_ERR(snap_name))
5458 		goto out;
5459 
5460 	dout("  snap_id 0x%016llx snap_name = %s\n",
5461 		(unsigned long long)snap_id, snap_name);
5462 out:
5463 	kfree(reply_buf);
5464 
5465 	return snap_name;
5466 }
5467 
5468 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5469 {
5470 	bool first_time = rbd_dev->header.object_prefix == NULL;
5471 	int ret;
5472 
5473 	ret = rbd_dev_v2_image_size(rbd_dev);
5474 	if (ret)
5475 		return ret;
5476 
5477 	if (first_time) {
5478 		ret = rbd_dev_v2_header_onetime(rbd_dev);
5479 		if (ret)
5480 			return ret;
5481 	}
5482 
5483 	ret = rbd_dev_v2_snap_context(rbd_dev);
5484 	if (ret && first_time) {
5485 		kfree(rbd_dev->header.object_prefix);
5486 		rbd_dev->header.object_prefix = NULL;
5487 	}
5488 
5489 	return ret;
5490 }
5491 
5492 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5493 {
5494 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5495 
5496 	if (rbd_dev->image_format == 1)
5497 		return rbd_dev_v1_header_info(rbd_dev);
5498 
5499 	return rbd_dev_v2_header_info(rbd_dev);
5500 }
5501 
5502 /*
5503  * Skips over white space at *buf, and updates *buf to point to the
5504  * first found non-space character (if any). Returns the length of
5505  * the token (string of non-white space characters) found.  Note
5506  * that *buf must be terminated with '\0'.
5507  */
5508 static inline size_t next_token(const char **buf)
5509 {
5510         /*
5511         * These are the characters that produce nonzero for
5512         * isspace() in the "C" and "POSIX" locales.
5513         */
5514         const char *spaces = " \f\n\r\t\v";
5515 
5516         *buf += strspn(*buf, spaces);	/* Find start of token */
5517 
5518 	return strcspn(*buf, spaces);   /* Return token length */
5519 }
5520 
5521 /*
5522  * Finds the next token in *buf, dynamically allocates a buffer big
5523  * enough to hold a copy of it, and copies the token into the new
5524  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5525  * that a duplicate buffer is created even for a zero-length token.
5526  *
5527  * Returns a pointer to the newly-allocated duplicate, or a null
5528  * pointer if memory for the duplicate was not available.  If
5529  * the lenp argument is a non-null pointer, the length of the token
5530  * (not including the '\0') is returned in *lenp.
5531  *
5532  * If successful, the *buf pointer will be updated to point beyond
5533  * the end of the found token.
5534  *
5535  * Note: uses GFP_KERNEL for allocation.
5536  */
5537 static inline char *dup_token(const char **buf, size_t *lenp)
5538 {
5539 	char *dup;
5540 	size_t len;
5541 
5542 	len = next_token(buf);
5543 	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5544 	if (!dup)
5545 		return NULL;
5546 	*(dup + len) = '\0';
5547 	*buf += len;
5548 
5549 	if (lenp)
5550 		*lenp = len;
5551 
5552 	return dup;
5553 }
5554 
5555 /*
5556  * Parse the options provided for an "rbd add" (i.e., rbd image
5557  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5558  * and the data written is passed here via a NUL-terminated buffer.
5559  * Returns 0 if successful or an error code otherwise.
5560  *
5561  * The information extracted from these options is recorded in
5562  * the other parameters which return dynamically-allocated
5563  * structures:
5564  *  ceph_opts
5565  *      The address of a pointer that will refer to a ceph options
5566  *      structure.  Caller must release the returned pointer using
5567  *      ceph_destroy_options() when it is no longer needed.
5568  *  rbd_opts
5569  *	Address of an rbd options pointer.  Fully initialized by
5570  *	this function; caller must release with kfree().
5571  *  spec
5572  *	Address of an rbd image specification pointer.  Fully
5573  *	initialized by this function based on parsed options.
5574  *	Caller must release with rbd_spec_put().
5575  *
5576  * The options passed take this form:
5577  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5578  * where:
5579  *  <mon_addrs>
5580  *      A comma-separated list of one or more monitor addresses.
5581  *      A monitor address is an ip address, optionally followed
5582  *      by a port number (separated by a colon).
5583  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5584  *  <options>
5585  *      A comma-separated list of ceph and/or rbd options.
5586  *  <pool_name>
5587  *      The name of the rados pool containing the rbd image.
5588  *  <image_name>
5589  *      The name of the image in that pool to map.
5590  *  <snap_id>
5591  *      An optional snapshot id.  If provided, the mapping will
5592  *      present data from the image at the time that snapshot was
5593  *      created.  The image head is used if no snapshot id is
5594  *      provided.  Snapshot mappings are always read-only.
5595  */
5596 static int rbd_add_parse_args(const char *buf,
5597 				struct ceph_options **ceph_opts,
5598 				struct rbd_options **opts,
5599 				struct rbd_spec **rbd_spec)
5600 {
5601 	size_t len;
5602 	char *options;
5603 	const char *mon_addrs;
5604 	char *snap_name;
5605 	size_t mon_addrs_size;
5606 	struct rbd_spec *spec = NULL;
5607 	struct rbd_options *rbd_opts = NULL;
5608 	struct ceph_options *copts;
5609 	int ret;
5610 
5611 	/* The first four tokens are required */
5612 
5613 	len = next_token(&buf);
5614 	if (!len) {
5615 		rbd_warn(NULL, "no monitor address(es) provided");
5616 		return -EINVAL;
5617 	}
5618 	mon_addrs = buf;
5619 	mon_addrs_size = len + 1;
5620 	buf += len;
5621 
5622 	ret = -EINVAL;
5623 	options = dup_token(&buf, NULL);
5624 	if (!options)
5625 		return -ENOMEM;
5626 	if (!*options) {
5627 		rbd_warn(NULL, "no options provided");
5628 		goto out_err;
5629 	}
5630 
5631 	spec = rbd_spec_alloc();
5632 	if (!spec)
5633 		goto out_mem;
5634 
5635 	spec->pool_name = dup_token(&buf, NULL);
5636 	if (!spec->pool_name)
5637 		goto out_mem;
5638 	if (!*spec->pool_name) {
5639 		rbd_warn(NULL, "no pool name provided");
5640 		goto out_err;
5641 	}
5642 
5643 	spec->image_name = dup_token(&buf, NULL);
5644 	if (!spec->image_name)
5645 		goto out_mem;
5646 	if (!*spec->image_name) {
5647 		rbd_warn(NULL, "no image name provided");
5648 		goto out_err;
5649 	}
5650 
5651 	/*
5652 	 * Snapshot name is optional; default is to use "-"
5653 	 * (indicating the head/no snapshot).
5654 	 */
5655 	len = next_token(&buf);
5656 	if (!len) {
5657 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5658 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5659 	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
5660 		ret = -ENAMETOOLONG;
5661 		goto out_err;
5662 	}
5663 	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5664 	if (!snap_name)
5665 		goto out_mem;
5666 	*(snap_name + len) = '\0';
5667 	spec->snap_name = snap_name;
5668 
5669 	/* Initialize all rbd options to the defaults */
5670 
5671 	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5672 	if (!rbd_opts)
5673 		goto out_mem;
5674 
5675 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5676 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5677 	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5678 	rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5679 
5680 	copts = ceph_parse_options(options, mon_addrs,
5681 					mon_addrs + mon_addrs_size - 1,
5682 					parse_rbd_opts_token, rbd_opts);
5683 	if (IS_ERR(copts)) {
5684 		ret = PTR_ERR(copts);
5685 		goto out_err;
5686 	}
5687 	kfree(options);
5688 
5689 	*ceph_opts = copts;
5690 	*opts = rbd_opts;
5691 	*rbd_spec = spec;
5692 
5693 	return 0;
5694 out_mem:
5695 	ret = -ENOMEM;
5696 out_err:
5697 	kfree(rbd_opts);
5698 	rbd_spec_put(spec);
5699 	kfree(options);
5700 
5701 	return ret;
5702 }
5703 
5704 /*
5705  * Return pool id (>= 0) or a negative error code.
5706  */
5707 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5708 {
5709 	struct ceph_options *opts = rbdc->client->options;
5710 	u64 newest_epoch;
5711 	int tries = 0;
5712 	int ret;
5713 
5714 again:
5715 	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5716 	if (ret == -ENOENT && tries++ < 1) {
5717 		ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5718 					    &newest_epoch);
5719 		if (ret < 0)
5720 			return ret;
5721 
5722 		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5723 			ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5724 			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5725 						     newest_epoch,
5726 						     opts->mount_timeout);
5727 			goto again;
5728 		} else {
5729 			/* the osdmap we have is new enough */
5730 			return -ENOENT;
5731 		}
5732 	}
5733 
5734 	return ret;
5735 }
5736 
5737 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5738 {
5739 	down_write(&rbd_dev->lock_rwsem);
5740 	if (__rbd_is_lock_owner(rbd_dev))
5741 		rbd_unlock(rbd_dev);
5742 	up_write(&rbd_dev->lock_rwsem);
5743 }
5744 
5745 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5746 {
5747 	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5748 		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5749 		return -EINVAL;
5750 	}
5751 
5752 	/* FIXME: "rbd map --exclusive" should be in interruptible */
5753 	down_read(&rbd_dev->lock_rwsem);
5754 	rbd_wait_state_locked(rbd_dev);
5755 	up_read(&rbd_dev->lock_rwsem);
5756 	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5757 		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5758 		return -EROFS;
5759 	}
5760 
5761 	return 0;
5762 }
5763 
5764 /*
5765  * An rbd format 2 image has a unique identifier, distinct from the
5766  * name given to it by the user.  Internally, that identifier is
5767  * what's used to specify the names of objects related to the image.
5768  *
5769  * A special "rbd id" object is used to map an rbd image name to its
5770  * id.  If that object doesn't exist, then there is no v2 rbd image
5771  * with the supplied name.
5772  *
5773  * This function will record the given rbd_dev's image_id field if
5774  * it can be determined, and in that case will return 0.  If any
5775  * errors occur a negative errno will be returned and the rbd_dev's
5776  * image_id field will be unchanged (and should be NULL).
5777  */
5778 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5779 {
5780 	int ret;
5781 	size_t size;
5782 	CEPH_DEFINE_OID_ONSTACK(oid);
5783 	void *response;
5784 	char *image_id;
5785 
5786 	/*
5787 	 * When probing a parent image, the image id is already
5788 	 * known (and the image name likely is not).  There's no
5789 	 * need to fetch the image id again in this case.  We
5790 	 * do still need to set the image format though.
5791 	 */
5792 	if (rbd_dev->spec->image_id) {
5793 		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5794 
5795 		return 0;
5796 	}
5797 
5798 	/*
5799 	 * First, see if the format 2 image id file exists, and if
5800 	 * so, get the image's persistent id from it.
5801 	 */
5802 	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5803 			       rbd_dev->spec->image_name);
5804 	if (ret)
5805 		return ret;
5806 
5807 	dout("rbd id object name is %s\n", oid.name);
5808 
5809 	/* Response will be an encoded string, which includes a length */
5810 
5811 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5812 	response = kzalloc(size, GFP_NOIO);
5813 	if (!response) {
5814 		ret = -ENOMEM;
5815 		goto out;
5816 	}
5817 
5818 	/* If it doesn't exist we'll assume it's a format 1 image */
5819 
5820 	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5821 				  "get_id", NULL, 0,
5822 				  response, RBD_IMAGE_ID_LEN_MAX);
5823 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5824 	if (ret == -ENOENT) {
5825 		image_id = kstrdup("", GFP_KERNEL);
5826 		ret = image_id ? 0 : -ENOMEM;
5827 		if (!ret)
5828 			rbd_dev->image_format = 1;
5829 	} else if (ret >= 0) {
5830 		void *p = response;
5831 
5832 		image_id = ceph_extract_encoded_string(&p, p + ret,
5833 						NULL, GFP_NOIO);
5834 		ret = PTR_ERR_OR_ZERO(image_id);
5835 		if (!ret)
5836 			rbd_dev->image_format = 2;
5837 	}
5838 
5839 	if (!ret) {
5840 		rbd_dev->spec->image_id = image_id;
5841 		dout("image_id is %s\n", image_id);
5842 	}
5843 out:
5844 	kfree(response);
5845 	ceph_oid_destroy(&oid);
5846 	return ret;
5847 }
5848 
5849 /*
5850  * Undo whatever state changes are made by v1 or v2 header info
5851  * call.
5852  */
5853 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5854 {
5855 	struct rbd_image_header	*header;
5856 
5857 	rbd_dev_parent_put(rbd_dev);
5858 
5859 	/* Free dynamic fields from the header, then zero it out */
5860 
5861 	header = &rbd_dev->header;
5862 	ceph_put_snap_context(header->snapc);
5863 	kfree(header->snap_sizes);
5864 	kfree(header->snap_names);
5865 	kfree(header->object_prefix);
5866 	memset(header, 0, sizeof (*header));
5867 }
5868 
5869 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5870 {
5871 	int ret;
5872 
5873 	ret = rbd_dev_v2_object_prefix(rbd_dev);
5874 	if (ret)
5875 		goto out_err;
5876 
5877 	/*
5878 	 * Get the and check features for the image.  Currently the
5879 	 * features are assumed to never change.
5880 	 */
5881 	ret = rbd_dev_v2_features(rbd_dev);
5882 	if (ret)
5883 		goto out_err;
5884 
5885 	/* If the image supports fancy striping, get its parameters */
5886 
5887 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5888 		ret = rbd_dev_v2_striping_info(rbd_dev);
5889 		if (ret < 0)
5890 			goto out_err;
5891 	}
5892 
5893 	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5894 		ret = rbd_dev_v2_data_pool(rbd_dev);
5895 		if (ret)
5896 			goto out_err;
5897 	}
5898 
5899 	rbd_init_layout(rbd_dev);
5900 	return 0;
5901 
5902 out_err:
5903 	rbd_dev->header.features = 0;
5904 	kfree(rbd_dev->header.object_prefix);
5905 	rbd_dev->header.object_prefix = NULL;
5906 	return ret;
5907 }
5908 
5909 /*
5910  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5911  * rbd_dev_image_probe() recursion depth, which means it's also the
5912  * length of the already discovered part of the parent chain.
5913  */
5914 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5915 {
5916 	struct rbd_device *parent = NULL;
5917 	int ret;
5918 
5919 	if (!rbd_dev->parent_spec)
5920 		return 0;
5921 
5922 	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5923 		pr_info("parent chain is too long (%d)\n", depth);
5924 		ret = -EINVAL;
5925 		goto out_err;
5926 	}
5927 
5928 	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5929 	if (!parent) {
5930 		ret = -ENOMEM;
5931 		goto out_err;
5932 	}
5933 
5934 	/*
5935 	 * Images related by parent/child relationships always share
5936 	 * rbd_client and spec/parent_spec, so bump their refcounts.
5937 	 */
5938 	__rbd_get_client(rbd_dev->rbd_client);
5939 	rbd_spec_get(rbd_dev->parent_spec);
5940 
5941 	ret = rbd_dev_image_probe(parent, depth);
5942 	if (ret < 0)
5943 		goto out_err;
5944 
5945 	rbd_dev->parent = parent;
5946 	atomic_set(&rbd_dev->parent_ref, 1);
5947 	return 0;
5948 
5949 out_err:
5950 	rbd_dev_unparent(rbd_dev);
5951 	rbd_dev_destroy(parent);
5952 	return ret;
5953 }
5954 
5955 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5956 {
5957 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5958 	rbd_dev_mapping_clear(rbd_dev);
5959 	rbd_free_disk(rbd_dev);
5960 	if (!single_major)
5961 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5962 }
5963 
5964 /*
5965  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5966  * upon return.
5967  */
5968 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5969 {
5970 	int ret;
5971 
5972 	/* Record our major and minor device numbers. */
5973 
5974 	if (!single_major) {
5975 		ret = register_blkdev(0, rbd_dev->name);
5976 		if (ret < 0)
5977 			goto err_out_unlock;
5978 
5979 		rbd_dev->major = ret;
5980 		rbd_dev->minor = 0;
5981 	} else {
5982 		rbd_dev->major = rbd_major;
5983 		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5984 	}
5985 
5986 	/* Set up the blkdev mapping. */
5987 
5988 	ret = rbd_init_disk(rbd_dev);
5989 	if (ret)
5990 		goto err_out_blkdev;
5991 
5992 	ret = rbd_dev_mapping_set(rbd_dev);
5993 	if (ret)
5994 		goto err_out_disk;
5995 
5996 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5997 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5998 
5999 	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6000 	if (ret)
6001 		goto err_out_mapping;
6002 
6003 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6004 	up_write(&rbd_dev->header_rwsem);
6005 	return 0;
6006 
6007 err_out_mapping:
6008 	rbd_dev_mapping_clear(rbd_dev);
6009 err_out_disk:
6010 	rbd_free_disk(rbd_dev);
6011 err_out_blkdev:
6012 	if (!single_major)
6013 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
6014 err_out_unlock:
6015 	up_write(&rbd_dev->header_rwsem);
6016 	return ret;
6017 }
6018 
6019 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6020 {
6021 	struct rbd_spec *spec = rbd_dev->spec;
6022 	int ret;
6023 
6024 	/* Record the header object name for this rbd image. */
6025 
6026 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6027 	if (rbd_dev->image_format == 1)
6028 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6029 				       spec->image_name, RBD_SUFFIX);
6030 	else
6031 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6032 				       RBD_HEADER_PREFIX, spec->image_id);
6033 
6034 	return ret;
6035 }
6036 
6037 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6038 {
6039 	rbd_dev_unprobe(rbd_dev);
6040 	if (rbd_dev->opts)
6041 		rbd_unregister_watch(rbd_dev);
6042 	rbd_dev->image_format = 0;
6043 	kfree(rbd_dev->spec->image_id);
6044 	rbd_dev->spec->image_id = NULL;
6045 }
6046 
6047 /*
6048  * Probe for the existence of the header object for the given rbd
6049  * device.  If this image is the one being mapped (i.e., not a
6050  * parent), initiate a watch on its header object before using that
6051  * object to get detailed information about the rbd image.
6052  */
6053 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6054 {
6055 	int ret;
6056 
6057 	/*
6058 	 * Get the id from the image id object.  Unless there's an
6059 	 * error, rbd_dev->spec->image_id will be filled in with
6060 	 * a dynamically-allocated string, and rbd_dev->image_format
6061 	 * will be set to either 1 or 2.
6062 	 */
6063 	ret = rbd_dev_image_id(rbd_dev);
6064 	if (ret)
6065 		return ret;
6066 
6067 	ret = rbd_dev_header_name(rbd_dev);
6068 	if (ret)
6069 		goto err_out_format;
6070 
6071 	if (!depth) {
6072 		ret = rbd_register_watch(rbd_dev);
6073 		if (ret) {
6074 			if (ret == -ENOENT)
6075 				pr_info("image %s/%s does not exist\n",
6076 					rbd_dev->spec->pool_name,
6077 					rbd_dev->spec->image_name);
6078 			goto err_out_format;
6079 		}
6080 	}
6081 
6082 	ret = rbd_dev_header_info(rbd_dev);
6083 	if (ret)
6084 		goto err_out_watch;
6085 
6086 	/*
6087 	 * If this image is the one being mapped, we have pool name and
6088 	 * id, image name and id, and snap name - need to fill snap id.
6089 	 * Otherwise this is a parent image, identified by pool, image
6090 	 * and snap ids - need to fill in names for those ids.
6091 	 */
6092 	if (!depth)
6093 		ret = rbd_spec_fill_snap_id(rbd_dev);
6094 	else
6095 		ret = rbd_spec_fill_names(rbd_dev);
6096 	if (ret) {
6097 		if (ret == -ENOENT)
6098 			pr_info("snap %s/%s@%s does not exist\n",
6099 				rbd_dev->spec->pool_name,
6100 				rbd_dev->spec->image_name,
6101 				rbd_dev->spec->snap_name);
6102 		goto err_out_probe;
6103 	}
6104 
6105 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6106 		ret = rbd_dev_v2_parent_info(rbd_dev);
6107 		if (ret)
6108 			goto err_out_probe;
6109 
6110 		/*
6111 		 * Need to warn users if this image is the one being
6112 		 * mapped and has a parent.
6113 		 */
6114 		if (!depth && rbd_dev->parent_spec)
6115 			rbd_warn(rbd_dev,
6116 				 "WARNING: kernel layering is EXPERIMENTAL!");
6117 	}
6118 
6119 	ret = rbd_dev_probe_parent(rbd_dev, depth);
6120 	if (ret)
6121 		goto err_out_probe;
6122 
6123 	dout("discovered format %u image, header name is %s\n",
6124 		rbd_dev->image_format, rbd_dev->header_oid.name);
6125 	return 0;
6126 
6127 err_out_probe:
6128 	rbd_dev_unprobe(rbd_dev);
6129 err_out_watch:
6130 	if (!depth)
6131 		rbd_unregister_watch(rbd_dev);
6132 err_out_format:
6133 	rbd_dev->image_format = 0;
6134 	kfree(rbd_dev->spec->image_id);
6135 	rbd_dev->spec->image_id = NULL;
6136 	return ret;
6137 }
6138 
6139 static ssize_t do_rbd_add(struct bus_type *bus,
6140 			  const char *buf,
6141 			  size_t count)
6142 {
6143 	struct rbd_device *rbd_dev = NULL;
6144 	struct ceph_options *ceph_opts = NULL;
6145 	struct rbd_options *rbd_opts = NULL;
6146 	struct rbd_spec *spec = NULL;
6147 	struct rbd_client *rbdc;
6148 	bool read_only;
6149 	int rc;
6150 
6151 	if (!try_module_get(THIS_MODULE))
6152 		return -ENODEV;
6153 
6154 	/* parse add command */
6155 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6156 	if (rc < 0)
6157 		goto out;
6158 
6159 	rbdc = rbd_get_client(ceph_opts);
6160 	if (IS_ERR(rbdc)) {
6161 		rc = PTR_ERR(rbdc);
6162 		goto err_out_args;
6163 	}
6164 
6165 	/* pick the pool */
6166 	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6167 	if (rc < 0) {
6168 		if (rc == -ENOENT)
6169 			pr_info("pool %s does not exist\n", spec->pool_name);
6170 		goto err_out_client;
6171 	}
6172 	spec->pool_id = (u64)rc;
6173 
6174 	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6175 	if (!rbd_dev) {
6176 		rc = -ENOMEM;
6177 		goto err_out_client;
6178 	}
6179 	rbdc = NULL;		/* rbd_dev now owns this */
6180 	spec = NULL;		/* rbd_dev now owns this */
6181 	rbd_opts = NULL;	/* rbd_dev now owns this */
6182 
6183 	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6184 	if (!rbd_dev->config_info) {
6185 		rc = -ENOMEM;
6186 		goto err_out_rbd_dev;
6187 	}
6188 
6189 	down_write(&rbd_dev->header_rwsem);
6190 	rc = rbd_dev_image_probe(rbd_dev, 0);
6191 	if (rc < 0) {
6192 		up_write(&rbd_dev->header_rwsem);
6193 		goto err_out_rbd_dev;
6194 	}
6195 
6196 	/* If we are mapping a snapshot it must be marked read-only */
6197 
6198 	read_only = rbd_dev->opts->read_only;
6199 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6200 		read_only = true;
6201 	rbd_dev->mapping.read_only = read_only;
6202 
6203 	rc = rbd_dev_device_setup(rbd_dev);
6204 	if (rc)
6205 		goto err_out_image_probe;
6206 
6207 	if (rbd_dev->opts->exclusive) {
6208 		rc = rbd_add_acquire_lock(rbd_dev);
6209 		if (rc)
6210 			goto err_out_device_setup;
6211 	}
6212 
6213 	/* Everything's ready.  Announce the disk to the world. */
6214 
6215 	rc = device_add(&rbd_dev->dev);
6216 	if (rc)
6217 		goto err_out_image_lock;
6218 
6219 	add_disk(rbd_dev->disk);
6220 	/* see rbd_init_disk() */
6221 	blk_put_queue(rbd_dev->disk->queue);
6222 
6223 	spin_lock(&rbd_dev_list_lock);
6224 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
6225 	spin_unlock(&rbd_dev_list_lock);
6226 
6227 	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6228 		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6229 		rbd_dev->header.features);
6230 	rc = count;
6231 out:
6232 	module_put(THIS_MODULE);
6233 	return rc;
6234 
6235 err_out_image_lock:
6236 	rbd_dev_image_unlock(rbd_dev);
6237 err_out_device_setup:
6238 	rbd_dev_device_release(rbd_dev);
6239 err_out_image_probe:
6240 	rbd_dev_image_release(rbd_dev);
6241 err_out_rbd_dev:
6242 	rbd_dev_destroy(rbd_dev);
6243 err_out_client:
6244 	rbd_put_client(rbdc);
6245 err_out_args:
6246 	rbd_spec_put(spec);
6247 	kfree(rbd_opts);
6248 	goto out;
6249 }
6250 
6251 static ssize_t rbd_add(struct bus_type *bus,
6252 		       const char *buf,
6253 		       size_t count)
6254 {
6255 	if (single_major)
6256 		return -EINVAL;
6257 
6258 	return do_rbd_add(bus, buf, count);
6259 }
6260 
6261 static ssize_t rbd_add_single_major(struct bus_type *bus,
6262 				    const char *buf,
6263 				    size_t count)
6264 {
6265 	return do_rbd_add(bus, buf, count);
6266 }
6267 
6268 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6269 {
6270 	while (rbd_dev->parent) {
6271 		struct rbd_device *first = rbd_dev;
6272 		struct rbd_device *second = first->parent;
6273 		struct rbd_device *third;
6274 
6275 		/*
6276 		 * Follow to the parent with no grandparent and
6277 		 * remove it.
6278 		 */
6279 		while (second && (third = second->parent)) {
6280 			first = second;
6281 			second = third;
6282 		}
6283 		rbd_assert(second);
6284 		rbd_dev_image_release(second);
6285 		rbd_dev_destroy(second);
6286 		first->parent = NULL;
6287 		first->parent_overlap = 0;
6288 
6289 		rbd_assert(first->parent_spec);
6290 		rbd_spec_put(first->parent_spec);
6291 		first->parent_spec = NULL;
6292 	}
6293 }
6294 
6295 static ssize_t do_rbd_remove(struct bus_type *bus,
6296 			     const char *buf,
6297 			     size_t count)
6298 {
6299 	struct rbd_device *rbd_dev = NULL;
6300 	struct list_head *tmp;
6301 	int dev_id;
6302 	char opt_buf[6];
6303 	bool already = false;
6304 	bool force = false;
6305 	int ret;
6306 
6307 	dev_id = -1;
6308 	opt_buf[0] = '\0';
6309 	sscanf(buf, "%d %5s", &dev_id, opt_buf);
6310 	if (dev_id < 0) {
6311 		pr_err("dev_id out of range\n");
6312 		return -EINVAL;
6313 	}
6314 	if (opt_buf[0] != '\0') {
6315 		if (!strcmp(opt_buf, "force")) {
6316 			force = true;
6317 		} else {
6318 			pr_err("bad remove option at '%s'\n", opt_buf);
6319 			return -EINVAL;
6320 		}
6321 	}
6322 
6323 	ret = -ENOENT;
6324 	spin_lock(&rbd_dev_list_lock);
6325 	list_for_each(tmp, &rbd_dev_list) {
6326 		rbd_dev = list_entry(tmp, struct rbd_device, node);
6327 		if (rbd_dev->dev_id == dev_id) {
6328 			ret = 0;
6329 			break;
6330 		}
6331 	}
6332 	if (!ret) {
6333 		spin_lock_irq(&rbd_dev->lock);
6334 		if (rbd_dev->open_count && !force)
6335 			ret = -EBUSY;
6336 		else
6337 			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6338 							&rbd_dev->flags);
6339 		spin_unlock_irq(&rbd_dev->lock);
6340 	}
6341 	spin_unlock(&rbd_dev_list_lock);
6342 	if (ret < 0 || already)
6343 		return ret;
6344 
6345 	if (force) {
6346 		/*
6347 		 * Prevent new IO from being queued and wait for existing
6348 		 * IO to complete/fail.
6349 		 */
6350 		blk_mq_freeze_queue(rbd_dev->disk->queue);
6351 		blk_set_queue_dying(rbd_dev->disk->queue);
6352 	}
6353 
6354 	del_gendisk(rbd_dev->disk);
6355 	spin_lock(&rbd_dev_list_lock);
6356 	list_del_init(&rbd_dev->node);
6357 	spin_unlock(&rbd_dev_list_lock);
6358 	device_del(&rbd_dev->dev);
6359 
6360 	rbd_dev_image_unlock(rbd_dev);
6361 	rbd_dev_device_release(rbd_dev);
6362 	rbd_dev_image_release(rbd_dev);
6363 	rbd_dev_destroy(rbd_dev);
6364 	return count;
6365 }
6366 
6367 static ssize_t rbd_remove(struct bus_type *bus,
6368 			  const char *buf,
6369 			  size_t count)
6370 {
6371 	if (single_major)
6372 		return -EINVAL;
6373 
6374 	return do_rbd_remove(bus, buf, count);
6375 }
6376 
6377 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6378 				       const char *buf,
6379 				       size_t count)
6380 {
6381 	return do_rbd_remove(bus, buf, count);
6382 }
6383 
6384 /*
6385  * create control files in sysfs
6386  * /sys/bus/rbd/...
6387  */
6388 static int rbd_sysfs_init(void)
6389 {
6390 	int ret;
6391 
6392 	ret = device_register(&rbd_root_dev);
6393 	if (ret < 0)
6394 		return ret;
6395 
6396 	ret = bus_register(&rbd_bus_type);
6397 	if (ret < 0)
6398 		device_unregister(&rbd_root_dev);
6399 
6400 	return ret;
6401 }
6402 
6403 static void rbd_sysfs_cleanup(void)
6404 {
6405 	bus_unregister(&rbd_bus_type);
6406 	device_unregister(&rbd_root_dev);
6407 }
6408 
6409 static int rbd_slab_init(void)
6410 {
6411 	rbd_assert(!rbd_img_request_cache);
6412 	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6413 	if (!rbd_img_request_cache)
6414 		return -ENOMEM;
6415 
6416 	rbd_assert(!rbd_obj_request_cache);
6417 	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6418 	if (!rbd_obj_request_cache)
6419 		goto out_err;
6420 
6421 	rbd_assert(!rbd_bio_clone);
6422 	rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6423 	if (!rbd_bio_clone)
6424 		goto out_err_clone;
6425 
6426 	return 0;
6427 
6428 out_err_clone:
6429 	kmem_cache_destroy(rbd_obj_request_cache);
6430 	rbd_obj_request_cache = NULL;
6431 out_err:
6432 	kmem_cache_destroy(rbd_img_request_cache);
6433 	rbd_img_request_cache = NULL;
6434 	return -ENOMEM;
6435 }
6436 
6437 static void rbd_slab_exit(void)
6438 {
6439 	rbd_assert(rbd_obj_request_cache);
6440 	kmem_cache_destroy(rbd_obj_request_cache);
6441 	rbd_obj_request_cache = NULL;
6442 
6443 	rbd_assert(rbd_img_request_cache);
6444 	kmem_cache_destroy(rbd_img_request_cache);
6445 	rbd_img_request_cache = NULL;
6446 
6447 	rbd_assert(rbd_bio_clone);
6448 	bioset_free(rbd_bio_clone);
6449 	rbd_bio_clone = NULL;
6450 }
6451 
6452 static int __init rbd_init(void)
6453 {
6454 	int rc;
6455 
6456 	if (!libceph_compatible(NULL)) {
6457 		rbd_warn(NULL, "libceph incompatibility (quitting)");
6458 		return -EINVAL;
6459 	}
6460 
6461 	rc = rbd_slab_init();
6462 	if (rc)
6463 		return rc;
6464 
6465 	/*
6466 	 * The number of active work items is limited by the number of
6467 	 * rbd devices * queue depth, so leave @max_active at default.
6468 	 */
6469 	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6470 	if (!rbd_wq) {
6471 		rc = -ENOMEM;
6472 		goto err_out_slab;
6473 	}
6474 
6475 	if (single_major) {
6476 		rbd_major = register_blkdev(0, RBD_DRV_NAME);
6477 		if (rbd_major < 0) {
6478 			rc = rbd_major;
6479 			goto err_out_wq;
6480 		}
6481 	}
6482 
6483 	rc = rbd_sysfs_init();
6484 	if (rc)
6485 		goto err_out_blkdev;
6486 
6487 	if (single_major)
6488 		pr_info("loaded (major %d)\n", rbd_major);
6489 	else
6490 		pr_info("loaded\n");
6491 
6492 	return 0;
6493 
6494 err_out_blkdev:
6495 	if (single_major)
6496 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
6497 err_out_wq:
6498 	destroy_workqueue(rbd_wq);
6499 err_out_slab:
6500 	rbd_slab_exit();
6501 	return rc;
6502 }
6503 
6504 static void __exit rbd_exit(void)
6505 {
6506 	ida_destroy(&rbd_dev_id_ida);
6507 	rbd_sysfs_cleanup();
6508 	if (single_major)
6509 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
6510 	destroy_workqueue(rbd_wq);
6511 	rbd_slab_exit();
6512 }
6513 
6514 module_init(rbd_init);
6515 module_exit(rbd_exit);
6516 
6517 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6518 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6519 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6520 /* following authorship retained from original osdblk.c */
6521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6522 
6523 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6524 MODULE_LICENSE("GPL");
6525