xref: /openbmc/linux/drivers/block/rbd.c (revision afc98d90)
1 
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4 
5 
6    based on drivers/block/osdblk.c:
7 
8    Copyright 2009 Red Hat, Inc.
9 
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22 
23 
24 
25    For usage instructions, please refer to:
26 
27                  Documentation/ABI/testing/sysfs-bus-rbd
28 
29  */
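/*
 * A rough usage sketch (the ABI document above is authoritative; the
 * monitor address, credentials, pool and image names below are made-up
 * examples):
 *
 *	# map an image; a /dev/rbd<id> block device appears on success
 *	echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 *	# unmap it again using the device id that "add" assigned
 *	echo 0 > /sys/bus/rbd/remove
 *
 * An optional snapshot name may follow the image name to map a snapshot
 * (snapshots are mapped read-only).
 */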
30 
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37 
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44 #include <linux/idr.h>
45 
46 #include "rbd_types.h"
47 
48 #define RBD_DEBUG	/* Activate rbd_assert() calls */
49 
50 /*
51  * The basic unit of block I/O is a sector.  It is interpreted in a
52  * number of contexts in Linux (blk, bio, genhd), but the default is
53  * universally 512 bytes.  These symbols are just slightly more
54  * meaningful than the bare numbers they represent.
55  */
56 #define	SECTOR_SHIFT	9
57 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
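/*
 * Example: with the usual 512-byte sectors, a 1 GiB rbd image spans
 * 1 GiB >> SECTOR_SHIFT = 2097152 sectors as far as the block layer is
 * concerned.
 */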
58 
59 /*
60  * Increment the given counter and return its updated value.
61  * If the counter is already 0 it will not be incremented.
62  * If the counter is already at its maximum value, -EINVAL is
63  * returned without updating it.
64  */
65 static int atomic_inc_return_safe(atomic_t *v)
66 {
67 	unsigned int counter;
68 
69 	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
70 	if (counter <= (unsigned int)INT_MAX)
71 		return (int)counter;
72 
73 	atomic_dec(v);
74 
75 	return -EINVAL;
76 }
77 
78 /* Decrement the counter.  Return the resulting value, or -EINVAL */
79 static int atomic_dec_return_safe(atomic_t *v)
80 {
81 	int counter;
82 
83 	counter = atomic_dec_return(v);
84 	if (counter >= 0)
85 		return counter;
86 
87 	atomic_inc(v);
88 
89 	return -EINVAL;
90 }
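/*
 * Together these two helpers behave like a saturating reference count
 * built on an atomic_t.  A hypothetical usage pattern looks like:
 *
 *	if (atomic_inc_return_safe(&refs) > 0) {
 *		... use the referenced object ...
 *		atomic_dec_return_safe(&refs);
 *	}
 *
 * In this driver they back rbd_dev->parent_ref; see rbd_dev_parent_get()
 * and rbd_dev_parent_put() below.
 */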
91 
92 #define RBD_DRV_NAME "rbd"
93 
94 #define RBD_MINORS_PER_MAJOR		256
95 #define RBD_SINGLE_MAJOR_PART_SHIFT	4
96 
97 #define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
98 #define RBD_MAX_SNAP_NAME_LEN	\
99 			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
100 
101 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
102 
103 #define RBD_SNAP_HEAD_NAME	"-"
104 
105 #define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
106 
107 /* This allows a single page to hold an image name sent by OSD */
108 #define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
109 #define RBD_IMAGE_ID_LEN_MAX	64
110 
111 #define RBD_OBJ_PREFIX_LEN_MAX	64
112 
113 /* Feature bits */
114 
115 #define RBD_FEATURE_LAYERING	(1<<0)
116 #define RBD_FEATURE_STRIPINGV2	(1<<1)
117 #define RBD_FEATURES_ALL \
118 	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
119 
120 /* Features supported by this (client software) implementation. */
121 
122 #define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
123 
124 /*
125  * An RBD device name will be "rbd#", where the "rbd" comes from
126  * RBD_DRV_NAME above, and # is a unique integer identifier.
127  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
128  * enough to hold all possible device names.
129  */
130 #define DEV_NAME_LEN		32
131 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
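/*
 * Example of the arithmetic: with a 4-byte int, (5 * 4) / 2 + 1 = 11
 * characters, which is exactly enough for the widest decimal value
 * ("-2147483648").  DEV_NAME_LEN (32) therefore comfortably holds
 * "rbd" plus any device number this driver can generate.
 */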
132 
133 /*
134  * block device image metadata (in-memory version)
135  */
136 struct rbd_image_header {
137 	/* These fields never change for a given rbd image */
138 	char *object_prefix;
139 	__u8 obj_order;
140 	__u8 crypt_type;
141 	__u8 comp_type;
142 	u64 stripe_unit;
143 	u64 stripe_count;
144 	u64 features;		/* Might be changeable someday? */
145 
146 	/* The remaining fields need to be updated occasionally */
147 	u64 image_size;
148 	struct ceph_snap_context *snapc;
149 	char *snap_names;	/* format 1 only */
150 	u64 *snap_sizes;	/* format 1 only */
151 };
152 
153 /*
154  * An rbd image specification.
155  *
156  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
157  * identify an image.  Each rbd_dev structure includes a pointer to
158  * an rbd_spec structure that encapsulates this identity.
159  *
160  * Each of the id's in an rbd_spec has an associated name.  For a
161  * user-mapped image, the names are supplied and the id's associated
162  * with them are looked up.  For a layered image, a parent image is
163  * defined by the tuple, and the names are looked up.
164  *
165  * An rbd_dev structure contains a parent_spec pointer which is
166  * non-null if the image it represents is a child in a layered
167  * image.  This pointer will refer to the rbd_spec structure used
168  * by the parent rbd_dev for its own identity (i.e., the structure
169  * is shared between the parent and child).
170  *
171  * Since these structures are populated once, during the discovery
172  * phase of image construction, they are effectively immutable so
173  * we make no effort to synchronize access to them.
174  *
175  * Note that code herein does not assume the image name is known (it
176  * could be a null pointer).
177  */
178 struct rbd_spec {
179 	u64		pool_id;
180 	const char	*pool_name;
181 
182 	const char	*image_id;
183 	const char	*image_name;
184 
185 	u64		snap_id;
186 	const char	*snap_name;
187 
188 	struct kref	kref;
189 };
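/*
 * As an illustration (the ids and names here are made up), mapping image
 * "foo" in pool "rbd" at its head would yield a spec roughly like:
 *
 *	pool_id = 2,            pool_name = "rbd"
 *	image_id = "1028b4567b", image_name = "foo"
 *	snap_id = CEPH_NOSNAP,   snap_name = "-" (RBD_SNAP_HEAD_NAME)
 */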
190 
191 /*
192  * an instance of the client.  multiple devices may share an rbd client.
193  */
194 struct rbd_client {
195 	struct ceph_client	*client;
196 	struct kref		kref;
197 	struct list_head	node;
198 };
199 
200 struct rbd_img_request;
201 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
202 
203 #define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
204 
205 struct rbd_obj_request;
206 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
207 
208 enum obj_request_type {
209 	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
210 };
211 
212 enum obj_req_flags {
213 	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
214 	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
215 	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
216 	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
217 };
218 
219 struct rbd_obj_request {
220 	const char		*object_name;
221 	u64			offset;		/* object start byte */
222 	u64			length;		/* bytes from offset */
223 	unsigned long		flags;
224 
225 	/*
226 	 * An object request associated with an image will have its
227 	 * img_data flag set; a standalone object request will not.
228 	 *
229 	 * A standalone object request will have which == BAD_WHICH
230 	 * and a null obj_request pointer.
231 	 *
232 	 * An object request initiated in support of a layered image
233 	 * object (to check for its existence before a write) will
234 	 * have which == BAD_WHICH and a non-null obj_request pointer.
235 	 *
236 	 * Finally, an object request for rbd image data will have
237 	 * which != BAD_WHICH, and will have a non-null img_request
238 	 * pointer.  The value of which will be in the range
239 	 * 0..(img_request->obj_request_count-1).
240 	 */
241 	union {
242 		struct rbd_obj_request	*obj_request;	/* STAT op */
243 		struct {
244 			struct rbd_img_request	*img_request;
245 			u64			img_offset;
246 			/* links for img_request->obj_requests list */
247 			struct list_head	links;
248 		};
249 	};
250 	u32			which;		/* posn image request list */
251 
252 	enum obj_request_type	type;
253 	union {
254 		struct bio	*bio_list;
255 		struct {
256 			struct page	**pages;
257 			u32		page_count;
258 		};
259 	};
260 	struct page		**copyup_pages;
261 	u32			copyup_page_count;
262 
263 	struct ceph_osd_request	*osd_req;
264 
265 	u64			xferred;	/* bytes transferred */
266 	int			result;
267 
268 	rbd_obj_callback_t	callback;
269 	struct completion	completion;
270 
271 	struct kref		kref;
272 };
273 
274 enum img_req_flags {
275 	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
276 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
277 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
278 };
279 
280 struct rbd_img_request {
281 	struct rbd_device	*rbd_dev;
282 	u64			offset;	/* starting image byte offset */
283 	u64			length;	/* byte count from offset */
284 	unsigned long		flags;
285 	union {
286 		u64			snap_id;	/* for reads */
287 		struct ceph_snap_context *snapc;	/* for writes */
288 	};
289 	union {
290 		struct request		*rq;		/* block request */
291 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
292 	};
293 	struct page		**copyup_pages;
294 	u32			copyup_page_count;
295 	spinlock_t		completion_lock;/* protects next_completion */
296 	u32			next_completion;
297 	rbd_img_callback_t	callback;
298 	u64			xferred;/* aggregate bytes transferred */
299 	int			result;	/* first nonzero obj_request result */
300 
301 	u32			obj_request_count;
302 	struct list_head	obj_requests;	/* rbd_obj_request structs */
303 
304 	struct kref		kref;
305 };
306 
307 #define for_each_obj_request(ireq, oreq) \
308 	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
309 #define for_each_obj_request_from(ireq, oreq) \
310 	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
311 #define for_each_obj_request_safe(ireq, oreq, n) \
312 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
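/*
 * A typical use of these iterators appears in rbd_img_request_complete()
 * below, e.g. summing the per-object transfer counts:
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */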
313 
314 struct rbd_mapping {
315 	u64                     size;
316 	u64                     features;
317 	bool			read_only;
318 };
319 
320 /*
321  * a single device
322  */
323 struct rbd_device {
324 	int			dev_id;		/* blkdev unique id */
325 
326 	int			major;		/* blkdev assigned major */
327 	int			minor;
328 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
329 
330 	u32			image_format;	/* Either 1 or 2 */
331 	struct rbd_client	*rbd_client;
332 
333 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
334 
335 	spinlock_t		lock;		/* queue, flags, open_count */
336 
337 	struct rbd_image_header	header;
338 	unsigned long		flags;		/* possibly lock protected */
339 	struct rbd_spec		*spec;
340 
341 	char			*header_name;
342 
343 	struct ceph_file_layout	layout;
344 
345 	struct ceph_osd_event   *watch_event;
346 	struct rbd_obj_request	*watch_request;
347 
348 	struct rbd_spec		*parent_spec;
349 	u64			parent_overlap;
350 	atomic_t		parent_ref;
351 	struct rbd_device	*parent;
352 
353 	/* protects updating the header */
354 	struct rw_semaphore     header_rwsem;
355 
356 	struct rbd_mapping	mapping;
357 
358 	struct list_head	node;
359 
360 	/* sysfs related */
361 	struct device		dev;
362 	unsigned long		open_count;	/* protected by lock */
363 };
364 
365 /*
366  * Flag bits for rbd_dev->flags.  If atomicity is required,
367  * rbd_dev->lock is used to protect access.
368  *
369  * Currently, only the "removing" flag (which is coupled with the
370  * "open_count" field) requires atomic access.
371  */
372 enum rbd_dev_flags {
373 	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
374 	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
375 };
376 
377 static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
378 
379 static LIST_HEAD(rbd_dev_list);    /* devices */
380 static DEFINE_SPINLOCK(rbd_dev_list_lock);
381 
382 static LIST_HEAD(rbd_client_list);		/* clients */
383 static DEFINE_SPINLOCK(rbd_client_list_lock);
384 
385 /* Slab caches for frequently-allocated structures */
386 
387 static struct kmem_cache	*rbd_img_request_cache;
388 static struct kmem_cache	*rbd_obj_request_cache;
389 static struct kmem_cache	*rbd_segment_name_cache;
390 
391 static int rbd_major;
392 static DEFINE_IDA(rbd_dev_id_ida);
393 
394 /*
395  * Default to false for now, as single-major requires version >= 0.75 of
396  * the userspace rbd utility.
397  */
398 static bool single_major = false;
399 module_param(single_major, bool, S_IRUGO);
400 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
401 
402 static int rbd_img_request_submit(struct rbd_img_request *img_request);
403 
404 static void rbd_dev_device_release(struct device *dev);
405 
406 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
407 		       size_t count);
408 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
409 			  size_t count);
410 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
411 				    size_t count);
412 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
413 				       size_t count);
414 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
415 static void rbd_spec_put(struct rbd_spec *spec);
416 
417 static int rbd_dev_id_to_minor(int dev_id)
418 {
419 	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
420 }
421 
422 static int minor_to_rbd_dev_id(int minor)
423 {
424 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
425 }
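/*
 * Example (single-major mode): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * dev_id 3 maps to minor 3 << 4 = 48, and minors 49..63 remain available
 * for up to 15 partitions of that device.
 */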
426 
427 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
428 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
429 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
430 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
431 
432 static struct attribute *rbd_bus_attrs[] = {
433 	&bus_attr_add.attr,
434 	&bus_attr_remove.attr,
435 	&bus_attr_add_single_major.attr,
436 	&bus_attr_remove_single_major.attr,
437 	NULL,
438 };
439 
440 static umode_t rbd_bus_is_visible(struct kobject *kobj,
441 				  struct attribute *attr, int index)
442 {
443 	if (!single_major &&
444 	    (attr == &bus_attr_add_single_major.attr ||
445 	     attr == &bus_attr_remove_single_major.attr))
446 		return 0;
447 
448 	return attr->mode;
449 }
450 
451 static const struct attribute_group rbd_bus_group = {
452 	.attrs = rbd_bus_attrs,
453 	.is_visible = rbd_bus_is_visible,
454 };
455 __ATTRIBUTE_GROUPS(rbd_bus);
456 
457 static struct bus_type rbd_bus_type = {
458 	.name		= "rbd",
459 	.bus_groups	= rbd_bus_groups,
460 };
461 
462 static void rbd_root_dev_release(struct device *dev)
463 {
464 }
465 
466 static struct device rbd_root_dev = {
467 	.init_name =    "rbd",
468 	.release =      rbd_root_dev_release,
469 };
470 
471 static __printf(2, 3)
472 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
473 {
474 	struct va_format vaf;
475 	va_list args;
476 
477 	va_start(args, fmt);
478 	vaf.fmt = fmt;
479 	vaf.va = &args;
480 
481 	if (!rbd_dev)
482 		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
483 	else if (rbd_dev->disk)
484 		printk(KERN_WARNING "%s: %s: %pV\n",
485 			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
486 	else if (rbd_dev->spec && rbd_dev->spec->image_name)
487 		printk(KERN_WARNING "%s: image %s: %pV\n",
488 			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
489 	else if (rbd_dev->spec && rbd_dev->spec->image_id)
490 		printk(KERN_WARNING "%s: id %s: %pV\n",
491 			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
492 	else	/* punt */
493 		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
494 			RBD_DRV_NAME, rbd_dev, &vaf);
495 	va_end(args);
496 }
497 
498 #ifdef RBD_DEBUG
499 #define rbd_assert(expr)						\
500 		if (unlikely(!(expr))) {				\
501 			printk(KERN_ERR "\nAssertion failure in %s() "	\
502 						"at line %d:\n\n"	\
503 					"\trbd_assert(%s);\n\n",	\
504 					__func__, __LINE__, #expr);	\
505 			BUG();						\
506 		}
507 #else /* !RBD_DEBUG */
508 #  define rbd_assert(expr)	((void) 0)
509 #endif /* !RBD_DEBUG */
510 
511 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
512 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
513 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
514 
515 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
516 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
517 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
518 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
519 					u64 snap_id);
520 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
521 				u8 *order, u64 *snap_size);
522 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
523 		u64 *snap_features);
524 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
525 
526 static int rbd_open(struct block_device *bdev, fmode_t mode)
527 {
528 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
529 	bool removing = false;
530 
531 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
532 		return -EROFS;
533 
534 	spin_lock_irq(&rbd_dev->lock);
535 	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
536 		removing = true;
537 	else
538 		rbd_dev->open_count++;
539 	spin_unlock_irq(&rbd_dev->lock);
540 	if (removing)
541 		return -ENOENT;
542 
543 	(void) get_device(&rbd_dev->dev);
544 	set_device_ro(bdev, rbd_dev->mapping.read_only);
545 
546 	return 0;
547 }
548 
549 static void rbd_release(struct gendisk *disk, fmode_t mode)
550 {
551 	struct rbd_device *rbd_dev = disk->private_data;
552 	unsigned long open_count_before;
553 
554 	spin_lock_irq(&rbd_dev->lock);
555 	open_count_before = rbd_dev->open_count--;
556 	spin_unlock_irq(&rbd_dev->lock);
557 	rbd_assert(open_count_before > 0);
558 
559 	put_device(&rbd_dev->dev);
560 }
561 
562 static const struct block_device_operations rbd_bd_ops = {
563 	.owner			= THIS_MODULE,
564 	.open			= rbd_open,
565 	.release		= rbd_release,
566 };
567 
568 /*
569  * Initialize an rbd client instance.  Success or not, this function
570  * consumes ceph_opts.  Caller holds client_mutex.
571  */
572 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
573 {
574 	struct rbd_client *rbdc;
575 	int ret = -ENOMEM;
576 
577 	dout("%s:\n", __func__);
578 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
579 	if (!rbdc)
580 		goto out_opt;
581 
582 	kref_init(&rbdc->kref);
583 	INIT_LIST_HEAD(&rbdc->node);
584 
585 	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
586 	if (IS_ERR(rbdc->client))
587 		goto out_rbdc;
588 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
589 
590 	ret = ceph_open_session(rbdc->client);
591 	if (ret < 0)
592 		goto out_client;
593 
594 	spin_lock(&rbd_client_list_lock);
595 	list_add_tail(&rbdc->node, &rbd_client_list);
596 	spin_unlock(&rbd_client_list_lock);
597 
598 	dout("%s: rbdc %p\n", __func__, rbdc);
599 
600 	return rbdc;
601 out_client:
602 	ceph_destroy_client(rbdc->client);
603 out_rbdc:
604 	kfree(rbdc);
605 out_opt:
606 	if (ceph_opts)
607 		ceph_destroy_options(ceph_opts);
608 	dout("%s: error %d\n", __func__, ret);
609 
610 	return ERR_PTR(ret);
611 }
612 
613 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
614 {
615 	kref_get(&rbdc->kref);
616 
617 	return rbdc;
618 }
619 
620 /*
621  * Find a ceph client with specific addr and configuration.  If
622  * found, bump its reference count.
623  */
624 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
625 {
626 	struct rbd_client *client_node;
627 	bool found = false;
628 
629 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
630 		return NULL;
631 
632 	spin_lock(&rbd_client_list_lock);
633 	list_for_each_entry(client_node, &rbd_client_list, node) {
634 		if (!ceph_compare_options(ceph_opts, client_node->client)) {
635 			__rbd_get_client(client_node);
636 
637 			found = true;
638 			break;
639 		}
640 	}
641 	spin_unlock(&rbd_client_list_lock);
642 
643 	return found ? client_node : NULL;
644 }
645 
646 /*
647  * mount options
648  */
649 enum {
650 	Opt_last_int,
651 	/* int args above */
652 	Opt_last_string,
653 	/* string args above */
654 	Opt_read_only,
655 	Opt_read_write,
656 	/* Boolean args above */
657 	Opt_last_bool,
658 };
659 
660 static match_table_t rbd_opts_tokens = {
661 	/* int args above */
662 	/* string args above */
663 	{Opt_read_only, "read_only"},
664 	{Opt_read_only, "ro"},		/* Alternate spelling */
665 	{Opt_read_write, "read_write"},
666 	{Opt_read_write, "rw"},		/* Alternate spelling */
667 	/* Boolean args above */
668 	{-1, NULL}
669 };
670 
671 struct rbd_options {
672 	bool	read_only;
673 };
674 
675 #define RBD_READ_ONLY_DEFAULT	false
676 
677 static int parse_rbd_opts_token(char *c, void *private)
678 {
679 	struct rbd_options *rbd_opts = private;
680 	substring_t argstr[MAX_OPT_ARGS];
681 	int token, intval, ret;
682 
683 	token = match_token(c, rbd_opts_tokens, argstr);
684 	if (token < 0)
685 		return -EINVAL;
686 
687 	if (token < Opt_last_int) {
688 		ret = match_int(&argstr[0], &intval);
689 		if (ret < 0) {
690 			pr_err("bad mount option arg (not int) "
691 			       "at '%s'\n", c);
692 			return ret;
693 		}
694 		dout("got int token %d val %d\n", token, intval);
695 	} else if (token > Opt_last_int && token < Opt_last_string) {
696 		dout("got string token %d val %s\n", token,
697 		     argstr[0].from);
698 	} else if (token > Opt_last_string && token < Opt_last_bool) {
699 		dout("got Boolean token %d\n", token);
700 	} else {
701 		dout("got token %d\n", token);
702 	}
703 
704 	switch (token) {
705 	case Opt_read_only:
706 		rbd_opts->read_only = true;
707 		break;
708 	case Opt_read_write:
709 		rbd_opts->read_only = false;
710 		break;
711 	default:
712 		rbd_assert(false);
713 		break;
714 	}
715 	return 0;
716 }
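/*
 * For example, if the options portion of the map string contains "ro"
 * (or "read_only"), that token matches Opt_read_only above and flips
 * rbd_opts->read_only to true; an unrecognized token makes the whole
 * map attempt fail with -EINVAL.
 */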
717 
718 /*
719  * Get a ceph client with specific addr and configuration; if one does
720  * not exist, create it.  Either way, ceph_opts is consumed by this
721  * function.
722  */
723 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
724 {
725 	struct rbd_client *rbdc;
726 
727 	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
728 	rbdc = rbd_client_find(ceph_opts);
729 	if (rbdc)	/* using an existing client */
730 		ceph_destroy_options(ceph_opts);
731 	else
732 		rbdc = rbd_client_create(ceph_opts);
733 	mutex_unlock(&client_mutex);
734 
735 	return rbdc;
736 }
737 
738 /*
739  * Destroy ceph client
740  *
741  * Caller must hold rbd_client_list_lock.
742  */
743 static void rbd_client_release(struct kref *kref)
744 {
745 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
746 
747 	dout("%s: rbdc %p\n", __func__, rbdc);
748 	spin_lock(&rbd_client_list_lock);
749 	list_del(&rbdc->node);
750 	spin_unlock(&rbd_client_list_lock);
751 
752 	ceph_destroy_client(rbdc->client);
753 	kfree(rbdc);
754 }
755 
756 /*
757  * Drop reference to ceph client node. If it's not referenced anymore, release
758  * it.
759  */
760 static void rbd_put_client(struct rbd_client *rbdc)
761 {
762 	if (rbdc)
763 		kref_put(&rbdc->kref, rbd_client_release);
764 }
765 
766 static bool rbd_image_format_valid(u32 image_format)
767 {
768 	return image_format == 1 || image_format == 2;
769 }
770 
771 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
772 {
773 	size_t size;
774 	u32 snap_count;
775 
776 	/* The header has to start with the magic rbd header text */
777 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
778 		return false;
779 
780 	/* The bio layer requires at least sector-sized I/O */
781 
782 	if (ondisk->options.order < SECTOR_SHIFT)
783 		return false;
784 
785 	/* If we use u64 in a few spots we may be able to loosen this */
786 
787 	if (ondisk->options.order > 8 * sizeof (int) - 1)
788 		return false;
789 
790 	/*
791 	 * The size of a snapshot header has to fit in a size_t, and
792 	 * that limits the number of snapshots.
793 	 */
794 	snap_count = le32_to_cpu(ondisk->snap_count);
795 	size = SIZE_MAX - sizeof (struct ceph_snap_context);
796 	if (snap_count > size / sizeof (__le64))
797 		return false;
798 
799 	/*
800 	 * Not only that, but the size of the entire snapshot
801 	 * header must also be representable in a size_t.
802 	 */
803 	size -= snap_count * sizeof (__le64);
804 	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
805 		return false;
806 
807 	return true;
808 }
809 
810 /*
811  * Fill an rbd image header with information from the given format 1
812  * on-disk header.
813  */
814 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
815 				 struct rbd_image_header_ondisk *ondisk)
816 {
817 	struct rbd_image_header *header = &rbd_dev->header;
818 	bool first_time = header->object_prefix == NULL;
819 	struct ceph_snap_context *snapc;
820 	char *object_prefix = NULL;
821 	char *snap_names = NULL;
822 	u64 *snap_sizes = NULL;
823 	u32 snap_count;
824 	size_t size;
825 	int ret = -ENOMEM;
826 	u32 i;
827 
828 	/* Allocate this now to avoid having to handle failure below */
829 
830 	if (first_time) {
831 		size_t len;
832 
833 		len = strnlen(ondisk->object_prefix,
834 				sizeof (ondisk->object_prefix));
835 		object_prefix = kmalloc(len + 1, GFP_KERNEL);
836 		if (!object_prefix)
837 			return -ENOMEM;
838 		memcpy(object_prefix, ondisk->object_prefix, len);
839 		object_prefix[len] = '\0';
840 	}
841 
842 	/* Allocate the snapshot context and fill it in */
843 
844 	snap_count = le32_to_cpu(ondisk->snap_count);
845 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
846 	if (!snapc)
847 		goto out_err;
848 	snapc->seq = le64_to_cpu(ondisk->snap_seq);
849 	if (snap_count) {
850 		struct rbd_image_snap_ondisk *snaps;
851 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
852 
853 		/* We'll keep a copy of the snapshot names... */
854 
855 		if (snap_names_len > (u64)SIZE_MAX)
856 			goto out_2big;
857 		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
858 		if (!snap_names)
859 			goto out_err;
860 
861 		/* ...as well as the array of their sizes. */
862 
863 		size = snap_count * sizeof (*header->snap_sizes);
864 		snap_sizes = kmalloc(size, GFP_KERNEL);
865 		if (!snap_sizes)
866 			goto out_err;
867 
868 		/*
869 		 * Copy the names, and fill in each snapshot's id
870 		 * and size.
871 		 *
872 		 * Note that rbd_dev_v1_header_info() guarantees the
873 		 * ondisk buffer we're working with has
874 		 * snap_names_len bytes beyond the end of the
875 	 * snapshot id array, so this memcpy() is safe.
876 		 */
877 		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
878 		snaps = ondisk->snaps;
879 		for (i = 0; i < snap_count; i++) {
880 			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
881 			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
882 		}
883 	}
884 
885 	/* We won't fail any more, fill in the header */
886 
887 	if (first_time) {
888 		header->object_prefix = object_prefix;
889 		header->obj_order = ondisk->options.order;
890 		header->crypt_type = ondisk->options.crypt_type;
891 		header->comp_type = ondisk->options.comp_type;
892 		/* The rest aren't used for format 1 images */
893 		header->stripe_unit = 0;
894 		header->stripe_count = 0;
895 		header->features = 0;
896 	} else {
897 		ceph_put_snap_context(header->snapc);
898 		kfree(header->snap_names);
899 		kfree(header->snap_sizes);
900 	}
901 
902 	/* The remaining fields always get updated (when we refresh) */
903 
904 	header->image_size = le64_to_cpu(ondisk->image_size);
905 	header->snapc = snapc;
906 	header->snap_names = snap_names;
907 	header->snap_sizes = snap_sizes;
908 
909 	/* Make sure mapping size is consistent with header info */
910 
911 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
912 		if (rbd_dev->mapping.size != header->image_size)
913 			rbd_dev->mapping.size = header->image_size;
914 
915 	return 0;
916 out_2big:
917 	ret = -EIO;
918 out_err:
919 	kfree(snap_sizes);
920 	kfree(snap_names);
921 	ceph_put_snap_context(snapc);
922 	kfree(object_prefix);
923 
924 	return ret;
925 }
926 
927 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
928 {
929 	const char *snap_name;
930 
931 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
932 
933 	/* Skip over names until we find the one we are looking for */
934 
935 	snap_name = rbd_dev->header.snap_names;
936 	while (which--)
937 		snap_name += strlen(snap_name) + 1;
938 
939 	return kstrdup(snap_name, GFP_KERNEL);
940 }
941 
942 /*
943  * Snapshot id comparison function for use with qsort()/bsearch().
944  * Note that result is for snapshots in *descending* order.
945  */
946 static int snapid_compare_reverse(const void *s1, const void *s2)
947 {
948 	u64 snap_id1 = *(u64 *)s1;
949 	u64 snap_id2 = *(u64 *)s2;
950 
951 	if (snap_id1 < snap_id2)
952 		return 1;
953 	return snap_id1 == snap_id2 ? 0 : -1;
954 }
955 
956 /*
957  * Search a snapshot context to see if the given snapshot id is
958  * present.
959  *
960  * Returns the position of the snapshot id in the array if it's found,
961  * or BAD_SNAP_INDEX otherwise.
962  *
963  * Note: The snapshot array is kept sorted (by the osd) in
964  * reverse order, highest snapshot id first.
965  */
966 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
967 {
968 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
969 	u64 *found;
970 
971 	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
972 				sizeof (snap_id), snapid_compare_reverse);
973 
974 	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
975 }
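/*
 * Example: for a snapshot context whose id array is { 18, 12, 5 }
 * (descending, as the OSD keeps it), looking up snap_id 12 returns
 * index 1, while looking up 7 returns BAD_SNAP_INDEX.
 */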
976 
977 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
978 					u64 snap_id)
979 {
980 	u32 which;
981 	const char *snap_name;
982 
983 	which = rbd_dev_snap_index(rbd_dev, snap_id);
984 	if (which == BAD_SNAP_INDEX)
985 		return ERR_PTR(-ENOENT);
986 
987 	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
988 	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
989 }
990 
991 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
992 {
993 	if (snap_id == CEPH_NOSNAP)
994 		return RBD_SNAP_HEAD_NAME;
995 
996 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
997 	if (rbd_dev->image_format == 1)
998 		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
999 
1000 	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1001 }
1002 
1003 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1004 				u64 *snap_size)
1005 {
1006 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1007 	if (snap_id == CEPH_NOSNAP) {
1008 		*snap_size = rbd_dev->header.image_size;
1009 	} else if (rbd_dev->image_format == 1) {
1010 		u32 which;
1011 
1012 		which = rbd_dev_snap_index(rbd_dev, snap_id);
1013 		if (which == BAD_SNAP_INDEX)
1014 			return -ENOENT;
1015 
1016 		*snap_size = rbd_dev->header.snap_sizes[which];
1017 	} else {
1018 		u64 size = 0;
1019 		int ret;
1020 
1021 		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1022 		if (ret)
1023 			return ret;
1024 
1025 		*snap_size = size;
1026 	}
1027 	return 0;
1028 }
1029 
1030 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1031 			u64 *snap_features)
1032 {
1033 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1034 	if (snap_id == CEPH_NOSNAP) {
1035 		*snap_features = rbd_dev->header.features;
1036 	} else if (rbd_dev->image_format == 1) {
1037 		*snap_features = 0;	/* No features for format 1 */
1038 	} else {
1039 		u64 features = 0;
1040 		int ret;
1041 
1042 		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1043 		if (ret)
1044 			return ret;
1045 
1046 		*snap_features = features;
1047 	}
1048 	return 0;
1049 }
1050 
1051 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1052 {
1053 	u64 snap_id = rbd_dev->spec->snap_id;
1054 	u64 size = 0;
1055 	u64 features = 0;
1056 	int ret;
1057 
1058 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1059 	if (ret)
1060 		return ret;
1061 	ret = rbd_snap_features(rbd_dev, snap_id, &features);
1062 	if (ret)
1063 		return ret;
1064 
1065 	rbd_dev->mapping.size = size;
1066 	rbd_dev->mapping.features = features;
1067 
1068 	return 0;
1069 }
1070 
1071 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1072 {
1073 	rbd_dev->mapping.size = 0;
1074 	rbd_dev->mapping.features = 0;
1075 }
1076 
1077 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1078 {
1079 	char *name;
1080 	u64 segment;
1081 	int ret;
1082 	char *name_format;
1083 
1084 	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1085 	if (!name)
1086 		return NULL;
1087 	segment = offset >> rbd_dev->header.obj_order;
1088 	name_format = "%s.%012llx";
1089 	if (rbd_dev->image_format == 2)
1090 		name_format = "%s.%016llx";
1091 	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1092 			rbd_dev->header.object_prefix, segment);
1093 	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1094 		pr_err("error formatting segment name for #%llu (%d)\n",
1095 			segment, ret);
1096 		kmem_cache_free(rbd_segment_name_cache, name);
1097 		name = NULL;
1098 	}
1099 
1100 	return name;
1101 }
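/*
 * Example of the names generated above (object prefixes made up): with a
 * format 1 prefix of "rb.0.1234.6b8b4567", segment 26 becomes
 * "rb.0.1234.6b8b4567.00000000001a"; a format 2 image uses 16 hex digits
 * instead, e.g. "rbd_data.1234.000000000000001a".
 */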
1102 
1103 static void rbd_segment_name_free(const char *name)
1104 {
1105 	/* The explicit cast here is needed to drop the const qualifier */
1106 
1107 	kmem_cache_free(rbd_segment_name_cache, (void *)name);
1108 }
1109 
1110 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1111 {
1112 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1113 
1114 	return offset & (segment_size - 1);
1115 }
1116 
1117 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1118 				u64 offset, u64 length)
1119 {
1120 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1121 
1122 	offset &= segment_size - 1;
1123 
1124 	rbd_assert(length <= U64_MAX - offset);
1125 	if (offset + length > segment_size)
1126 		length = segment_size - offset;
1127 
1128 	return length;
1129 }
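/*
 * Example with a typical object order of 22 (4 MiB objects): an image
 * request at offset 6 MiB for 4 MiB starts 2 MiB into segment 1, so
 * rbd_segment_offset() returns 2 MiB and rbd_segment_length() clamps the
 * first object request to 2 MiB; the remaining 2 MiB fall in segment 2.
 */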
1130 
1131 /*
1132  * returns the size of an object in the image
1133  */
1134 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1135 {
1136 	return 1 << header->obj_order;
1137 }
1138 
1139 /*
1140  * bio helpers
1141  */
1142 
1143 static void bio_chain_put(struct bio *chain)
1144 {
1145 	struct bio *tmp;
1146 
1147 	while (chain) {
1148 		tmp = chain;
1149 		chain = chain->bi_next;
1150 		bio_put(tmp);
1151 	}
1152 }
1153 
1154 /*
1155  * zeros a bio chain, starting at specific offset
1156  */
1157 static void zero_bio_chain(struct bio *chain, int start_ofs)
1158 {
1159 	struct bio_vec bv;
1160 	struct bvec_iter iter;
1161 	unsigned long flags;
1162 	void *buf;
1163 	int pos = 0;
1164 
1165 	while (chain) {
1166 		bio_for_each_segment(bv, chain, iter) {
1167 			if (pos + bv.bv_len > start_ofs) {
1168 				int remainder = max(start_ofs - pos, 0);
1169 				buf = bvec_kmap_irq(&bv, &flags);
1170 				memset(buf + remainder, 0,
1171 				       bv.bv_len - remainder);
1172 				flush_dcache_page(bv.bv_page);
1173 				bvec_kunmap_irq(buf, &flags);
1174 			}
1175 			pos += bv.bv_len;
1176 		}
1177 
1178 		chain = chain->bi_next;
1179 	}
1180 }
1181 
1182 /*
1183  * similar to zero_bio_chain(), zeros data defined by a page array,
1184  * starting at the given byte offset from the start of the array and
1185  * continuing up to the given end offset.  The pages array is
1186  * assumed to be big enough to hold all bytes up to the end.
1187  */
1188 static void zero_pages(struct page **pages, u64 offset, u64 end)
1189 {
1190 	struct page **page = &pages[offset >> PAGE_SHIFT];
1191 
1192 	rbd_assert(end > offset);
1193 	rbd_assert(end - offset <= (u64)SIZE_MAX);
1194 	while (offset < end) {
1195 		size_t page_offset;
1196 		size_t length;
1197 		unsigned long flags;
1198 		void *kaddr;
1199 
1200 		page_offset = offset & ~PAGE_MASK;
1201 		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1202 		local_irq_save(flags);
1203 		kaddr = kmap_atomic(*page);
1204 		memset(kaddr + page_offset, 0, length);
1205 		flush_dcache_page(*page);
1206 		kunmap_atomic(kaddr);
1207 		local_irq_restore(flags);
1208 
1209 		offset += length;
1210 		page++;
1211 	}
1212 }
1213 
1214 /*
1215  * Clone a portion of a bio, starting at the given byte offset
1216  * and continuing for the number of bytes indicated.
1217  */
1218 static struct bio *bio_clone_range(struct bio *bio_src,
1219 					unsigned int offset,
1220 					unsigned int len,
1221 					gfp_t gfpmask)
1222 {
1223 	struct bio *bio;
1224 
1225 	bio = bio_clone(bio_src, gfpmask);
1226 	if (!bio)
1227 		return NULL;	/* ENOMEM */
1228 
1229 	bio_advance(bio, offset);
1230 	bio->bi_iter.bi_size = len;
1231 
1232 	return bio;
1233 }
1234 
1235 /*
1236  * Clone a portion of a bio chain, starting at the given byte offset
1237  * into the first bio in the source chain and continuing for the
1238  * number of bytes indicated.  The result is another bio chain of
1239  * exactly the given length, or a null pointer on error.
1240  *
1241  * The bio_src and offset parameters are both in-out.  On entry they
1242  * refer to the first source bio and the offset into that bio where
1243  * the start of data to be cloned is located.
1244  *
1245  * On return, bio_src is updated to refer to the bio in the source
1246  * chain that contains the first un-cloned byte, and *offset will
1247  * contain the offset of that byte within that bio.
1248  */
1249 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1250 					unsigned int *offset,
1251 					unsigned int len,
1252 					gfp_t gfpmask)
1253 {
1254 	struct bio *bi = *bio_src;
1255 	unsigned int off = *offset;
1256 	struct bio *chain = NULL;
1257 	struct bio **end;
1258 
1259 	/* Build up a chain of clone bios up to the limit */
1260 
1261 	if (!bi || off >= bi->bi_iter.bi_size || !len)
1262 		return NULL;		/* Nothing to clone */
1263 
1264 	end = &chain;
1265 	while (len) {
1266 		unsigned int bi_size;
1267 		struct bio *bio;
1268 
1269 		if (!bi) {
1270 			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1271 			goto out_err;	/* EINVAL; ran out of bio's */
1272 		}
1273 		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1274 		bio = bio_clone_range(bi, off, bi_size, gfpmask);
1275 		if (!bio)
1276 			goto out_err;	/* ENOMEM */
1277 
1278 		*end = bio;
1279 		end = &bio->bi_next;
1280 
1281 		off += bi_size;
1282 		if (off == bi->bi_iter.bi_size) {
1283 			bi = bi->bi_next;
1284 			off = 0;
1285 		}
1286 		len -= bi_size;
1287 	}
1288 	*bio_src = bi;
1289 	*offset = off;
1290 
1291 	return chain;
1292 out_err:
1293 	bio_chain_put(chain);
1294 
1295 	return NULL;
1296 }
1297 
1298 /*
1299  * The default/initial value for all object request flags is 0.  For
1300  * each flag, once its value is set to 1 it is never reset to 0
1301  * again.
1302  */
1303 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1304 {
1305 	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1306 		struct rbd_device *rbd_dev;
1307 
1308 		rbd_dev = obj_request->img_request->rbd_dev;
1309 		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1310 			obj_request);
1311 	}
1312 }
1313 
1314 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1315 {
1316 	smp_mb();
1317 	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1318 }
1319 
1320 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1321 {
1322 	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1323 		struct rbd_device *rbd_dev = NULL;
1324 
1325 		if (obj_request_img_data_test(obj_request))
1326 			rbd_dev = obj_request->img_request->rbd_dev;
1327 		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1328 			obj_request);
1329 	}
1330 }
1331 
1332 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1333 {
1334 	smp_mb();
1335 	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1336 }
1337 
1338 /*
1339  * This sets the KNOWN flag after (possibly) setting the EXISTS
1340  * flag.  The latter is set based on the "exists" value provided.
1341  *
1342  * Note that for our purposes once an object exists it never goes
1343  * away again.  It's possible that the response from two existence
1344  * checks are separated by the creation of the target object, and
1345  * the first ("doesn't exist") response arrives *after* the second
1346  * ("does exist").  In that case we ignore the second one.
1347  */
1348 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1349 				bool exists)
1350 {
1351 	if (exists)
1352 		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1353 	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1354 	smp_mb();
1355 }
1356 
1357 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1358 {
1359 	smp_mb();
1360 	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1361 }
1362 
1363 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1364 {
1365 	smp_mb();
1366 	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1367 }
1368 
1369 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1370 {
1371 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1372 		atomic_read(&obj_request->kref.refcount));
1373 	kref_get(&obj_request->kref);
1374 }
1375 
1376 static void rbd_obj_request_destroy(struct kref *kref);
1377 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1378 {
1379 	rbd_assert(obj_request != NULL);
1380 	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1381 		atomic_read(&obj_request->kref.refcount));
1382 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1383 }
1384 
1385 static bool img_request_child_test(struct rbd_img_request *img_request);
1386 static void rbd_parent_request_destroy(struct kref *kref);
1387 static void rbd_img_request_destroy(struct kref *kref);
1388 static void rbd_img_request_put(struct rbd_img_request *img_request)
1389 {
1390 	rbd_assert(img_request != NULL);
1391 	dout("%s: img %p (was %d)\n", __func__, img_request,
1392 		atomic_read(&img_request->kref.refcount));
1393 	if (img_request_child_test(img_request))
1394 		kref_put(&img_request->kref, rbd_parent_request_destroy);
1395 	else
1396 		kref_put(&img_request->kref, rbd_img_request_destroy);
1397 }
1398 
1399 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1400 					struct rbd_obj_request *obj_request)
1401 {
1402 	rbd_assert(obj_request->img_request == NULL);
1403 
1404 	/* Image request now owns object's original reference */
1405 	obj_request->img_request = img_request;
1406 	obj_request->which = img_request->obj_request_count;
1407 	rbd_assert(!obj_request_img_data_test(obj_request));
1408 	obj_request_img_data_set(obj_request);
1409 	rbd_assert(obj_request->which != BAD_WHICH);
1410 	img_request->obj_request_count++;
1411 	list_add_tail(&obj_request->links, &img_request->obj_requests);
1412 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1413 		obj_request->which);
1414 }
1415 
1416 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1417 					struct rbd_obj_request *obj_request)
1418 {
1419 	rbd_assert(obj_request->which != BAD_WHICH);
1420 
1421 	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1422 		obj_request->which);
1423 	list_del(&obj_request->links);
1424 	rbd_assert(img_request->obj_request_count > 0);
1425 	img_request->obj_request_count--;
1426 	rbd_assert(obj_request->which == img_request->obj_request_count);
1427 	obj_request->which = BAD_WHICH;
1428 	rbd_assert(obj_request_img_data_test(obj_request));
1429 	rbd_assert(obj_request->img_request == img_request);
1430 	obj_request->img_request = NULL;
1431 	obj_request->callback = NULL;
1432 	rbd_obj_request_put(obj_request);
1433 }
1434 
1435 static bool obj_request_type_valid(enum obj_request_type type)
1436 {
1437 	switch (type) {
1438 	case OBJ_REQUEST_NODATA:
1439 	case OBJ_REQUEST_BIO:
1440 	case OBJ_REQUEST_PAGES:
1441 		return true;
1442 	default:
1443 		return false;
1444 	}
1445 }
1446 
1447 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1448 				struct rbd_obj_request *obj_request)
1449 {
1450 	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1451 
1452 	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1453 }
1454 
1455 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1456 {
1457 
1458 	dout("%s: img %p\n", __func__, img_request);
1459 
1460 	/*
1461 	 * If no error occurred, compute the aggregate transfer
1462 	 * count for the image request.  We could instead use
1463 	 * atomic64_cmpxchg() to update it as each object request
1464 	 * completes; not clear which way is better off hand.
1465 	 */
1466 	if (!img_request->result) {
1467 		struct rbd_obj_request *obj_request;
1468 		u64 xferred = 0;
1469 
1470 		for_each_obj_request(img_request, obj_request)
1471 			xferred += obj_request->xferred;
1472 		img_request->xferred = xferred;
1473 	}
1474 
1475 	if (img_request->callback)
1476 		img_request->callback(img_request);
1477 	else
1478 		rbd_img_request_put(img_request);
1479 }
1480 
1481 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1482 
1483 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1484 {
1485 	dout("%s: obj %p\n", __func__, obj_request);
1486 
1487 	return wait_for_completion_interruptible(&obj_request->completion);
1488 }
1489 
1490 /*
1491  * The default/initial value for all image request flags is 0.  Each
1492  * is conditionally set to 1 at image request initialization time
1493  * and currently never changes thereafter.
1494  */
1495 static void img_request_write_set(struct rbd_img_request *img_request)
1496 {
1497 	set_bit(IMG_REQ_WRITE, &img_request->flags);
1498 	smp_mb();
1499 }
1500 
1501 static bool img_request_write_test(struct rbd_img_request *img_request)
1502 {
1503 	smp_mb();
1504 	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1505 }
1506 
1507 static void img_request_child_set(struct rbd_img_request *img_request)
1508 {
1509 	set_bit(IMG_REQ_CHILD, &img_request->flags);
1510 	smp_mb();
1511 }
1512 
1513 static void img_request_child_clear(struct rbd_img_request *img_request)
1514 {
1515 	clear_bit(IMG_REQ_CHILD, &img_request->flags);
1516 	smp_mb();
1517 }
1518 
1519 static bool img_request_child_test(struct rbd_img_request *img_request)
1520 {
1521 	smp_mb();
1522 	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1523 }
1524 
1525 static void img_request_layered_set(struct rbd_img_request *img_request)
1526 {
1527 	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1528 	smp_mb();
1529 }
1530 
1531 static void img_request_layered_clear(struct rbd_img_request *img_request)
1532 {
1533 	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1534 	smp_mb();
1535 }
1536 
1537 static bool img_request_layered_test(struct rbd_img_request *img_request)
1538 {
1539 	smp_mb();
1540 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1541 }
1542 
1543 static void
1544 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1545 {
1546 	u64 xferred = obj_request->xferred;
1547 	u64 length = obj_request->length;
1548 
1549 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1550 		obj_request, obj_request->img_request, obj_request->result,
1551 		xferred, length);
1552 	/*
1553 	 * ENOENT means a hole in the image.  We zero-fill the entire
1554 	 * length of the request.  A short read also implies zero-fill
1555 	 * to the end of the request.  An error requires the whole
1556 	 * length of the request to be reported finished with an error
1557 	 * to the block layer.  In each case we update the xferred
1558 	 * count to indicate the whole request was satisfied.
1559 	 */
1560 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1561 	if (obj_request->result == -ENOENT) {
1562 		if (obj_request->type == OBJ_REQUEST_BIO)
1563 			zero_bio_chain(obj_request->bio_list, 0);
1564 		else
1565 			zero_pages(obj_request->pages, 0, length);
1566 		obj_request->result = 0;
1567 	} else if (xferred < length && !obj_request->result) {
1568 		if (obj_request->type == OBJ_REQUEST_BIO)
1569 			zero_bio_chain(obj_request->bio_list, xferred);
1570 		else
1571 			zero_pages(obj_request->pages, xferred, length);
1572 	}
1573 	obj_request->xferred = length;
1574 	obj_request_done_set(obj_request);
1575 }
1576 
1577 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1578 {
1579 	dout("%s: obj %p cb %p\n", __func__, obj_request,
1580 		obj_request->callback);
1581 	if (obj_request->callback)
1582 		obj_request->callback(obj_request);
1583 	else
1584 		complete_all(&obj_request->completion);
1585 }
1586 
1587 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1588 {
1589 	dout("%s: obj %p\n", __func__, obj_request);
1590 	obj_request_done_set(obj_request);
1591 }
1592 
1593 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1594 {
1595 	struct rbd_img_request *img_request = NULL;
1596 	struct rbd_device *rbd_dev = NULL;
1597 	bool layered = false;
1598 
1599 	if (obj_request_img_data_test(obj_request)) {
1600 		img_request = obj_request->img_request;
1601 		layered = img_request && img_request_layered_test(img_request);
1602 		rbd_dev = img_request->rbd_dev;
1603 	}
1604 
1605 	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1606 		obj_request, img_request, obj_request->result,
1607 		obj_request->xferred, obj_request->length);
1608 	if (layered && obj_request->result == -ENOENT &&
1609 			obj_request->img_offset < rbd_dev->parent_overlap)
1610 		rbd_img_parent_read(obj_request);
1611 	else if (img_request)
1612 		rbd_img_obj_request_read_callback(obj_request);
1613 	else
1614 		obj_request_done_set(obj_request);
1615 }
1616 
1617 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1618 {
1619 	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1620 		obj_request->result, obj_request->length);
1621 	/*
1622 	 * There is no such thing as a successful short write.  Set
1623 	 * it to our originally-requested length.
1624 	 */
1625 	obj_request->xferred = obj_request->length;
1626 	obj_request_done_set(obj_request);
1627 }
1628 
1629 /*
1630  * For a simple stat call there's nothing to do.  We'll do more if
1631  * this is part of a write sequence for a layered image.
1632  */
1633 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1634 {
1635 	dout("%s: obj %p\n", __func__, obj_request);
1636 	obj_request_done_set(obj_request);
1637 }
1638 
1639 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1640 				struct ceph_msg *msg)
1641 {
1642 	struct rbd_obj_request *obj_request = osd_req->r_priv;
1643 	u16 opcode;
1644 
1645 	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1646 	rbd_assert(osd_req == obj_request->osd_req);
1647 	if (obj_request_img_data_test(obj_request)) {
1648 		rbd_assert(obj_request->img_request);
1649 		rbd_assert(obj_request->which != BAD_WHICH);
1650 	} else {
1651 		rbd_assert(obj_request->which == BAD_WHICH);
1652 	}
1653 
1654 	if (osd_req->r_result < 0)
1655 		obj_request->result = osd_req->r_result;
1656 
1657 	BUG_ON(osd_req->r_num_ops > 2);
1658 
1659 	/*
1660 	 * We support a 64-bit length, but ultimately it has to be
1661 	 * passed to blk_end_request(), which takes an unsigned int.
1662 	 */
1663 	obj_request->xferred = osd_req->r_reply_op_len[0];
1664 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1665 	opcode = osd_req->r_ops[0].op;
1666 	switch (opcode) {
1667 	case CEPH_OSD_OP_READ:
1668 		rbd_osd_read_callback(obj_request);
1669 		break;
1670 	case CEPH_OSD_OP_WRITE:
1671 		rbd_osd_write_callback(obj_request);
1672 		break;
1673 	case CEPH_OSD_OP_STAT:
1674 		rbd_osd_stat_callback(obj_request);
1675 		break;
1676 	case CEPH_OSD_OP_CALL:
1677 	case CEPH_OSD_OP_NOTIFY_ACK:
1678 	case CEPH_OSD_OP_WATCH:
1679 		rbd_osd_trivial_callback(obj_request);
1680 		break;
1681 	default:
1682 		rbd_warn(NULL, "%s: unsupported op %hu\n",
1683 			obj_request->object_name, (unsigned short) opcode);
1684 		break;
1685 	}
1686 
1687 	if (obj_request_done_test(obj_request))
1688 		rbd_obj_request_complete(obj_request);
1689 }
1690 
1691 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1692 {
1693 	struct rbd_img_request *img_request = obj_request->img_request;
1694 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1695 	u64 snap_id;
1696 
1697 	rbd_assert(osd_req != NULL);
1698 
1699 	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1700 	ceph_osdc_build_request(osd_req, obj_request->offset,
1701 			NULL, snap_id, NULL);
1702 }
1703 
1704 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1705 {
1706 	struct rbd_img_request *img_request = obj_request->img_request;
1707 	struct ceph_osd_request *osd_req = obj_request->osd_req;
1708 	struct ceph_snap_context *snapc;
1709 	struct timespec mtime = CURRENT_TIME;
1710 
1711 	rbd_assert(osd_req != NULL);
1712 
1713 	snapc = img_request ? img_request->snapc : NULL;
1714 	ceph_osdc_build_request(osd_req, obj_request->offset,
1715 			snapc, CEPH_NOSNAP, &mtime);
1716 }
1717 
1718 static struct ceph_osd_request *rbd_osd_req_create(
1719 					struct rbd_device *rbd_dev,
1720 					bool write_request,
1721 					struct rbd_obj_request *obj_request)
1722 {
1723 	struct ceph_snap_context *snapc = NULL;
1724 	struct ceph_osd_client *osdc;
1725 	struct ceph_osd_request *osd_req;
1726 
1727 	if (obj_request_img_data_test(obj_request)) {
1728 		struct rbd_img_request *img_request = obj_request->img_request;
1729 
1730 		rbd_assert(write_request ==
1731 				img_request_write_test(img_request));
1732 		if (write_request)
1733 			snapc = img_request->snapc;
1734 	}
1735 
1736 	/* Allocate and initialize the request, for the single op */
1737 
1738 	osdc = &rbd_dev->rbd_client->client->osdc;
1739 	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1740 	if (!osd_req)
1741 		return NULL;	/* ENOMEM */
1742 
1743 	if (write_request)
1744 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1745 	else
1746 		osd_req->r_flags = CEPH_OSD_FLAG_READ;
1747 
1748 	osd_req->r_callback = rbd_osd_req_callback;
1749 	osd_req->r_priv = obj_request;
1750 
1751 	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1752 	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1753 
1754 	return osd_req;
1755 }
1756 
1757 /*
1758  * Create a copyup osd request based on the information in the
1759  * object request supplied.  A copyup request has two osd ops:
1760  * a copyup method call and a "normal" write request.
1761  */
1762 static struct ceph_osd_request *
1763 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1764 {
1765 	struct rbd_img_request *img_request;
1766 	struct ceph_snap_context *snapc;
1767 	struct rbd_device *rbd_dev;
1768 	struct ceph_osd_client *osdc;
1769 	struct ceph_osd_request *osd_req;
1770 
1771 	rbd_assert(obj_request_img_data_test(obj_request));
1772 	img_request = obj_request->img_request;
1773 	rbd_assert(img_request);
1774 	rbd_assert(img_request_write_test(img_request));
1775 
1776 	/* Allocate and initialize the request, for the two ops */
1777 
1778 	snapc = img_request->snapc;
1779 	rbd_dev = img_request->rbd_dev;
1780 	osdc = &rbd_dev->rbd_client->client->osdc;
1781 	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1782 	if (!osd_req)
1783 		return NULL;	/* ENOMEM */
1784 
1785 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1786 	osd_req->r_callback = rbd_osd_req_callback;
1787 	osd_req->r_priv = obj_request;
1788 
1789 	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1790 	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1791 
1792 	return osd_req;
1793 }
1794 
1795 
1796 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1797 {
1798 	ceph_osdc_put_request(osd_req);
1799 }
1800 
1801 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1802 
1803 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1804 						u64 offset, u64 length,
1805 						enum obj_request_type type)
1806 {
1807 	struct rbd_obj_request *obj_request;
1808 	size_t size;
1809 	char *name;
1810 
1811 	rbd_assert(obj_request_type_valid(type));
1812 
1813 	size = strlen(object_name) + 1;
1814 	name = kmalloc(size, GFP_KERNEL);
1815 	if (!name)
1816 		return NULL;
1817 
1818 	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1819 	if (!obj_request) {
1820 		kfree(name);
1821 		return NULL;
1822 	}
1823 
1824 	obj_request->object_name = memcpy(name, object_name, size);
1825 	obj_request->offset = offset;
1826 	obj_request->length = length;
1827 	obj_request->flags = 0;
1828 	obj_request->which = BAD_WHICH;
1829 	obj_request->type = type;
1830 	INIT_LIST_HEAD(&obj_request->links);
1831 	init_completion(&obj_request->completion);
1832 	kref_init(&obj_request->kref);
1833 
1834 	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1835 		offset, length, (int)type, obj_request);
1836 
1837 	return obj_request;
1838 }
1839 
1840 static void rbd_obj_request_destroy(struct kref *kref)
1841 {
1842 	struct rbd_obj_request *obj_request;
1843 
1844 	obj_request = container_of(kref, struct rbd_obj_request, kref);
1845 
1846 	dout("%s: obj %p\n", __func__, obj_request);
1847 
1848 	rbd_assert(obj_request->img_request == NULL);
1849 	rbd_assert(obj_request->which == BAD_WHICH);
1850 
1851 	if (obj_request->osd_req)
1852 		rbd_osd_req_destroy(obj_request->osd_req);
1853 
1854 	rbd_assert(obj_request_type_valid(obj_request->type));
1855 	switch (obj_request->type) {
1856 	case OBJ_REQUEST_NODATA:
1857 		break;		/* Nothing to do */
1858 	case OBJ_REQUEST_BIO:
1859 		if (obj_request->bio_list)
1860 			bio_chain_put(obj_request->bio_list);
1861 		break;
1862 	case OBJ_REQUEST_PAGES:
1863 		if (obj_request->pages)
1864 			ceph_release_page_vector(obj_request->pages,
1865 						obj_request->page_count);
1866 		break;
1867 	}
1868 
1869 	kfree(obj_request->object_name);
1870 	obj_request->object_name = NULL;
1871 	kmem_cache_free(rbd_obj_request_cache, obj_request);
1872 }
1873 
1874 /* It's OK to call this for a device with no parent */
1875 
1876 static void rbd_spec_put(struct rbd_spec *spec);
1877 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1878 {
1879 	rbd_dev_remove_parent(rbd_dev);
1880 	rbd_spec_put(rbd_dev->parent_spec);
1881 	rbd_dev->parent_spec = NULL;
1882 	rbd_dev->parent_overlap = 0;
1883 }
1884 
1885 /*
1886  * Parent image reference counting is used to determine when an
1887  * image's parent fields can be safely torn down--after there are no
1888  * more in-flight requests to the parent image.  When the last
1889  * reference is dropped, cleaning them up is safe.
1890  */
1891 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1892 {
1893 	int counter;
1894 
1895 	if (!rbd_dev->parent_spec)
1896 		return;
1897 
1898 	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1899 	if (counter > 0)
1900 		return;
1901 
1902 	/* Last reference; clean up parent data structures */
1903 
1904 	if (!counter)
1905 		rbd_dev_unparent(rbd_dev);
1906 	else
1907 		rbd_warn(rbd_dev, "parent reference underflow\n");
1908 }
1909 
1910 /*
1911  * If an image has a non-zero parent overlap, get a reference to its
1912  * parent.
1913  *
1914  * We must get the reference before checking for the overlap to
1915  * coordinate properly with zeroing the parent overlap in
1916  * rbd_dev_v2_parent_info() when an image gets flattened.  We
1917  * drop it again if there is no overlap.
1918  *
1919  * Returns true if the rbd device has a parent with a non-zero
1920  * overlap and a reference for it was successfully taken, or
1921  * false otherwise.
1922  */
1923 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1924 {
1925 	int counter;
1926 
1927 	if (!rbd_dev->parent_spec)
1928 		return false;
1929 
1930 	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1931 	if (counter > 0 && rbd_dev->parent_overlap)
1932 		return true;
1933 
1934 	/* Image was flattened, but parent is not yet torn down */
1935 
1936 	if (counter < 0)
1937 		rbd_warn(rbd_dev, "parent reference overflow\n");
1938 
1939 	return false;
1940 }
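
/*
 * Illustrative sketch only (not called by the driver): how the two
 * parent-reference helpers above are meant to be paired by a caller.
 * The "layered work" placeholder is an assumption for the example;
 * the real pairing is done by rbd_img_request_create() and
 * rbd_img_request_destroy() below.
 */
static void __maybe_unused rbd_dev_parent_ref_example(struct rbd_device *rbd_dev)
{
	if (!rbd_dev_parent_get(rbd_dev))
		return;		/* no parent, or the image was flattened */

	/* ... issue requests that may need to read from the parent ... */

	rbd_dev_parent_put(rbd_dev);	/* may tear down parent state */
}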
1941 
1942 /*
1943  * Caller is responsible for filling in the list of object requests
1944  * that comprises the image request, and the Linux request pointer
1945  * (if there is one).
1946  */
1947 static struct rbd_img_request *rbd_img_request_create(
1948 					struct rbd_device *rbd_dev,
1949 					u64 offset, u64 length,
1950 					bool write_request)
1951 {
1952 	struct rbd_img_request *img_request;
1953 
1954 	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1955 	if (!img_request)
1956 		return NULL;
1957 
1958 	if (write_request) {
1959 		down_read(&rbd_dev->header_rwsem);
1960 		ceph_get_snap_context(rbd_dev->header.snapc);
1961 		up_read(&rbd_dev->header_rwsem);
1962 	}
1963 
1964 	img_request->rq = NULL;
1965 	img_request->rbd_dev = rbd_dev;
1966 	img_request->offset = offset;
1967 	img_request->length = length;
1968 	img_request->flags = 0;
1969 	if (write_request) {
1970 		img_request_write_set(img_request);
1971 		img_request->snapc = rbd_dev->header.snapc;
1972 	} else {
1973 		img_request->snap_id = rbd_dev->spec->snap_id;
1974 	}
1975 	if (rbd_dev_parent_get(rbd_dev))
1976 		img_request_layered_set(img_request);
1977 	spin_lock_init(&img_request->completion_lock);
1978 	img_request->next_completion = 0;
1979 	img_request->callback = NULL;
1980 	img_request->result = 0;
1981 	img_request->obj_request_count = 0;
1982 	INIT_LIST_HEAD(&img_request->obj_requests);
1983 	kref_init(&img_request->kref);
1984 
1985 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1986 		write_request ? "write" : "read", offset, length,
1987 		img_request);
1988 
1989 	return img_request;
1990 }
1991 
1992 static void rbd_img_request_destroy(struct kref *kref)
1993 {
1994 	struct rbd_img_request *img_request;
1995 	struct rbd_obj_request *obj_request;
1996 	struct rbd_obj_request *next_obj_request;
1997 
1998 	img_request = container_of(kref, struct rbd_img_request, kref);
1999 
2000 	dout("%s: img %p\n", __func__, img_request);
2001 
2002 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2003 		rbd_img_obj_request_del(img_request, obj_request);
2004 	rbd_assert(img_request->obj_request_count == 0);
2005 
2006 	if (img_request_layered_test(img_request)) {
2007 		img_request_layered_clear(img_request);
2008 		rbd_dev_parent_put(img_request->rbd_dev);
2009 	}
2010 
2011 	if (img_request_write_test(img_request))
2012 		ceph_put_snap_context(img_request->snapc);
2013 
2014 	kmem_cache_free(rbd_img_request_cache, img_request);
2015 }
2016 
2017 static struct rbd_img_request *rbd_parent_request_create(
2018 					struct rbd_obj_request *obj_request,
2019 					u64 img_offset, u64 length)
2020 {
2021 	struct rbd_img_request *parent_request;
2022 	struct rbd_device *rbd_dev;
2023 
2024 	rbd_assert(obj_request->img_request);
2025 	rbd_dev = obj_request->img_request->rbd_dev;
2026 
2027 	parent_request = rbd_img_request_create(rbd_dev->parent,
2028 						img_offset, length, false);
2029 	if (!parent_request)
2030 		return NULL;
2031 
2032 	img_request_child_set(parent_request);
2033 	rbd_obj_request_get(obj_request);
2034 	parent_request->obj_request = obj_request;
2035 
2036 	return parent_request;
2037 }
2038 
2039 static void rbd_parent_request_destroy(struct kref *kref)
2040 {
2041 	struct rbd_img_request *parent_request;
2042 	struct rbd_obj_request *orig_request;
2043 
2044 	parent_request = container_of(kref, struct rbd_img_request, kref);
2045 	orig_request = parent_request->obj_request;
2046 
2047 	parent_request->obj_request = NULL;
2048 	rbd_obj_request_put(orig_request);
2049 	img_request_child_clear(parent_request);
2050 
2051 	rbd_img_request_destroy(kref);
2052 }
2053 
2054 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2055 {
2056 	struct rbd_img_request *img_request;
2057 	unsigned int xferred;
2058 	int result;
2059 	bool more;
2060 
2061 	rbd_assert(obj_request_img_data_test(obj_request));
2062 	img_request = obj_request->img_request;
2063 
2064 	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2065 	xferred = (unsigned int)obj_request->xferred;
2066 	result = obj_request->result;
2067 	if (result) {
2068 		struct rbd_device *rbd_dev = img_request->rbd_dev;
2069 
2070 		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2071 			img_request_write_test(img_request) ? "write" : "read",
2072 			obj_request->length, obj_request->img_offset,
2073 			obj_request->offset);
2074 		rbd_warn(rbd_dev, "  result %d xferred %x\n",
2075 			result, xferred);
2076 		if (!img_request->result)
2077 			img_request->result = result;
2078 	}
2079 
2080 	/* Image object requests don't own their page array */
2081 
2082 	if (obj_request->type == OBJ_REQUEST_PAGES) {
2083 		obj_request->pages = NULL;
2084 		obj_request->page_count = 0;
2085 	}
2086 
2087 	if (img_request_child_test(img_request)) {
2088 		rbd_assert(img_request->obj_request != NULL);
2089 		more = obj_request->which < img_request->obj_request_count - 1;
2090 	} else {
2091 		rbd_assert(img_request->rq != NULL);
2092 		more = blk_end_request(img_request->rq, result, xferred);
2093 	}
2094 
2095 	return more;
2096 }
2097 
2098 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2099 {
2100 	struct rbd_img_request *img_request;
2101 	u32 which = obj_request->which;
2102 	bool more = true;
2103 
2104 	rbd_assert(obj_request_img_data_test(obj_request));
2105 	img_request = obj_request->img_request;
2106 
2107 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2108 	rbd_assert(img_request != NULL);
2109 	rbd_assert(img_request->obj_request_count > 0);
2110 	rbd_assert(which != BAD_WHICH);
2111 	rbd_assert(which < img_request->obj_request_count);
2112 	rbd_assert(which >= img_request->next_completion);
2113 
2114 	spin_lock_irq(&img_request->completion_lock);
2115 	if (which != img_request->next_completion)
2116 		goto out;
2117 
2118 	for_each_obj_request_from(img_request, obj_request) {
2119 		rbd_assert(more);
2120 		rbd_assert(which < img_request->obj_request_count);
2121 
2122 		if (!obj_request_done_test(obj_request))
2123 			break;
2124 		more = rbd_img_obj_end_request(obj_request);
2125 		which++;
2126 	}
2127 
2128 	rbd_assert(more ^ (which == img_request->obj_request_count));
2129 	img_request->next_completion = which;
2130 out:
2131 	spin_unlock_irq(&img_request->completion_lock);
2132 
2133 	if (!more)
2134 		rbd_img_request_complete(img_request);
2135 }
2136 
2137 /*
2138  * Split up an image request into one or more object requests, each
2139  * to a different object.  The "type" parameter indicates whether
2140  * "data_desc" is the pointer to the head of a list of bio
2141  * structures, or the base of a page array.  In either case this
2142  * function assumes data_desc describes memory sufficient to hold
2143  * all data described by the image request.
2144  */
2145 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2146 					enum obj_request_type type,
2147 					void *data_desc)
2148 {
2149 	struct rbd_device *rbd_dev = img_request->rbd_dev;
2150 	struct rbd_obj_request *obj_request = NULL;
2151 	struct rbd_obj_request *next_obj_request;
2152 	bool write_request = img_request_write_test(img_request);
2153 	struct bio *bio_list = NULL;
2154 	unsigned int bio_offset = 0;
2155 	struct page **pages = NULL;
2156 	u64 img_offset;
2157 	u64 resid;
2158 	u16 opcode;
2159 
2160 	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2161 		(int)type, data_desc);
2162 
2163 	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2164 	img_offset = img_request->offset;
2165 	resid = img_request->length;
2166 	rbd_assert(resid > 0);
2167 
2168 	if (type == OBJ_REQUEST_BIO) {
2169 		bio_list = data_desc;
2170 		rbd_assert(img_offset ==
2171 			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2172 	} else {
2173 		rbd_assert(type == OBJ_REQUEST_PAGES);
2174 		pages = data_desc;
2175 	}
2176 
2177 	while (resid) {
2178 		struct ceph_osd_request *osd_req;
2179 		const char *object_name;
2180 		u64 offset;
2181 		u64 length;
2182 
2183 		object_name = rbd_segment_name(rbd_dev, img_offset);
2184 		if (!object_name)
2185 			goto out_unwind;
2186 		offset = rbd_segment_offset(rbd_dev, img_offset);
2187 		length = rbd_segment_length(rbd_dev, img_offset, resid);
2188 		obj_request = rbd_obj_request_create(object_name,
2189 						offset, length, type);
2190 		/* object request has its own copy of the object name */
2191 		rbd_segment_name_free(object_name);
2192 		if (!obj_request)
2193 			goto out_unwind;
2194 		/*
2195 		 * set obj_request->img_request before creating the
2196 		 * osd_request so that it gets the right snapc
2197 		 */
2198 		rbd_img_obj_request_add(img_request, obj_request);
2199 
2200 		if (type == OBJ_REQUEST_BIO) {
2201 			unsigned int clone_size;
2202 
2203 			rbd_assert(length <= (u64)UINT_MAX);
2204 			clone_size = (unsigned int)length;
2205 			obj_request->bio_list =
2206 					bio_chain_clone_range(&bio_list,
2207 								&bio_offset,
2208 								clone_size,
2209 								GFP_ATOMIC);
2210 			if (!obj_request->bio_list)
2211 				goto out_partial;
2212 		} else {
2213 			unsigned int page_count;
2214 
2215 			obj_request->pages = pages;
2216 			page_count = (u32)calc_pages_for(offset, length);
2217 			obj_request->page_count = page_count;
2218 			if ((offset + length) & ~PAGE_MASK)
2219 				page_count--;	/* more on last page */
2220 			pages += page_count;
2221 		}
2222 
2223 		osd_req = rbd_osd_req_create(rbd_dev, write_request,
2224 						obj_request);
2225 		if (!osd_req)
2226 			goto out_partial;
2227 		obj_request->osd_req = osd_req;
2228 		obj_request->callback = rbd_img_obj_callback;
2229 
2230 		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2231 						0, 0);
2232 		if (type == OBJ_REQUEST_BIO)
2233 			osd_req_op_extent_osd_data_bio(osd_req, 0,
2234 					obj_request->bio_list, length);
2235 		else
2236 			osd_req_op_extent_osd_data_pages(osd_req, 0,
2237 					obj_request->pages, length,
2238 					offset & ~PAGE_MASK, false, false);
2239 
2240 		if (write_request)
2241 			rbd_osd_req_format_write(obj_request);
2242 		else
2243 			rbd_osd_req_format_read(obj_request);
2244 
2245 		obj_request->img_offset = img_offset;
2246 
2247 		img_offset += length;
2248 		resid -= length;
2249 	}
2250 
2251 	return 0;
2252 
2253 out_partial:
2254 	rbd_obj_request_put(obj_request);
2255 out_unwind:
2256 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2257 		rbd_obj_request_put(obj_request);
2258 
2259 	return -ENOMEM;
2260 }
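
/*
 * Illustrative sketch only: the create/fill/submit sequence a caller
 * uses to issue image I/O, shown here for a bio-backed read.  The
 * offset, length, and bio list parameters are assumptions for the
 * example; rbd_request_fn() later in this file follows the same
 * pattern for real block requests.
 */
static int __maybe_unused rbd_img_read_example(struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					struct bio *bio_list)
{
	struct rbd_img_request *img_request;
	int ret;

	img_request = rbd_img_request_create(rbd_dev, offset, length, false);
	if (!img_request)
		return -ENOMEM;

	ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio_list);
	if (!ret)
		ret = rbd_img_request_submit(img_request);
	if (ret)
		rbd_img_request_put(img_request);

	return ret;
}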
2261 
2262 static void
2263 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2264 {
2265 	struct rbd_img_request *img_request;
2266 	struct rbd_device *rbd_dev;
2267 	struct page **pages;
2268 	u32 page_count;
2269 
2270 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2271 	rbd_assert(obj_request_img_data_test(obj_request));
2272 	img_request = obj_request->img_request;
2273 	rbd_assert(img_request);
2274 
2275 	rbd_dev = img_request->rbd_dev;
2276 	rbd_assert(rbd_dev);
2277 
2278 	pages = obj_request->copyup_pages;
2279 	rbd_assert(pages != NULL);
2280 	obj_request->copyup_pages = NULL;
2281 	page_count = obj_request->copyup_page_count;
2282 	rbd_assert(page_count);
2283 	obj_request->copyup_page_count = 0;
2284 	ceph_release_page_vector(pages, page_count);
2285 
2286 	/*
2287 	 * We want the transfer count to reflect the size of the
2288 	 * original write request.  There is no such thing as a
2289 	 * successful short write, so if the request was successful
2290 	 * we can just set it to the originally-requested length.
2291 	 */
2292 	if (!obj_request->result)
2293 		obj_request->xferred = obj_request->length;
2294 
2295 	/* Finish up with the normal image object callback */
2296 
2297 	rbd_img_obj_callback(obj_request);
2298 }
2299 
2300 static void
2301 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2302 {
2303 	struct rbd_obj_request *orig_request;
2304 	struct ceph_osd_request *osd_req;
2305 	struct ceph_osd_client *osdc;
2306 	struct rbd_device *rbd_dev;
2307 	struct page **pages;
2308 	u32 page_count;
2309 	int img_result;
2310 	u64 parent_length;
2311 	u64 offset;
2312 	u64 length;
2313 
2314 	rbd_assert(img_request_child_test(img_request));
2315 
2316 	/* First get what we need from the image request */
2317 
2318 	pages = img_request->copyup_pages;
2319 	rbd_assert(pages != NULL);
2320 	img_request->copyup_pages = NULL;
2321 	page_count = img_request->copyup_page_count;
2322 	rbd_assert(page_count);
2323 	img_request->copyup_page_count = 0;
2324 
2325 	orig_request = img_request->obj_request;
2326 	rbd_assert(orig_request != NULL);
2327 	rbd_assert(obj_request_type_valid(orig_request->type));
2328 	img_result = img_request->result;
2329 	parent_length = img_request->length;
2330 	rbd_assert(parent_length == img_request->xferred);
2331 	rbd_img_request_put(img_request);
2332 
2333 	rbd_assert(orig_request->img_request);
2334 	rbd_dev = orig_request->img_request->rbd_dev;
2335 	rbd_assert(rbd_dev);
2336 
2337 	/*
2338 	 * If the overlap has become 0 (most likely because the
2339 	 * image has been flattened) we need to free the pages
2340 	 * and re-submit the original write request.
2341 	 */
2342 	if (!rbd_dev->parent_overlap) {
2343 		struct ceph_osd_client *osdc;
2344 
2345 		ceph_release_page_vector(pages, page_count);
2346 		osdc = &rbd_dev->rbd_client->client->osdc;
2347 		img_result = rbd_obj_request_submit(osdc, orig_request);
2348 		if (!img_result)
2349 			return;
2350 	}
2351 
2352 	if (img_result)
2353 		goto out_err;
2354 
2355 	/*
2356 	 * The original osd request is of no use to us any more.
2357 	 * We need a new one that can hold the two ops in a copyup
2358 	 * request.  Allocate the new copyup osd request for the
2359 	 * original request, and release the old one.
2360 	 */
2361 	img_result = -ENOMEM;
2362 	osd_req = rbd_osd_req_create_copyup(orig_request);
2363 	if (!osd_req)
2364 		goto out_err;
2365 	rbd_osd_req_destroy(orig_request->osd_req);
2366 	orig_request->osd_req = osd_req;
2367 	orig_request->copyup_pages = pages;
2368 	orig_request->copyup_page_count = page_count;
2369 
2370 	/* Initialize the copyup op */
2371 
2372 	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2373 	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2374 						false, false);
2375 
2376 	/* Then the original write request op */
2377 
2378 	offset = orig_request->offset;
2379 	length = orig_request->length;
2380 	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2381 					offset, length, 0, 0);
2382 	if (orig_request->type == OBJ_REQUEST_BIO)
2383 		osd_req_op_extent_osd_data_bio(osd_req, 1,
2384 					orig_request->bio_list, length);
2385 	else
2386 		osd_req_op_extent_osd_data_pages(osd_req, 1,
2387 					orig_request->pages, length,
2388 					offset & ~PAGE_MASK, false, false);
2389 
2390 	rbd_osd_req_format_write(orig_request);
2391 
2392 	/* All set, send it off. */
2393 
2394 	orig_request->callback = rbd_img_obj_copyup_callback;
2395 	osdc = &rbd_dev->rbd_client->client->osdc;
2396 	img_result = rbd_obj_request_submit(osdc, orig_request);
2397 	if (!img_result)
2398 		return;
2399 out_err:
2400 	/* Record the error code and complete the request */
2401 
2402 	orig_request->result = img_result;
2403 	orig_request->xferred = 0;
2404 	obj_request_done_set(orig_request);
2405 	rbd_obj_request_complete(orig_request);
2406 }
2407 
2408 /*
2409  * Read from the parent image the range of data that covers the
2410  * entire target of the given object request.  This is used for
2411  * satisfying a layered image write request when the target of an
2412  * object request from the image request does not exist.
2413  *
2414  * A page array big enough to hold the returned data is allocated
2415  * and supplied to rbd_img_request_fill() as the "data descriptor."
2416  * When the read completes, this page array will be transferred to
2417  * the original object request for the copyup operation.
2418  *
2419  * If an error occurs, record it as the result of the original
2420  * object request and mark it done so it gets completed.
2421  */
2422 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2423 {
2424 	struct rbd_img_request *img_request = NULL;
2425 	struct rbd_img_request *parent_request = NULL;
2426 	struct rbd_device *rbd_dev;
2427 	u64 img_offset;
2428 	u64 length;
2429 	struct page **pages = NULL;
2430 	u32 page_count;
2431 	int result;
2432 
2433 	rbd_assert(obj_request_img_data_test(obj_request));
2434 	rbd_assert(obj_request_type_valid(obj_request->type));
2435 
2436 	img_request = obj_request->img_request;
2437 	rbd_assert(img_request != NULL);
2438 	rbd_dev = img_request->rbd_dev;
2439 	rbd_assert(rbd_dev->parent != NULL);
2440 
2441 	/*
2442 	 * Determine the byte range covered by the object in the
2443 	 * child image to which the original request was to be sent.
2444 	 */
2445 	img_offset = obj_request->img_offset - obj_request->offset;
2446 	length = (u64)1 << rbd_dev->header.obj_order;
2447 
2448 	/*
2449 	 * There is no defined parent data beyond the parent
2450 	 * overlap, so limit what we read at that boundary if
2451 	 * necessary.
2452 	 */
2453 	if (img_offset + length > rbd_dev->parent_overlap) {
2454 		rbd_assert(img_offset < rbd_dev->parent_overlap);
2455 		length = rbd_dev->parent_overlap - img_offset;
2456 	}
2457 
2458 	/*
2459 	 * Allocate a page array big enough to receive the data read
2460 	 * from the parent.
2461 	 */
2462 	page_count = (u32)calc_pages_for(0, length);
2463 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2464 	if (IS_ERR(pages)) {
2465 		result = PTR_ERR(pages);
2466 		pages = NULL;
2467 		goto out_err;
2468 	}
2469 
2470 	result = -ENOMEM;
2471 	parent_request = rbd_parent_request_create(obj_request,
2472 						img_offset, length);
2473 	if (!parent_request)
2474 		goto out_err;
2475 
2476 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2477 	if (result)
2478 		goto out_err;
2479 	parent_request->copyup_pages = pages;
2480 	parent_request->copyup_page_count = page_count;
2481 
2482 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2483 	result = rbd_img_request_submit(parent_request);
2484 	if (!result)
2485 		return 0;
2486 
2487 	parent_request->copyup_pages = NULL;
2488 	parent_request->copyup_page_count = 0;
2489 	parent_request->obj_request = NULL;
2490 	rbd_obj_request_put(obj_request);
2491 out_err:
2492 	if (pages)
2493 		ceph_release_page_vector(pages, page_count);
2494 	if (parent_request)
2495 		rbd_img_request_put(parent_request);
2496 	obj_request->result = result;
2497 	obj_request->xferred = 0;
2498 	obj_request_done_set(obj_request);
2499 
2500 	return result;
2501 }
2502 
2503 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2504 {
2505 	struct rbd_obj_request *orig_request;
2506 	struct rbd_device *rbd_dev;
2507 	int result;
2508 
2509 	rbd_assert(!obj_request_img_data_test(obj_request));
2510 
2511 	/*
2512 	 * All we need from the object request is the original
2513 	 * request and the result of the STAT op.  Grab those, then
2514 	 * we're done with the request.
2515 	 */
2516 	orig_request = obj_request->obj_request;
2517 	obj_request->obj_request = NULL;
2518 	rbd_obj_request_put(orig_request);
2519 	rbd_assert(orig_request);
2520 	rbd_assert(orig_request->img_request);
2521 
2522 	result = obj_request->result;
2523 	obj_request->result = 0;
2524 
2525 	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2526 		obj_request, orig_request, result,
2527 		obj_request->xferred, obj_request->length);
2528 	rbd_obj_request_put(obj_request);
2529 
2530 	/*
2531 	 * If the overlap has become 0 (most likely because the
2532 	 * image has been flattened) we simply need to re-submit
2533 	 * the original write request; there are no pages to free here.
2534 	 */
2535 	rbd_dev = orig_request->img_request->rbd_dev;
2536 	if (!rbd_dev->parent_overlap) {
2537 		struct ceph_osd_client *osdc;
2538 
2539 		osdc = &rbd_dev->rbd_client->client->osdc;
2540 		result = rbd_obj_request_submit(osdc, orig_request);
2541 		if (!result)
2542 			return;
2543 	}
2544 
2545 	/*
2546 	 * Our only purpose here is to determine whether the object
2547 	 * exists, and we don't want to treat the non-existence as
2548 	 * an error.  If something else comes back, transfer the
2549 	 * error to the original request and complete it now.
2550 	 */
2551 	if (!result) {
2552 		obj_request_existence_set(orig_request, true);
2553 	} else if (result == -ENOENT) {
2554 		obj_request_existence_set(orig_request, false);
2555 	} else if (result) {
2556 		orig_request->result = result;
2557 		goto out;
2558 	}
2559 
2560 	/*
2561 	 * Resubmit the original request now that we have recorded
2562 	 * whether the target object exists.
2563 	 */
2564 	orig_request->result = rbd_img_obj_request_submit(orig_request);
2565 out:
2566 	if (orig_request->result)
2567 		rbd_obj_request_complete(orig_request);
2568 }
2569 
2570 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2571 {
2572 	struct rbd_obj_request *stat_request;
2573 	struct rbd_device *rbd_dev;
2574 	struct ceph_osd_client *osdc;
2575 	struct page **pages = NULL;
2576 	u32 page_count;
2577 	size_t size;
2578 	int ret;
2579 
2580 	/*
2581 	 * The response data for a STAT call consists of:
2582 	 *     le64 length;
2583 	 *     struct {
2584 	 *         le32 tv_sec;
2585 	 *         le32 tv_nsec;
2586 	 *     } mtime;
2587 	 */
2588 	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2589 	page_count = (u32)calc_pages_for(0, size);
2590 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2591 	if (IS_ERR(pages))
2592 		return PTR_ERR(pages);
2593 
2594 	ret = -ENOMEM;
2595 	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2596 							OBJ_REQUEST_PAGES);
2597 	if (!stat_request)
2598 		goto out;
2599 
2600 	rbd_obj_request_get(obj_request);
2601 	stat_request->obj_request = obj_request;
2602 	stat_request->pages = pages;
2603 	stat_request->page_count = page_count;
2604 
2605 	rbd_assert(obj_request->img_request);
2606 	rbd_dev = obj_request->img_request->rbd_dev;
2607 	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2608 						stat_request);
2609 	if (!stat_request->osd_req)
2610 		goto out;
2611 	stat_request->callback = rbd_img_obj_exists_callback;
2612 
2613 	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2614 	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2615 					false, false);
2616 	rbd_osd_req_format_read(stat_request);
2617 
2618 	osdc = &rbd_dev->rbd_client->client->osdc;
2619 	ret = rbd_obj_request_submit(osdc, stat_request);
2620 out:
2621 	if (ret)
2622 		rbd_obj_request_put(obj_request);
2623 
2624 	return ret;
2625 }
2626 
2627 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2628 {
2629 	struct rbd_img_request *img_request;
2630 	struct rbd_device *rbd_dev;
2631 	bool known;
2632 
2633 	rbd_assert(obj_request_img_data_test(obj_request));
2634 
2635 	img_request = obj_request->img_request;
2636 	rbd_assert(img_request);
2637 	rbd_dev = img_request->rbd_dev;
2638 
2639 	/*
2640 	 * Only writes to layered images need special handling.
2641 	 * Reads and non-layered writes are simple object requests.
2642 	 * Layered writes that start beyond the end of the overlap
2643 	 * with the parent have no parent data, so they too are
2644 	 * simple object requests.  Finally, if the target object is
2645 	 * known to already exist, its parent data has already been
2646 	 * copied, so a write to the object can also be handled as a
2647 	 * simple object request.
2648 	 */
2649 	if (!img_request_write_test(img_request) ||
2650 		!img_request_layered_test(img_request) ||
2651 		rbd_dev->parent_overlap <= obj_request->img_offset ||
2652 		((known = obj_request_known_test(obj_request)) &&
2653 			obj_request_exists_test(obj_request))) {
2654 
2655 		struct rbd_device *rbd_dev;
2656 		struct ceph_osd_client *osdc;
2657 
2658 		rbd_dev = obj_request->img_request->rbd_dev;
2659 		osdc = &rbd_dev->rbd_client->client->osdc;
2660 
2661 		return rbd_obj_request_submit(osdc, obj_request);
2662 	}
2663 
2664 	/*
2665 	 * It's a layered write.  The target object might exist but
2666 	 * we may not know that yet.  If we know it doesn't exist,
2667 	 * start by reading the data for the full target object from
2668 	 * the parent so we can use it for a copyup to the target.
2669 	 */
2670 	if (known)
2671 		return rbd_img_obj_parent_read_full(obj_request);
2672 
2673 	/* We don't know whether the target exists.  Go find out. */
2674 
2675 	return rbd_img_obj_exists_submit(obj_request);
2676 }
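
/*
 * Illustrative sketch only: the "simple object request" test from
 * rbd_img_obj_request_submit() above, restated as a predicate so the
 * four conditions are easier to see.  Not used by the driver.
 */
static bool __maybe_unused rbd_img_obj_request_is_simple(
					struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;

	return !img_request_write_test(img_request) ||	/* a read */
	       !img_request_layered_test(img_request) ||	/* no parent */
	       rbd_dev->parent_overlap <= obj_request->img_offset ||
	       (obj_request_known_test(obj_request) &&
			obj_request_exists_test(obj_request));
}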
2677 
2678 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2679 {
2680 	struct rbd_obj_request *obj_request;
2681 	struct rbd_obj_request *next_obj_request;
2682 
2683 	dout("%s: img %p\n", __func__, img_request);
2684 	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2685 		int ret;
2686 
2687 		ret = rbd_img_obj_request_submit(obj_request);
2688 		if (ret)
2689 			return ret;
2690 	}
2691 
2692 	return 0;
2693 }
2694 
2695 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2696 {
2697 	struct rbd_obj_request *obj_request;
2698 	struct rbd_device *rbd_dev;
2699 	u64 obj_end;
2700 	u64 img_xferred;
2701 	int img_result;
2702 
2703 	rbd_assert(img_request_child_test(img_request));
2704 
2705 	/* First get what we need from the image request and release it */
2706 
2707 	obj_request = img_request->obj_request;
2708 	img_xferred = img_request->xferred;
2709 	img_result = img_request->result;
2710 	rbd_img_request_put(img_request);
2711 
2712 	/*
2713 	 * If the overlap has become 0 (most likely because the
2714 	 * image has been flattened) we need to re-submit the
2715 	 * original request.
2716 	 */
2717 	rbd_assert(obj_request);
2718 	rbd_assert(obj_request->img_request);
2719 	rbd_dev = obj_request->img_request->rbd_dev;
2720 	if (!rbd_dev->parent_overlap) {
2721 		struct ceph_osd_client *osdc;
2722 
2723 		osdc = &rbd_dev->rbd_client->client->osdc;
2724 		img_result = rbd_obj_request_submit(osdc, obj_request);
2725 		if (!img_result)
2726 			return;
2727 	}
2728 
2729 	obj_request->result = img_result;
2730 	if (obj_request->result)
2731 		goto out;
2732 
2733 	/*
2734 	 * We need to zero anything beyond the parent overlap
2735 	 * boundary.  Since rbd_img_obj_request_read_callback()
2736 	 * will zero anything beyond the end of a short read, an
2737 	 * easy way to do this is to pretend the data from the
2738 	 * parent came up short--ending at the overlap boundary.
2739 	 */
2740 	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2741 	obj_end = obj_request->img_offset + obj_request->length;
2742 	if (obj_end > rbd_dev->parent_overlap) {
2743 		u64 xferred = 0;
2744 
2745 		if (obj_request->img_offset < rbd_dev->parent_overlap)
2746 			xferred = rbd_dev->parent_overlap -
2747 					obj_request->img_offset;
2748 
2749 		obj_request->xferred = min(img_xferred, xferred);
2750 	} else {
2751 		obj_request->xferred = img_xferred;
2752 	}
2753 out:
2754 	rbd_img_obj_request_read_callback(obj_request);
2755 	rbd_obj_request_complete(obj_request);
2756 }
2757 
2758 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2759 {
2760 	struct rbd_img_request *img_request;
2761 	int result;
2762 
2763 	rbd_assert(obj_request_img_data_test(obj_request));
2764 	rbd_assert(obj_request->img_request != NULL);
2765 	rbd_assert(obj_request->result == (s32) -ENOENT);
2766 	rbd_assert(obj_request_type_valid(obj_request->type));
2767 
2768 	/* rbd_read_finish(obj_request, obj_request->length); */
2769 	img_request = rbd_parent_request_create(obj_request,
2770 						obj_request->img_offset,
2771 						obj_request->length);
2772 	result = -ENOMEM;
2773 	if (!img_request)
2774 		goto out_err;
2775 
2776 	if (obj_request->type == OBJ_REQUEST_BIO)
2777 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2778 						obj_request->bio_list);
2779 	else
2780 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2781 						obj_request->pages);
2782 	if (result)
2783 		goto out_err;
2784 
2785 	img_request->callback = rbd_img_parent_read_callback;
2786 	result = rbd_img_request_submit(img_request);
2787 	if (result)
2788 		goto out_err;
2789 
2790 	return;
2791 out_err:
2792 	if (img_request)
2793 		rbd_img_request_put(img_request);
2794 	obj_request->result = result;
2795 	obj_request->xferred = 0;
2796 	obj_request_done_set(obj_request);
2797 }
2798 
2799 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2800 {
2801 	struct rbd_obj_request *obj_request;
2802 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2803 	int ret;
2804 
2805 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2806 							OBJ_REQUEST_NODATA);
2807 	if (!obj_request)
2808 		return -ENOMEM;
2809 
2810 	ret = -ENOMEM;
2811 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2812 	if (!obj_request->osd_req)
2813 		goto out;
2814 
2815 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2816 					notify_id, 0, 0);
2817 	rbd_osd_req_format_read(obj_request);
2818 
2819 	ret = rbd_obj_request_submit(osdc, obj_request);
2820 	if (ret)
2821 		goto out;
2822 	ret = rbd_obj_request_wait(obj_request);
2823 out:
2824 	rbd_obj_request_put(obj_request);
2825 
2826 	return ret;
2827 }
2828 
2829 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2830 {
2831 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
2832 	int ret;
2833 
2834 	if (!rbd_dev)
2835 		return;
2836 
2837 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2838 		rbd_dev->header_name, (unsigned long long)notify_id,
2839 		(unsigned int)opcode);
2840 	ret = rbd_dev_refresh(rbd_dev);
2841 	if (ret)
2842 		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
2843 
2844 	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2845 }
2846 
2847 /*
2848  * Request sync osd watch/unwatch.  The value of "start" determines
2849  * whether a watch request is being initiated or torn down.
2850  */
2851 static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2852 {
2853 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2854 	struct rbd_obj_request *obj_request;
2855 	int ret;
2856 
2857 	rbd_assert(start ^ !!rbd_dev->watch_event);
2858 	rbd_assert(start ^ !!rbd_dev->watch_request);
2859 
2860 	if (start) {
2861 		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2862 						&rbd_dev->watch_event);
2863 		if (ret < 0)
2864 			return ret;
2865 		rbd_assert(rbd_dev->watch_event != NULL);
2866 	}
2867 
2868 	ret = -ENOMEM;
2869 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2870 							OBJ_REQUEST_NODATA);
2871 	if (!obj_request)
2872 		goto out_cancel;
2873 
2874 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2875 	if (!obj_request->osd_req)
2876 		goto out_cancel;
2877 
2878 	if (start)
2879 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2880 	else
2881 		ceph_osdc_unregister_linger_request(osdc,
2882 					rbd_dev->watch_request->osd_req);
2883 
2884 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2885 				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2886 	rbd_osd_req_format_write(obj_request);
2887 
2888 	ret = rbd_obj_request_submit(osdc, obj_request);
2889 	if (ret)
2890 		goto out_cancel;
2891 	ret = rbd_obj_request_wait(obj_request);
2892 	if (ret)
2893 		goto out_cancel;
2894 	ret = obj_request->result;
2895 	if (ret)
2896 		goto out_cancel;
2897 
2898 	/*
2899 	 * A watch request is set to linger, so the underlying osd
2900 	 * request won't go away until we unregister it.  We retain
2901 	 * a pointer to the object request during that time (in
2902 	 * rbd_dev->watch_request), so we'll keep a reference to
2903 	 * it.  We'll drop that reference (below) after we've
2904 	 * unregistered it.
2905 	 */
2906 	if (start) {
2907 		rbd_dev->watch_request = obj_request;
2908 
2909 		return 0;
2910 	}
2911 
2912 	/* We have successfully torn down the watch request */
2913 
2914 	rbd_obj_request_put(rbd_dev->watch_request);
2915 	rbd_dev->watch_request = NULL;
2916 out_cancel:
2917 	/* Cancel the event if we're tearing down, or on error */
2918 	ceph_osdc_cancel_event(rbd_dev->watch_event);
2919 	rbd_dev->watch_event = NULL;
2920 	if (obj_request)
2921 		rbd_obj_request_put(obj_request);
2922 
2923 	return ret;
2924 }
2925 
2926 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
2927 {
2928 	return __rbd_dev_header_watch_sync(rbd_dev, true);
2929 }
2930 
2931 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
2932 {
2933 	int ret;
2934 
2935 	ret = __rbd_dev_header_watch_sync(rbd_dev, false);
2936 	if (ret) {
2937 		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
2938 			 ret);
2939 	}
2940 }
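
/*
 * Illustrative sketch only: how the watch helpers above are paired
 * over the lifetime of a mapping.  The surrounding setup/teardown is
 * an assumption for the example; the real call sites are in the
 * device probe and removal paths elsewhere in this file.
 */
static int __maybe_unused rbd_header_watch_example(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_header_watch_sync(rbd_dev);
	if (ret)
		return ret;

	/*
	 * While the watch is registered, header changes trigger
	 * rbd_watch_cb(), which refreshes the header and acks the
	 * notification.
	 */

	rbd_dev_header_unwatch_sync(rbd_dev);

	return 0;
}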
2941 
2942 /*
2943  * Synchronous osd object method call.  Returns the number of bytes
2944  * returned in the inbound buffer, or a negative error code.
2945  */
2946 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2947 			     const char *object_name,
2948 			     const char *class_name,
2949 			     const char *method_name,
2950 			     const void *outbound,
2951 			     size_t outbound_size,
2952 			     void *inbound,
2953 			     size_t inbound_size)
2954 {
2955 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2956 	struct rbd_obj_request *obj_request;
2957 	struct page **pages;
2958 	u32 page_count;
2959 	int ret;
2960 
2961 	/*
2962 	 * Method calls are ultimately read operations.  The result
2963 	 * should be placed into the inbound buffer provided.  They
2964 	 * also supply outbound data--parameters for the object
2965 	 * method.  Currently if this is present it will be a
2966 	 * snapshot id.
2967 	 */
2968 	page_count = (u32)calc_pages_for(0, inbound_size);
2969 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2970 	if (IS_ERR(pages))
2971 		return PTR_ERR(pages);
2972 
2973 	ret = -ENOMEM;
2974 	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2975 							OBJ_REQUEST_PAGES);
2976 	if (!obj_request)
2977 		goto out;
2978 
2979 	obj_request->pages = pages;
2980 	obj_request->page_count = page_count;
2981 
2982 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2983 	if (!obj_request->osd_req)
2984 		goto out;
2985 
2986 	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2987 					class_name, method_name);
2988 	if (outbound_size) {
2989 		struct ceph_pagelist *pagelist;
2990 
2991 		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2992 		if (!pagelist)
2993 			goto out;
2994 
2995 		ceph_pagelist_init(pagelist);
2996 		ceph_pagelist_append(pagelist, outbound, outbound_size);
2997 		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2998 						pagelist);
2999 	}
3000 	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3001 					obj_request->pages, inbound_size,
3002 					0, false, false);
3003 	rbd_osd_req_format_read(obj_request);
3004 
3005 	ret = rbd_obj_request_submit(osdc, obj_request);
3006 	if (ret)
3007 		goto out;
3008 	ret = rbd_obj_request_wait(obj_request);
3009 	if (ret)
3010 		goto out;
3011 
3012 	ret = obj_request->result;
3013 	if (ret < 0)
3014 		goto out;
3015 
3016 	rbd_assert(obj_request->xferred < (u64)INT_MAX);
3017 	ret = (int)obj_request->xferred;
3018 	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3019 out:
3020 	if (obj_request)
3021 		rbd_obj_request_put(obj_request);
3022 	else
3023 		ceph_release_page_vector(pages, page_count);
3024 
3025 	return ret;
3026 }
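
/*
 * Illustrative sketch only: a typical caller of rbd_obj_method_sync().
 * The class and method names ("rbd", "get_size"), the response layout,
 * and the error handling here are assumptions for the example; the
 * format 2 header probing code later in this file issues calls of this
 * general shape against the image's header object.
 */
static int __maybe_unused rbd_method_call_example(struct rbd_device *rbd_dev,
					u64 snap_id, u64 *image_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	*image_size = le64_to_cpu(size_buf.size);

	return 0;
}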
3027 
3028 static void rbd_request_fn(struct request_queue *q)
3029 		__releases(q->queue_lock) __acquires(q->queue_lock)
3030 {
3031 	struct rbd_device *rbd_dev = q->queuedata;
3032 	bool read_only = rbd_dev->mapping.read_only;
3033 	struct request *rq;
3034 	int result;
3035 
3036 	while ((rq = blk_fetch_request(q))) {
3037 		bool write_request = rq_data_dir(rq) == WRITE;
3038 		struct rbd_img_request *img_request;
3039 		u64 offset;
3040 		u64 length;
3041 
3042 		/* Ignore any non-FS requests that filter through. */
3043 
3044 		if (rq->cmd_type != REQ_TYPE_FS) {
3045 			dout("%s: non-fs request type %d\n", __func__,
3046 				(int) rq->cmd_type);
3047 			__blk_end_request_all(rq, 0);
3048 			continue;
3049 		}
3050 
3051 		/* Ignore/skip any zero-length requests */
3052 
3053 		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3054 		length = (u64) blk_rq_bytes(rq);
3055 
3056 		if (!length) {
3057 			dout("%s: zero-length request\n", __func__);
3058 			__blk_end_request_all(rq, 0);
3059 			continue;
3060 		}
3061 
3062 		spin_unlock_irq(q->queue_lock);
3063 
3064 		/* Disallow writes to a read-only device */
3065 
3066 		if (write_request) {
3067 			result = -EROFS;
3068 			if (read_only)
3069 				goto end_request;
3070 			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3071 		}
3072 
3073 		/*
3074 		 * Quit early if the mapped snapshot no longer
3075 		 * exists.  It's still possible the snapshot will
3076 		 * have disappeared by the time our request arrives
3077 		 * at the osd, but there's no sense in sending it if
3078 		 * we already know.
3079 		 */
3080 		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3081 			dout("request for non-existent snapshot");
3082 			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3083 			result = -ENXIO;
3084 			goto end_request;
3085 		}
3086 
3087 		result = -EINVAL;
3088 		if (offset && length > U64_MAX - offset + 1) {
3089 			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3090 				offset, length);
3091 			goto end_request;	/* Shouldn't happen */
3092 		}
3093 
3094 		result = -EIO;
3095 		if (offset + length > rbd_dev->mapping.size) {
3096 			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3097 				offset, length, rbd_dev->mapping.size);
3098 			goto end_request;
3099 		}
3100 
3101 		result = -ENOMEM;
3102 		img_request = rbd_img_request_create(rbd_dev, offset, length,
3103 							write_request);
3104 		if (!img_request)
3105 			goto end_request;
3106 
3107 		img_request->rq = rq;
3108 
3109 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3110 						rq->bio);
3111 		if (!result)
3112 			result = rbd_img_request_submit(img_request);
3113 		if (result)
3114 			rbd_img_request_put(img_request);
3115 end_request:
3116 		spin_lock_irq(q->queue_lock);
3117 		if (result < 0) {
3118 			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3119 				write_request ? "write" : "read",
3120 				length, offset, result);
3121 
3122 			__blk_end_request_all(rq, result);
3123 		}
3124 	}
3125 }
3126 
3127 /*
3128  * A queue callback.  Makes sure that we don't create a bio that spans across
3129  * multiple osd objects.  One exception would be a single-page bio,
3130  * which we handle later in bio_chain_clone_range().
3131  */
3132 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3133 			  struct bio_vec *bvec)
3134 {
3135 	struct rbd_device *rbd_dev = q->queuedata;
3136 	sector_t sector_offset;
3137 	sector_t sectors_per_obj;
3138 	sector_t obj_sector_offset;
3139 	int ret;
3140 
3141 	/*
3142 	 * Convert the partition-relative bio start sector into a
3143 	 * sector offset relative to the enclosing device, then find
3144 	 * how far into its rbd object that sector falls.
3145 	 */
3146 	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3147 	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3148 	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3149 
3150 	/*
3151 	 * Compute the number of bytes from that offset to the end
3152 	 * of the object.  Account for what's already used by the bio.
3153 	 */
3154 	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3155 	if (ret > bmd->bi_size)
3156 		ret -= bmd->bi_size;
3157 	else
3158 		ret = 0;
3159 
3160 	/*
3161 	 * Don't send back more than was asked for.  And if the bio
3162 	 * was empty, let the whole thing through because:  "Note
3163 	 * that a block device *must* allow a single page to be
3164 	 * added to an empty bio."
3165 	 */
3166 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
3167 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
3168 		ret = (int) bvec->bv_len;
3169 
3170 	return ret;
3171 }
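
/*
 * Worked example for rbd_merge_bvec() (assumptions: 4 MB objects, so
 * obj_order is 22, and 512-byte sectors): sectors_per_obj is
 * 1 << (22 - 9) = 8192.  For a bio starting at device sector 8190
 * with 512 bytes already queued, the bytes left in the object are
 * (8192 - 8190) << 9 = 1024, minus the 512 already used, so at most
 * 512 more bytes may be added -- the bio ends exactly at the object
 * boundary rather than spanning into the next object.
 */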
3172 
3173 static void rbd_free_disk(struct rbd_device *rbd_dev)
3174 {
3175 	struct gendisk *disk = rbd_dev->disk;
3176 
3177 	if (!disk)
3178 		return;
3179 
3180 	rbd_dev->disk = NULL;
3181 	if (disk->flags & GENHD_FL_UP) {
3182 		del_gendisk(disk);
3183 		if (disk->queue)
3184 			blk_cleanup_queue(disk->queue);
3185 	}
3186 	put_disk(disk);
3187 }
3188 
3189 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3190 				const char *object_name,
3191 				u64 offset, u64 length, void *buf)
3192 
3193 {
3194 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3195 	struct rbd_obj_request *obj_request;
3196 	struct page **pages = NULL;
3197 	u32 page_count;
3198 	size_t size;
3199 	int ret;
3200 
3201 	page_count = (u32) calc_pages_for(offset, length);
3202 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3203 	if (IS_ERR(pages))
3204 		ret = PTR_ERR(pages);
3205 		return PTR_ERR(pages);
3206 	ret = -ENOMEM;
3207 	obj_request = rbd_obj_request_create(object_name, offset, length,
3208 							OBJ_REQUEST_PAGES);
3209 	if (!obj_request)
3210 		goto out;
3211 
3212 	obj_request->pages = pages;
3213 	obj_request->page_count = page_count;
3214 
3215 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3216 	if (!obj_request->osd_req)
3217 		goto out;
3218 
3219 	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3220 					offset, length, 0, 0);
3221 	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3222 					obj_request->pages,
3223 					obj_request->length,
3224 					obj_request->offset & ~PAGE_MASK,
3225 					false, false);
3226 	rbd_osd_req_format_read(obj_request);
3227 
3228 	ret = rbd_obj_request_submit(osdc, obj_request);
3229 	if (ret)
3230 		goto out;
3231 	ret = rbd_obj_request_wait(obj_request);
3232 	if (ret)
3233 		goto out;
3234 
3235 	ret = obj_request->result;
3236 	if (ret < 0)
3237 		goto out;
3238 
3239 	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3240 	size = (size_t) obj_request->xferred;
3241 	ceph_copy_from_page_vector(pages, buf, 0, size);
3242 	rbd_assert(size <= (size_t)INT_MAX);
3243 	ret = (int)size;
3244 out:
3245 	if (obj_request)
3246 		rbd_obj_request_put(obj_request);
3247 	else
3248 		ceph_release_page_vector(pages, page_count);
3249 
3250 	return ret;
3251 }
3252 
3253 /*
3254  * Read the complete header for the given rbd device.  On successful
3255  * return, the rbd_dev->header field will contain up-to-date
3256  * information about the image.
3257  */
3258 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3259 {
3260 	struct rbd_image_header_ondisk *ondisk = NULL;
3261 	u32 snap_count = 0;
3262 	u64 names_size = 0;
3263 	u32 want_count;
3264 	int ret;
3265 
3266 	/*
3267 	 * The complete header will include an array of its 64-bit
3268 	 * snapshot ids, followed by the names of those snapshots as
3269 	 * a contiguous block of NUL-terminated strings.  Note that
3270 	 * the number of snapshots could change by the time we read
3271 	 * it in, in which case we re-read it.
3272 	 */
3273 	do {
3274 		size_t size;
3275 
3276 		kfree(ondisk);
3277 
3278 		size = sizeof (*ondisk);
3279 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3280 		size += names_size;
3281 		ondisk = kmalloc(size, GFP_KERNEL);
3282 		if (!ondisk)
3283 			return -ENOMEM;
3284 
3285 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3286 				       0, size, ondisk);
3287 		if (ret < 0)
3288 			goto out;
3289 		if ((size_t)ret < size) {
3290 			ret = -ENXIO;
3291 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3292 				size, ret);
3293 			goto out;
3294 		}
3295 		if (!rbd_dev_ondisk_valid(ondisk)) {
3296 			ret = -ENXIO;
3297 			rbd_warn(rbd_dev, "invalid header");
3298 			goto out;
3299 		}
3300 
3301 		names_size = le64_to_cpu(ondisk->snap_names_len);
3302 		want_count = snap_count;
3303 		snap_count = le32_to_cpu(ondisk->snap_count);
3304 	} while (snap_count != want_count);
3305 
3306 	ret = rbd_header_from_disk(rbd_dev, ondisk);
3307 out:
3308 	kfree(ondisk);
3309 
3310 	return ret;
3311 }
3312 
3313 /*
3314  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3315  * has disappeared from the (just updated) snapshot context.
3316  */
3317 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3318 {
3319 	u64 snap_id;
3320 
3321 	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3322 		return;
3323 
3324 	snap_id = rbd_dev->spec->snap_id;
3325 	if (snap_id == CEPH_NOSNAP)
3326 		return;
3327 
3328 	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3329 		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3330 }
3331 
3332 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3333 {
3334 	sector_t size;
3335 	bool removing;
3336 
3337 	/*
3338 	 * Don't hold the lock while doing disk operations,
3339 	 * or lock ordering will conflict with the bdev mutex via:
3340 	 * rbd_add() -> blkdev_get() -> rbd_open()
3341 	 */
3342 	spin_lock_irq(&rbd_dev->lock);
3343 	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3344 	spin_unlock_irq(&rbd_dev->lock);
3345 	/*
3346 	 * If the device is being removed, rbd_dev->disk has
3347 	 * been destroyed, so don't try to update its size
3348 	 */
3349 	if (!removing) {
3350 		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3351 		dout("setting size to %llu sectors", (unsigned long long)size);
3352 		set_capacity(rbd_dev->disk, size);
3353 		revalidate_disk(rbd_dev->disk);
3354 	}
3355 }
3356 
3357 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3358 {
3359 	u64 mapping_size;
3360 	int ret;
3361 
3362 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3363 	down_write(&rbd_dev->header_rwsem);
3364 	mapping_size = rbd_dev->mapping.size;
3365 	if (rbd_dev->image_format == 1)
3366 		ret = rbd_dev_v1_header_info(rbd_dev);
3367 	else
3368 		ret = rbd_dev_v2_header_info(rbd_dev);
3369 
3370 	/* If it's a mapped snapshot, validate its EXISTS flag */
3371 
3372 	rbd_exists_validate(rbd_dev);
3373 	up_write(&rbd_dev->header_rwsem);
3374 
3375 	if (mapping_size != rbd_dev->mapping.size) {
3376 		rbd_dev_update_size(rbd_dev);
3377 	}
3378 
3379 	return ret;
3380 }
3381 
3382 static int rbd_init_disk(struct rbd_device *rbd_dev)
3383 {
3384 	struct gendisk *disk;
3385 	struct request_queue *q;
3386 	u64 segment_size;
3387 
3388 	/* create gendisk info */
3389 	disk = alloc_disk(single_major ?
3390 			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3391 			  RBD_MINORS_PER_MAJOR);
3392 	if (!disk)
3393 		return -ENOMEM;
3394 
3395 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3396 		 rbd_dev->dev_id);
3397 	disk->major = rbd_dev->major;
3398 	disk->first_minor = rbd_dev->minor;
3399 	if (single_major)
3400 		disk->flags |= GENHD_FL_EXT_DEVT;
3401 	disk->fops = &rbd_bd_ops;
3402 	disk->private_data = rbd_dev;
3403 
3404 	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3405 	if (!q)
3406 		goto out_disk;
3407 
3408 	/* We use the default size, but let's be explicit about it. */
3409 	blk_queue_physical_block_size(q, SECTOR_SIZE);
3410 
3411 	/* set io sizes to object size */
3412 	segment_size = rbd_obj_bytes(&rbd_dev->header);
3413 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3414 	blk_queue_max_segment_size(q, segment_size);
3415 	blk_queue_io_min(q, segment_size);
3416 	blk_queue_io_opt(q, segment_size);
3417 
3418 	blk_queue_merge_bvec(q, rbd_merge_bvec);
3419 	disk->queue = q;
3420 
3421 	q->queuedata = rbd_dev;
3422 
3423 	rbd_dev->disk = disk;
3424 
3425 	return 0;
3426 out_disk:
3427 	put_disk(disk);
3428 
3429 	return -ENOMEM;
3430 }
3431 
3432 /*
3433   sysfs
3434 */
3435 
3436 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3437 {
3438 	return container_of(dev, struct rbd_device, dev);
3439 }
3440 
3441 static ssize_t rbd_size_show(struct device *dev,
3442 			     struct device_attribute *attr, char *buf)
3443 {
3444 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3445 
3446 	return sprintf(buf, "%llu\n",
3447 		(unsigned long long)rbd_dev->mapping.size);
3448 }
3449 
3450 /*
3451  * Note this shows the features for whatever's mapped, which is not
3452  * necessarily the base image.
3453  */
3454 static ssize_t rbd_features_show(struct device *dev,
3455 			     struct device_attribute *attr, char *buf)
3456 {
3457 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3458 
3459 	return sprintf(buf, "0x%016llx\n",
3460 			(unsigned long long)rbd_dev->mapping.features);
3461 }
3462 
3463 static ssize_t rbd_major_show(struct device *dev,
3464 			      struct device_attribute *attr, char *buf)
3465 {
3466 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3467 
3468 	if (rbd_dev->major)
3469 		return sprintf(buf, "%d\n", rbd_dev->major);
3470 
3471 	return sprintf(buf, "(none)\n");
3472 }
3473 
3474 static ssize_t rbd_minor_show(struct device *dev,
3475 			      struct device_attribute *attr, char *buf)
3476 {
3477 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3478 
3479 	return sprintf(buf, "%d\n", rbd_dev->minor);
3480 }
3481 
3482 static ssize_t rbd_client_id_show(struct device *dev,
3483 				  struct device_attribute *attr, char *buf)
3484 {
3485 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3486 
3487 	return sprintf(buf, "client%lld\n",
3488 			ceph_client_id(rbd_dev->rbd_client->client));
3489 }
3490 
3491 static ssize_t rbd_pool_show(struct device *dev,
3492 			     struct device_attribute *attr, char *buf)
3493 {
3494 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3495 
3496 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3497 }
3498 
3499 static ssize_t rbd_pool_id_show(struct device *dev,
3500 			     struct device_attribute *attr, char *buf)
3501 {
3502 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3503 
3504 	return sprintf(buf, "%llu\n",
3505 			(unsigned long long) rbd_dev->spec->pool_id);
3506 }
3507 
3508 static ssize_t rbd_name_show(struct device *dev,
3509 			     struct device_attribute *attr, char *buf)
3510 {
3511 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3512 
3513 	if (rbd_dev->spec->image_name)
3514 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3515 
3516 	return sprintf(buf, "(unknown)\n");
3517 }
3518 
3519 static ssize_t rbd_image_id_show(struct device *dev,
3520 			     struct device_attribute *attr, char *buf)
3521 {
3522 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3523 
3524 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3525 }
3526 
3527 /*
3528  * Shows the name of the currently-mapped snapshot (or
3529  * RBD_SNAP_HEAD_NAME for the base image).
3530  */
3531 static ssize_t rbd_snap_show(struct device *dev,
3532 			     struct device_attribute *attr,
3533 			     char *buf)
3534 {
3535 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3536 
3537 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3538 }
3539 
3540 /*
3541  * For an rbd v2 image, shows the ids and names of the pool, image,
3542  * and snapshot for the parent image, plus the parent overlap.  If
3543  * there is no parent, simply shows "(no parent image)".
3544  */
3545 static ssize_t rbd_parent_show(struct device *dev,
3546 			     struct device_attribute *attr,
3547 			     char *buf)
3548 {
3549 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3550 	struct rbd_spec *spec = rbd_dev->parent_spec;
3551 	int count;
3552 	char *bufp = buf;
3553 
3554 	if (!spec)
3555 		return sprintf(buf, "(no parent image)\n");
3556 
3557 	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3558 			(unsigned long long) spec->pool_id, spec->pool_name);
3559 	if (count < 0)
3560 		return count;
3561 	bufp += count;
3562 
3563 	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3564 			spec->image_name ? spec->image_name : "(unknown)");
3565 	if (count < 0)
3566 		return count;
3567 	bufp += count;
3568 
3569 	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3570 			(unsigned long long) spec->snap_id, spec->snap_name);
3571 	if (count < 0)
3572 		return count;
3573 	bufp += count;
3574 
3575 	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3576 	if (count < 0)
3577 		return count;
3578 	bufp += count;
3579 
3580 	return (ssize_t) (bufp - buf);
3581 }
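
/*
 * Example of what reading the "parent" attribute might return for a
 * layered mapping (all values below are illustrative, not taken from
 * a real image):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1027b7b86d3e
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base
 *   overlap 10737418240
 */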
3582 
3583 static ssize_t rbd_image_refresh(struct device *dev,
3584 				 struct device_attribute *attr,
3585 				 const char *buf,
3586 				 size_t size)
3587 {
3588 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3589 	int ret;
3590 
3591 	ret = rbd_dev_refresh(rbd_dev);
3592 	if (ret)
3593 		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3594 
3595 	return ret < 0 ? ret : size;
3596 }
3597 
3598 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3599 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3600 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3601 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3602 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3603 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3604 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3605 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3606 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3607 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3608 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3609 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
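
/*
 * These attributes end up under each mapped device's sysfs directory,
 * e.g. for a hypothetical device id 0:
 *
 *   /sys/bus/rbd/devices/0/{size,features,major,minor,client_id,pool,
 *                           pool_id,name,image_id,current_snap,parent,
 *                           refresh}
 *
 * All are read-only except "refresh", which is write-only (S_IWUSR).
 */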
3610 
3611 static struct attribute *rbd_attrs[] = {
3612 	&dev_attr_size.attr,
3613 	&dev_attr_features.attr,
3614 	&dev_attr_major.attr,
3615 	&dev_attr_minor.attr,
3616 	&dev_attr_client_id.attr,
3617 	&dev_attr_pool.attr,
3618 	&dev_attr_pool_id.attr,
3619 	&dev_attr_name.attr,
3620 	&dev_attr_image_id.attr,
3621 	&dev_attr_current_snap.attr,
3622 	&dev_attr_parent.attr,
3623 	&dev_attr_refresh.attr,
3624 	NULL
3625 };
3626 
3627 static struct attribute_group rbd_attr_group = {
3628 	.attrs = rbd_attrs,
3629 };
3630 
3631 static const struct attribute_group *rbd_attr_groups[] = {
3632 	&rbd_attr_group,
3633 	NULL
3634 };
3635 
3636 static void rbd_sysfs_dev_release(struct device *dev)
3637 {
3638 }
3639 
3640 static struct device_type rbd_device_type = {
3641 	.name		= "rbd",
3642 	.groups		= rbd_attr_groups,
3643 	.release	= rbd_sysfs_dev_release,
3644 };
3645 
3646 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3647 {
3648 	kref_get(&spec->kref);
3649 
3650 	return spec;
3651 }
3652 
3653 static void rbd_spec_free(struct kref *kref);
3654 static void rbd_spec_put(struct rbd_spec *spec)
3655 {
3656 	if (spec)
3657 		kref_put(&spec->kref, rbd_spec_free);
3658 }
3659 
3660 static struct rbd_spec *rbd_spec_alloc(void)
3661 {
3662 	struct rbd_spec *spec;
3663 
3664 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3665 	if (!spec)
3666 		return NULL;
3667 	kref_init(&spec->kref);
3668 
3669 	return spec;
3670 }
3671 
3672 static void rbd_spec_free(struct kref *kref)
3673 {
3674 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3675 
3676 	kfree(spec->pool_name);
3677 	kfree(spec->image_id);
3678 	kfree(spec->image_name);
3679 	kfree(spec->snap_name);
3680 	kfree(spec);
3681 }
3682 
3683 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3684 				struct rbd_spec *spec)
3685 {
3686 	struct rbd_device *rbd_dev;
3687 
3688 	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3689 	if (!rbd_dev)
3690 		return NULL;
3691 
3692 	spin_lock_init(&rbd_dev->lock);
3693 	rbd_dev->flags = 0;
3694 	atomic_set(&rbd_dev->parent_ref, 0);
3695 	INIT_LIST_HEAD(&rbd_dev->node);
3696 	init_rwsem(&rbd_dev->header_rwsem);
3697 
3698 	rbd_dev->spec = spec;
3699 	rbd_dev->rbd_client = rbdc;
3700 
3701 	/* Initialize the layout used for all rbd requests */
3702 
3703 	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3704 	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3705 	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3706 	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3707 
3708 	return rbd_dev;
3709 }
3710 
3711 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3712 {
3713 	rbd_put_client(rbd_dev->rbd_client);
3714 	rbd_spec_put(rbd_dev->spec);
3715 	kfree(rbd_dev);
3716 }
3717 
3718 /*
3719  * Get the size and object order for an image snapshot, or if
3720  * snap_id is CEPH_NOSNAP, gets this information for the base
3721  * image.
3722  */
3723 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3724 				u8 *order, u64 *snap_size)
3725 {
3726 	__le64 snapid = cpu_to_le64(snap_id);
3727 	int ret;
3728 	struct {
3729 		u8 order;
3730 		__le64 size;
3731 	} __attribute__ ((packed)) size_buf = { 0 };
3732 
3733 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3734 				"rbd", "get_size",
3735 				&snapid, sizeof (snapid),
3736 				&size_buf, sizeof (size_buf));
3737 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3738 	if (ret < 0)
3739 		return ret;
3740 	if (ret < sizeof (size_buf))
3741 		return -ERANGE;
3742 
3743 	if (order) {
3744 		*order = size_buf.order;
3745 		dout("  order %u", (unsigned int)*order);
3746 	}
3747 	*snap_size = le64_to_cpu(size_buf.size);
3748 
3749 	dout("  snap_id 0x%016llx snap_size = %llu\n",
3750 		(unsigned long long)snap_id,
3751 		(unsigned long long)*snap_size);
3752 
3753 	return 0;
3754 }
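
/*
 * The "get_size" reply decoded above is a u8 object order followed by a
 * little-endian u64 image size.  As a worked example with hypothetical
 * values: order 22 means each backing object is 1 << 22 = 4 MiB, and a
 * size of 0x280000000 corresponds to a 10 GiB image.
 */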
3755 
3756 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3757 {
3758 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3759 					&rbd_dev->header.obj_order,
3760 					&rbd_dev->header.image_size);
3761 }
3762 
3763 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3764 {
3765 	void *reply_buf;
3766 	int ret;
3767 	void *p;
3768 
3769 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3770 	if (!reply_buf)
3771 		return -ENOMEM;
3772 
3773 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3774 				"rbd", "get_object_prefix", NULL, 0,
3775 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3776 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3777 	if (ret < 0)
3778 		goto out;
3779 
3780 	p = reply_buf;
3781 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3782 						p + ret, NULL, GFP_NOIO);
3783 	ret = 0;
3784 
3785 	if (IS_ERR(rbd_dev->header.object_prefix)) {
3786 		ret = PTR_ERR(rbd_dev->header.object_prefix);
3787 		rbd_dev->header.object_prefix = NULL;
3788 	} else {
3789 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3790 	}
3791 out:
3792 	kfree(reply_buf);
3793 
3794 	return ret;
3795 }
3796 
3797 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3798 		u64 *snap_features)
3799 {
3800 	__le64 snapid = cpu_to_le64(snap_id);
3801 	struct {
3802 		__le64 features;
3803 		__le64 incompat;
3804 	} __attribute__ ((packed)) features_buf = { 0 };
3805 	u64 incompat;
3806 	int ret;
3807 
3808 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3809 				"rbd", "get_features",
3810 				&snapid, sizeof (snapid),
3811 				&features_buf, sizeof (features_buf));
3812 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3813 	if (ret < 0)
3814 		return ret;
3815 	if (ret < sizeof (features_buf))
3816 		return -ERANGE;
3817 
3818 	incompat = le64_to_cpu(features_buf.incompat);
3819 	if (incompat & ~RBD_FEATURES_SUPPORTED)
3820 		return -ENXIO;
3821 
3822 	*snap_features = le64_to_cpu(features_buf.features);
3823 
3824 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3825 		(unsigned long long)snap_id,
3826 		(unsigned long long)*snap_features,
3827 		(unsigned long long)le64_to_cpu(features_buf.incompat));
3828 
3829 	return 0;
3830 }
3831 
3832 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3833 {
3834 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3835 						&rbd_dev->header.features);
3836 }
3837 
3838 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3839 {
3840 	struct rbd_spec *parent_spec;
3841 	size_t size;
3842 	void *reply_buf = NULL;
3843 	__le64 snapid;
3844 	void *p;
3845 	void *end;
3846 	u64 pool_id;
3847 	char *image_id;
3848 	u64 snap_id;
3849 	u64 overlap;
3850 	int ret;
3851 
3852 	parent_spec = rbd_spec_alloc();
3853 	if (!parent_spec)
3854 		return -ENOMEM;
3855 
3856 	size = sizeof (__le64) +				/* pool_id */
3857 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
3858 		sizeof (__le64) +				/* snap_id */
3859 		sizeof (__le64);				/* overlap */
3860 	reply_buf = kmalloc(size, GFP_KERNEL);
3861 	if (!reply_buf) {
3862 		ret = -ENOMEM;
3863 		goto out_err;
3864 	}
3865 
3866 	snapid = cpu_to_le64(CEPH_NOSNAP);
3867 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3868 				"rbd", "get_parent",
3869 				&snapid, sizeof (snapid),
3870 				reply_buf, size);
3871 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3872 	if (ret < 0)
3873 		goto out_err;
3874 
3875 	p = reply_buf;
3876 	end = reply_buf + ret;
3877 	ret = -ERANGE;
3878 	ceph_decode_64_safe(&p, end, pool_id, out_err);
3879 	if (pool_id == CEPH_NOPOOL) {
3880 		/*
3881 		 * Either the parent never existed, or we have a
3882 		 * record of it but the image got flattened so it no
3883 		 * longer has a parent.  When the parent of a
3884 		 * layered image disappears we immediately set the
3885 		 * overlap to 0.  The effect of this is that all new
3886 		 * requests will be treated as if the image had no
3887 		 * parent.
3888 		 */
3889 		if (rbd_dev->parent_overlap) {
3890 			rbd_dev->parent_overlap = 0;
3891 			smp_mb();
3892 			rbd_dev_parent_put(rbd_dev);
3893 			pr_info("%s: clone image has been flattened\n",
3894 				rbd_dev->disk->disk_name);
3895 		}
3896 
3897 		goto out;	/* No parent?  No problem. */
3898 	}
3899 
3900 	/* The ceph file layout needs to fit pool id in 32 bits */
3901 
3902 	ret = -EIO;
3903 	if (pool_id > (u64)U32_MAX) {
3904 		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3905 			(unsigned long long)pool_id, U32_MAX);
3906 		goto out_err;
3907 	}
3908 
3909 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3910 	if (IS_ERR(image_id)) {
3911 		ret = PTR_ERR(image_id);
3912 		goto out_err;
3913 	}
3914 	ceph_decode_64_safe(&p, end, snap_id, out_err);
3915 	ceph_decode_64_safe(&p, end, overlap, out_err);
3916 
3917 	/*
3918 	 * The parent won't change (except when the clone is
3919 	 * flattened, which is handled above).  So we only need to
3920 	 * record the parent spec if we have not already done so.
3921 	 */
3922 	if (!rbd_dev->parent_spec) {
3923 		parent_spec->pool_id = pool_id;
3924 		parent_spec->image_id = image_id;
3925 		parent_spec->snap_id = snap_id;
3926 		rbd_dev->parent_spec = parent_spec;
3927 		parent_spec = NULL;	/* rbd_dev now owns this */
3928 	}
3929 
3930 	/*
3931 	 * We always update the parent overlap.  If it's zero we
3932 	 * treat it specially.
3933 	 */
3934 	rbd_dev->parent_overlap = overlap;
3935 	smp_mb();
3936 	if (!overlap) {
3937 
3938 		/* A null parent_spec indicates it's the initial probe */
3939 
3940 		if (parent_spec) {
3941 			/*
3942 			 * The overlap has become zero, so the clone
3943 			 * must have been resized down to 0 at some
3944 			 * point.  Treat this the same as a flatten.
3945 			 */
3946 			rbd_dev_parent_put(rbd_dev);
3947 			pr_info("%s: clone image now standalone\n",
3948 				rbd_dev->disk->disk_name);
3949 		} else {
3950 			/*
3951 			 * For the initial probe, if we find the
3952 			 * overlap is zero we just pretend there was
3953 			 * no parent image.
3954 			 */
3955 			rbd_warn(rbd_dev, "ignoring parent of "
3956 						"clone with overlap 0\n");
3957 		}
3958 	}
3959 out:
3960 	ret = 0;
3961 out_err:
3962 	kfree(reply_buf);
3963 	rbd_spec_put(parent_spec);
3964 
3965 	return ret;
3966 }
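
/*
 * Sketch of the "get_parent" reply consumed above (field order as
 * decoded; the names here are illustrative):
 *
 *   __le64 pool_id    parent pool, or CEPH_NOPOOL if none/flattened
 *   string image_id   __le32 length followed by that many bytes
 *   __le64 snap_id    snapshot of the parent the clone was made from
 *   __le64 overlap    bytes (from offset 0) still backed by the parent
 */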
3967 
3968 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3969 {
3970 	struct {
3971 		__le64 stripe_unit;
3972 		__le64 stripe_count;
3973 	} __attribute__ ((packed)) striping_info_buf = { 0 };
3974 	size_t size = sizeof (striping_info_buf);
3975 	void *p;
3976 	u64 obj_size;
3977 	u64 stripe_unit;
3978 	u64 stripe_count;
3979 	int ret;
3980 
3981 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3982 				"rbd", "get_stripe_unit_count", NULL, 0,
3983 				(char *)&striping_info_buf, size);
3984 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3985 	if (ret < 0)
3986 		return ret;
3987 	if (ret < size)
3988 		return -ERANGE;
3989 
3990 	/*
3991 	 * We don't actually support the "fancy striping" feature
3992 	 * (STRIPINGV2) yet, but if the striping sizes are the
3993 	 * defaults the behavior is the same as before.  So find
3994 	 * out, and only fail if the image has non-default values.
3995 	 */
3996 	ret = -EINVAL;
3997 	obj_size = (u64)1 << rbd_dev->header.obj_order;
3998 	p = &striping_info_buf;
3999 	stripe_unit = ceph_decode_64(&p);
4000 	if (stripe_unit != obj_size) {
4001 		rbd_warn(rbd_dev, "unsupported stripe unit "
4002 				"(got %llu want %llu)",
4003 				stripe_unit, obj_size);
4004 		return -EINVAL;
4005 	}
4006 	stripe_count = ceph_decode_64(&p);
4007 	if (stripe_count != 1) {
4008 		rbd_warn(rbd_dev, "unsupported stripe count "
4009 				"(got %llu want 1)", stripe_count);
4010 		return -EINVAL;
4011 	}
4012 	rbd_dev->header.stripe_unit = stripe_unit;
4013 	rbd_dev->header.stripe_count = stripe_count;
4014 
4015 	return 0;
4016 }
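
/*
 * Worked example with a hypothetical obj_order of 22: the only reply
 * accepted above is stripe_unit = 1 << 22 = 4194304 and stripe_count = 1,
 * i.e. the STRIPINGV2 defaults; anything else fails the probe with
 * -EINVAL.
 */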
4017 
4018 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4019 {
4020 	size_t image_id_size;
4021 	char *image_id;
4022 	void *p;
4023 	void *end;
4024 	size_t size;
4025 	void *reply_buf = NULL;
4026 	size_t len = 0;
4027 	char *image_name = NULL;
4028 	int ret;
4029 
4030 	rbd_assert(!rbd_dev->spec->image_name);
4031 
4032 	len = strlen(rbd_dev->spec->image_id);
4033 	image_id_size = sizeof (__le32) + len;
4034 	image_id = kmalloc(image_id_size, GFP_KERNEL);
4035 	if (!image_id)
4036 		return NULL;
4037 
4038 	p = image_id;
4039 	end = image_id + image_id_size;
4040 	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4041 
4042 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4043 	reply_buf = kmalloc(size, GFP_KERNEL);
4044 	if (!reply_buf)
4045 		goto out;
4046 
4047 	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4048 				"rbd", "dir_get_name",
4049 				image_id, image_id_size,
4050 				reply_buf, size);
4051 	if (ret < 0)
4052 		goto out;
4053 	p = reply_buf;
4054 	end = reply_buf + ret;
4055 
4056 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4057 	if (IS_ERR(image_name))
4058 		image_name = NULL;
4059 	else
4060 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4061 out:
4062 	kfree(reply_buf);
4063 	kfree(image_id);
4064 
4065 	return image_name;
4066 }
4067 
4068 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4069 {
4070 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4071 	const char *snap_name;
4072 	u32 which = 0;
4073 
4074 	/* Skip over names until we find the one we are looking for */
4075 
4076 	snap_name = rbd_dev->header.snap_names;
4077 	while (which < snapc->num_snaps) {
4078 		if (!strcmp(name, snap_name))
4079 			return snapc->snaps[which];
4080 		snap_name += strlen(snap_name) + 1;
4081 		which++;
4082 	}
4083 	return CEPH_NOSNAP;
4084 }
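
/*
 * The v1 header stores snapshot names as consecutive NUL-terminated
 * strings, which is why the loop above advances by strlen() + 1.
 * Illustrative layout for two snapshots named "snap1" and "snap2":
 *
 *   snap_names: "snap1\0snap2\0"
 *   snaps[]:    { <id of snap1>, <id of snap2> }
 */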
4085 
4086 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4087 {
4088 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4089 	u32 which;
4090 	bool found = false;
4091 	u64 snap_id;
4092 
4093 	for (which = 0; !found && which < snapc->num_snaps; which++) {
4094 		const char *snap_name;
4095 
4096 		snap_id = snapc->snaps[which];
4097 		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4098 		if (IS_ERR(snap_name)) {
4099 			/* ignore no-longer existing snapshots */
4100 			if (PTR_ERR(snap_name) == -ENOENT)
4101 				continue;
4102 			else
4103 				break;
4104 		}
4105 		found = !strcmp(name, snap_name);
4106 		kfree(snap_name);
4107 	}
4108 	return found ? snap_id : CEPH_NOSNAP;
4109 }
4110 
4111 /*
4112  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4113  * no snapshot by that name is found, or if an error occurs.
4114  */
4115 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4116 {
4117 	if (rbd_dev->image_format == 1)
4118 		return rbd_v1_snap_id_by_name(rbd_dev, name);
4119 
4120 	return rbd_v2_snap_id_by_name(rbd_dev, name);
4121 }
4122 
4123 /*
4124  * When an rbd image has a parent image, it is identified by the
4125  * pool, image, and snapshot ids (not names).  This function fills
4126  * in the names for those ids.  (It's OK if we can't figure out the
4127  * name for an image id, but the pool and snapshot ids should always
4128  * exist and have names.)  All names in an rbd spec are dynamically
4129  * allocated.
4130  *
4131  * When an image being mapped (not a parent) is probed, we have the
4132  * pool name and pool id, image name and image id, and the snapshot
4133  * name.  The only thing we're missing is the snapshot id.
4134  */
4135 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4136 {
4137 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4138 	struct rbd_spec *spec = rbd_dev->spec;
4139 	const char *pool_name;
4140 	const char *image_name;
4141 	const char *snap_name;
4142 	int ret;
4143 
4144 	/*
4145 	 * An image being mapped will have the pool name (etc.), but
4146 	 * we need to look up the snapshot id.
4147 	 */
4148 	if (spec->pool_name) {
4149 		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4150 			u64 snap_id;
4151 
4152 			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4153 			if (snap_id == CEPH_NOSNAP)
4154 				return -ENOENT;
4155 			spec->snap_id = snap_id;
4156 		} else {
4157 			spec->snap_id = CEPH_NOSNAP;
4158 		}
4159 
4160 		return 0;
4161 	}
4162 
4163 	/* Get the pool name; we have to make our own copy of this */
4164 
4165 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4166 	if (!pool_name) {
4167 		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4168 		return -EIO;
4169 	}
4170 	pool_name = kstrdup(pool_name, GFP_KERNEL);
4171 	if (!pool_name)
4172 		return -ENOMEM;
4173 
4174 	/* Fetch the image name; tolerate failure here */
4175 
4176 	image_name = rbd_dev_image_name(rbd_dev);
4177 	if (!image_name)
4178 		rbd_warn(rbd_dev, "unable to get image name");
4179 
4180 	/* Look up the snapshot name, and make a copy */
4181 
4182 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4183 	if (IS_ERR(snap_name)) {
4184 		ret = PTR_ERR(snap_name);
4185 		goto out_err;
4186 	}
4187 
4188 	spec->pool_name = pool_name;
4189 	spec->image_name = image_name;
4190 	spec->snap_name = snap_name;
4191 
4192 	return 0;
4193 out_err:
4194 	kfree(image_name);
4195 	kfree(pool_name);
4196 
4197 	return ret;
4198 }
4199 
4200 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4201 {
4202 	size_t size;
4203 	int ret;
4204 	void *reply_buf;
4205 	void *p;
4206 	void *end;
4207 	u64 seq;
4208 	u32 snap_count;
4209 	struct ceph_snap_context *snapc;
4210 	u32 i;
4211 
4212 	/*
4213 	 * We'll need room for the seq value (maximum snapshot id),
4214 	 * snapshot count, and array of that many snapshot ids.
4215 	 * For now we have a fixed upper limit on the number we're
4216 	 * prepared to receive.
4217 	 */
4218 	size = sizeof (__le64) + sizeof (__le32) +
4219 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
4220 	reply_buf = kzalloc(size, GFP_KERNEL);
4221 	if (!reply_buf)
4222 		return -ENOMEM;
4223 
4224 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4225 				"rbd", "get_snapcontext", NULL, 0,
4226 				reply_buf, size);
4227 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4228 	if (ret < 0)
4229 		goto out;
4230 
4231 	p = reply_buf;
4232 	end = reply_buf + ret;
4233 	ret = -ERANGE;
4234 	ceph_decode_64_safe(&p, end, seq, out);
4235 	ceph_decode_32_safe(&p, end, snap_count, out);
4236 
4237 	/*
4238 	 * Make sure the reported number of snapshot ids wouldn't go
4239 	 * beyond the end of our buffer.  But before checking that,
4240 	 * make sure the computed size of the snapshot context we
4241 	 * allocate is representable in a size_t.
4242 	 */
4243 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4244 				 / sizeof (u64)) {
4245 		ret = -EINVAL;
4246 		goto out;
4247 	}
4248 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4249 		goto out;
4250 	ret = 0;
4251 
4252 	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4253 	if (!snapc) {
4254 		ret = -ENOMEM;
4255 		goto out;
4256 	}
4257 	snapc->seq = seq;
4258 	for (i = 0; i < snap_count; i++)
4259 		snapc->snaps[i] = ceph_decode_64(&p);
4260 
4261 	ceph_put_snap_context(rbd_dev->header.snapc);
4262 	rbd_dev->header.snapc = snapc;
4263 
4264 	dout("  snap context seq = %llu, snap_count = %u\n",
4265 		(unsigned long long)seq, (unsigned int)snap_count);
4266 out:
4267 	kfree(reply_buf);
4268 
4269 	return ret;
4270 }
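
/*
 * Sketch of the "get_snapcontext" reply decoded above, with hypothetical
 * values for a context holding two snapshots:
 *
 *   __le64 seq         e.g. 5  (maximum snapshot id seen)
 *   __le32 snap_count  e.g. 2
 *   __le64 snaps[]     e.g. { 5, 3 }
 */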
4271 
4272 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4273 					u64 snap_id)
4274 {
4275 	size_t size;
4276 	void *reply_buf;
4277 	__le64 snapid;
4278 	int ret;
4279 	void *p;
4280 	void *end;
4281 	char *snap_name;
4282 
4283 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4284 	reply_buf = kmalloc(size, GFP_KERNEL);
4285 	if (!reply_buf)
4286 		return ERR_PTR(-ENOMEM);
4287 
4288 	snapid = cpu_to_le64(snap_id);
4289 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4290 				"rbd", "get_snapshot_name",
4291 				&snapid, sizeof (snapid),
4292 				reply_buf, size);
4293 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4294 	if (ret < 0) {
4295 		snap_name = ERR_PTR(ret);
4296 		goto out;
4297 	}
4298 
4299 	p = reply_buf;
4300 	end = reply_buf + ret;
4301 	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4302 	if (IS_ERR(snap_name))
4303 		goto out;
4304 
4305 	dout("  snap_id 0x%016llx snap_name = %s\n",
4306 		(unsigned long long)snap_id, snap_name);
4307 out:
4308 	kfree(reply_buf);
4309 
4310 	return snap_name;
4311 }
4312 
4313 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4314 {
4315 	bool first_time = rbd_dev->header.object_prefix == NULL;
4316 	int ret;
4317 
4318 	ret = rbd_dev_v2_image_size(rbd_dev);
4319 	if (ret)
4320 		return ret;
4321 
4322 	if (first_time) {
4323 		ret = rbd_dev_v2_header_onetime(rbd_dev);
4324 		if (ret)
4325 			return ret;
4326 	}
4327 
4328 	/*
4329 	 * If the image supports layering, get the parent info.  We
4330 	 * need to probe the first time regardless.  Thereafter we
4331 	 * only need to do so if there's a parent, to see if it has
4332 	 * disappeared due to the mapped image getting flattened.
4333 	 */
4334 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4335 			(first_time || rbd_dev->parent_spec)) {
4336 		bool warn;
4337 
4338 		ret = rbd_dev_v2_parent_info(rbd_dev);
4339 		if (ret)
4340 			return ret;
4341 
4342 		/*
4343 		 * Print a warning if this is the initial probe and
4344 		 * the image has a parent.  Don't print it if the
4345 		 * image now being probed is itself a parent.  We
4346 		 * can tell at this point because we won't know its
4347 		 * pool name yet (just its pool id).
4348 		 */
4349 		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4350 		if (first_time && warn)
4351 			rbd_warn(rbd_dev, "WARNING: kernel layering "
4352 					"is EXPERIMENTAL!");
4353 	}
4354 
4355 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4356 		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4357 			rbd_dev->mapping.size = rbd_dev->header.image_size;
4358 
4359 	ret = rbd_dev_v2_snap_context(rbd_dev);
4360 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
4361 
4362 	return ret;
4363 }
4364 
4365 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4366 {
4367 	struct device *dev;
4368 	int ret;
4369 
4370 	dev = &rbd_dev->dev;
4371 	dev->bus = &rbd_bus_type;
4372 	dev->type = &rbd_device_type;
4373 	dev->parent = &rbd_root_dev;
4374 	dev->release = rbd_dev_device_release;
4375 	dev_set_name(dev, "%d", rbd_dev->dev_id);
4376 	ret = device_register(dev);
4377 
4378 	return ret;
4379 }
4380 
4381 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4382 {
4383 	device_unregister(&rbd_dev->dev);
4384 }
4385 
4386 /*
4387  * Get a unique rbd identifier for the given new rbd_dev, and add
4388  * the rbd_dev to the global list.
4389  */
4390 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4391 {
4392 	int new_dev_id;
4393 
4394 	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4395 				    0, minor_to_rbd_dev_id(1 << MINORBITS),
4396 				    GFP_KERNEL);
4397 	if (new_dev_id < 0)
4398 		return new_dev_id;
4399 
4400 	rbd_dev->dev_id = new_dev_id;
4401 
4402 	spin_lock(&rbd_dev_list_lock);
4403 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4404 	spin_unlock(&rbd_dev_list_lock);
4405 
4406 	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4407 
4408 	return 0;
4409 }
4410 
4411 /*
4412  * Remove an rbd_dev from the global list, and record that its
4413  * identifier is no longer in use.
4414  */
4415 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4416 {
4417 	spin_lock(&rbd_dev_list_lock);
4418 	list_del_init(&rbd_dev->node);
4419 	spin_unlock(&rbd_dev_list_lock);
4420 
4421 	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4422 
4423 	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4424 }
4425 
4426 /*
4427  * Skips over white space at *buf, and updates *buf to point to the
4428  * first found non-space character (if any). Returns the length of
4429  * the token (string of non-white space characters) found.  Note
4430  * that *buf must be terminated with '\0'.
4431  */
4432 static inline size_t next_token(const char **buf)
4433 {
4434 	/*
4435 	 * These are the characters that produce nonzero for
4436 	 * isspace() in the "C" and "POSIX" locales.
4437 	 */
4438 	const char *spaces = " \f\n\r\t\v";
4439 
4440 	*buf += strspn(*buf, spaces);	/* Find start of token */
4441 
4442 	return strcspn(*buf, spaces);	/* Return token length */
4443 }
4444 
4445 /*
4446  * Finds the next token in *buf, and if the provided token buffer is
4447  * big enough, copies the found token into it.  The result, if
4448  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4449  * must be terminated with '\0' on entry.
4450  *
4451  * Returns the length of the token found (not including the '\0').
4452  * Return value will be 0 if no token is found, and it will be >=
4453  * token_size if the token would not fit.
4454  *
4455  * The *buf pointer will be updated to point beyond the end of the
4456  * found token.  Note that this occurs even if the token buffer is
4457  * too small to hold it.
4458  */
4459 static inline size_t copy_token(const char **buf,
4460 				char *token,
4461 				size_t token_size)
4462 {
4463 	size_t len;
4464 
4465 	len = next_token(buf);
4466 	if (len < token_size) {
4467 		memcpy(token, *buf, len);
4468 		*(token + len) = '\0';
4469 	}
4470 	*buf += len;
4471 
4472 	return len;
4473 }
4474 
4475 /*
4476  * Finds the next token in *buf, dynamically allocates a buffer big
4477  * enough to hold a copy of it, and copies the token into the new
4478  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4479  * that a duplicate buffer is created even for a zero-length token.
4480  *
4481  * Returns a pointer to the newly-allocated duplicate, or a null
4482  * pointer if memory for the duplicate was not available.  If
4483  * the lenp argument is a non-null pointer, the length of the token
4484  * (not including the '\0') is returned in *lenp.
4485  *
4486  * If successful, the *buf pointer will be updated to point beyond
4487  * the end of the found token.
4488  *
4489  * Note: uses GFP_KERNEL for allocation.
4490  */
4491 static inline char *dup_token(const char **buf, size_t *lenp)
4492 {
4493 	char *dup;
4494 	size_t len;
4495 
4496 	len = next_token(buf);
4497 	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4498 	if (!dup)
4499 		return NULL;
4500 	*(dup + len) = '\0';
4501 	*buf += len;
4502 
4503 	if (lenp)
4504 		*lenp = len;
4505 
4506 	return dup;
4507 }
4508 
4509 /*
4510  * Parse the options provided for an "rbd add" (i.e., rbd image
4511  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4512  * and the data written is passed here via a NUL-terminated buffer.
4513  * Returns 0 if successful or an error code otherwise.
4514  *
4515  * The information extracted from these options is recorded in
4516  * the other parameters which return dynamically-allocated
4517  * structures:
4518  *  ceph_opts
4519  *      The address of a pointer that will refer to a ceph options
4520  *      structure.  Caller must release the returned pointer using
4521  *      ceph_destroy_options() when it is no longer needed.
4522  *  rbd_opts
4523  *	Address of an rbd options pointer.  Fully initialized by
4524  *	this function; caller must release with kfree().
4525  *  spec
4526  *	Address of an rbd image specification pointer.  Fully
4527  *	initialized by this function based on parsed options.
4528  *	Caller must release with rbd_spec_put().
4529  *
4530  * The options passed take this form:
4531  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4532  * where:
4533  *  <mon_addrs>
4534  *      A comma-separated list of one or more monitor addresses.
4535  *      A monitor address is an ip address, optionally followed
4536  *      by a port number (separated by a colon).
4537  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4538  *  <options>
4539  *      A comma-separated list of ceph and/or rbd options.
4540  *  <pool_name>
4541  *      The name of the rados pool containing the rbd image.
4542  *  <image_name>
4543  *      The name of the image in that pool to map.
4544  *  <snap_name>
4545  *      An optional snapshot name.  If provided, the mapping will
4546  *      present data from the image at the time that snapshot was
4547  *      created.  The image head is used if no snapshot name is
4548  *      provided.  Snapshot mappings are always read-only.
4549  */
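
/*
 * A hypothetical example of the above (all values illustrative only):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap" \
 *         > /sys/bus/rbd/add
 *
 * maps snapshot "mysnap" of image "myimage" in pool "rbd", using the
 * monitor at 1.2.3.4:6789.
 */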
4550 static int rbd_add_parse_args(const char *buf,
4551 				struct ceph_options **ceph_opts,
4552 				struct rbd_options **opts,
4553 				struct rbd_spec **rbd_spec)
4554 {
4555 	size_t len;
4556 	char *options;
4557 	const char *mon_addrs;
4558 	char *snap_name;
4559 	size_t mon_addrs_size;
4560 	struct rbd_spec *spec = NULL;
4561 	struct rbd_options *rbd_opts = NULL;
4562 	struct ceph_options *copts;
4563 	int ret;
4564 
4565 	/* The first four tokens are required */
4566 
4567 	len = next_token(&buf);
4568 	if (!len) {
4569 		rbd_warn(NULL, "no monitor address(es) provided");
4570 		return -EINVAL;
4571 	}
4572 	mon_addrs = buf;
4573 	mon_addrs_size = len + 1;
4574 	buf += len;
4575 
4576 	ret = -EINVAL;
4577 	options = dup_token(&buf, NULL);
4578 	if (!options)
4579 		return -ENOMEM;
4580 	if (!*options) {
4581 		rbd_warn(NULL, "no options provided");
4582 		goto out_err;
4583 	}
4584 
4585 	spec = rbd_spec_alloc();
4586 	if (!spec)
4587 		goto out_mem;
4588 
4589 	spec->pool_name = dup_token(&buf, NULL);
4590 	if (!spec->pool_name)
4591 		goto out_mem;
4592 	if (!*spec->pool_name) {
4593 		rbd_warn(NULL, "no pool name provided");
4594 		goto out_err;
4595 	}
4596 
4597 	spec->image_name = dup_token(&buf, NULL);
4598 	if (!spec->image_name)
4599 		goto out_mem;
4600 	if (!*spec->image_name) {
4601 		rbd_warn(NULL, "no image name provided");
4602 		goto out_err;
4603 	}
4604 
4605 	/*
4606 	 * Snapshot name is optional; default is to use "-"
4607 	 * (indicating the head/no snapshot).
4608 	 */
4609 	len = next_token(&buf);
4610 	if (!len) {
4611 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4612 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4613 	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
4614 		ret = -ENAMETOOLONG;
4615 		goto out_err;
4616 	}
4617 	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4618 	if (!snap_name)
4619 		goto out_mem;
4620 	*(snap_name + len) = '\0';
4621 	spec->snap_name = snap_name;
4622 
4623 	/* Initialize all rbd options to the defaults */
4624 
4625 	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4626 	if (!rbd_opts)
4627 		goto out_mem;
4628 
4629 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4630 
4631 	copts = ceph_parse_options(options, mon_addrs,
4632 					mon_addrs + mon_addrs_size - 1,
4633 					parse_rbd_opts_token, rbd_opts);
4634 	if (IS_ERR(copts)) {
4635 		ret = PTR_ERR(copts);
4636 		goto out_err;
4637 	}
4638 	kfree(options);
4639 
4640 	*ceph_opts = copts;
4641 	*opts = rbd_opts;
4642 	*rbd_spec = spec;
4643 
4644 	return 0;
4645 out_mem:
4646 	ret = -ENOMEM;
4647 out_err:
4648 	kfree(rbd_opts);
4649 	rbd_spec_put(spec);
4650 	kfree(options);
4651 
4652 	return ret;
4653 }
4654 
4655 /*
4656  * An rbd format 2 image has a unique identifier, distinct from the
4657  * name given to it by the user.  Internally, that identifier is
4658  * what's used to specify the names of objects related to the image.
4659  *
4660  * A special "rbd id" object is used to map an rbd image name to its
4661  * id.  If that object doesn't exist, then there is no v2 rbd image
4662  * with the supplied name.
4663  *
4664  * This function will record the given rbd_dev's image_id field if
4665  * it can be determined, and in that case will return 0.  If any
4666  * errors occur a negative errno will be returned and the rbd_dev's
4667  * image_id field will be unchanged (and should be NULL).
4668  */
4669 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4670 {
4671 	int ret;
4672 	size_t size;
4673 	char *object_name;
4674 	void *response;
4675 	char *image_id;
4676 
4677 	/*
4678 	 * When probing a parent image, the image id is already
4679 	 * known (and the image name likely is not).  There's no
4680 	 * need to fetch the image id again in this case.  We
4681 	 * do still need to set the image format though.
4682 	 */
4683 	if (rbd_dev->spec->image_id) {
4684 		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4685 
4686 		return 0;
4687 	}
4688 
4689 	/*
4690 	 * First, see if the format 2 image id file exists, and if
4691 	 * so, get the image's persistent id from it.
4692 	 */
4693 	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4694 	object_name = kmalloc(size, GFP_NOIO);
4695 	if (!object_name)
4696 		return -ENOMEM;
4697 	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4698 	dout("rbd id object name is %s\n", object_name);
4699 
4700 	/* Response will be an encoded string, which includes a length */
4701 
4702 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4703 	response = kzalloc(size, GFP_NOIO);
4704 	if (!response) {
4705 		ret = -ENOMEM;
4706 		goto out;
4707 	}
4708 
4709 	/* If it doesn't exist we'll assume it's a format 1 image */
4710 
4711 	ret = rbd_obj_method_sync(rbd_dev, object_name,
4712 				"rbd", "get_id", NULL, 0,
4713 				response, RBD_IMAGE_ID_LEN_MAX);
4714 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4715 	if (ret == -ENOENT) {
4716 		image_id = kstrdup("", GFP_KERNEL);
4717 		ret = image_id ? 0 : -ENOMEM;
4718 		if (!ret)
4719 			rbd_dev->image_format = 1;
4720 	} else if (ret > sizeof (__le32)) {
4721 		void *p = response;
4722 
4723 		image_id = ceph_extract_encoded_string(&p, p + ret,
4724 						NULL, GFP_NOIO);
4725 		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4726 		if (!ret)
4727 			rbd_dev->image_format = 2;
4728 	} else {
4729 		ret = -EINVAL;
4730 	}
4731 
4732 	if (!ret) {
4733 		rbd_dev->spec->image_id = image_id;
4734 		dout("image_id is %s\n", image_id);
4735 	}
4736 out:
4737 	kfree(response);
4738 	kfree(object_name);
4739 
4740 	return ret;
4741 }
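
/*
 * Illustrative example, assuming the usual RBD_ID_PREFIX of "rbd_id."
 * from rbd_types.h: probing an image named "myimage" reads the object
 * "rbd_id.myimage".  A reply carrying an encoded string such as
 * "1028b4567890" marks the image as format 2, while -ENOENT marks it
 * as format 1.
 */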
4742 
4743 /*
4744  * Undo whatever state changes were made by a v1 or v2 header info
4745  * call.
4746  */
4747 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4748 {
4749 	struct rbd_image_header	*header;
4750 
4751 	/* Drop parent reference unless already dropped (or there was none) */
4752 
4753 	if (rbd_dev->parent_overlap)
4754 		rbd_dev_parent_put(rbd_dev);
4755 
4756 	/* Free dynamic fields from the header, then zero it out */
4757 
4758 	header = &rbd_dev->header;
4759 	ceph_put_snap_context(header->snapc);
4760 	kfree(header->snap_sizes);
4761 	kfree(header->snap_names);
4762 	kfree(header->object_prefix);
4763 	memset(header, 0, sizeof (*header));
4764 }
4765 
4766 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4767 {
4768 	int ret;
4769 
4770 	ret = rbd_dev_v2_object_prefix(rbd_dev);
4771 	if (ret)
4772 		goto out_err;
4773 
4774 	/*
4775 	 * Get and check the features for the image.  Currently the
4776 	 * features are assumed to never change.
4777 	 */
4778 	ret = rbd_dev_v2_features(rbd_dev);
4779 	if (ret)
4780 		goto out_err;
4781 
4782 	/* If the image supports fancy striping, get its parameters */
4783 
4784 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4785 		ret = rbd_dev_v2_striping_info(rbd_dev);
4786 		if (ret < 0)
4787 			goto out_err;
4788 	}
4789 	/* No support for crypto and compression type format 2 images */
4790 
4791 	return 0;
4792 out_err:
4793 	rbd_dev->header.features = 0;
4794 	kfree(rbd_dev->header.object_prefix);
4795 	rbd_dev->header.object_prefix = NULL;
4796 
4797 	return ret;
4798 }
4799 
4800 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4801 {
4802 	struct rbd_device *parent = NULL;
4803 	struct rbd_spec *parent_spec;
4804 	struct rbd_client *rbdc;
4805 	int ret;
4806 
4807 	if (!rbd_dev->parent_spec)
4808 		return 0;
4809 	/*
4810 	 * We need to pass a reference to the client and the parent
4811 	 * spec when creating the parent rbd_dev.  Images related by
4812 	 * parent/child relationships always share both.
4813 	 */
4814 	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4815 	rbdc = __rbd_get_client(rbd_dev->rbd_client);
4816 
4817 	ret = -ENOMEM;
4818 	parent = rbd_dev_create(rbdc, parent_spec);
4819 	if (!parent)
4820 		goto out_err;
4821 
4822 	ret = rbd_dev_image_probe(parent, false);
4823 	if (ret < 0)
4824 		goto out_err;
4825 	rbd_dev->parent = parent;
4826 	atomic_set(&rbd_dev->parent_ref, 1);
4827 
4828 	return 0;
4829 out_err:
4830 	if (parent) {
4831 		rbd_dev_unparent(rbd_dev);
4832 		kfree(rbd_dev->header_name);
4833 		rbd_dev_destroy(parent);
4834 	} else {
4835 		rbd_put_client(rbdc);
4836 		rbd_spec_put(parent_spec);
4837 	}
4838 
4839 	return ret;
4840 }
4841 
4842 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4843 {
4844 	int ret;
4845 
4846 	/* Get an id and fill in device name. */
4847 
4848 	ret = rbd_dev_id_get(rbd_dev);
4849 	if (ret)
4850 		return ret;
4851 
4852 	BUILD_BUG_ON(DEV_NAME_LEN
4853 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4854 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4855 
4856 	/* Record our major and minor device numbers. */
4857 
4858 	if (!single_major) {
4859 		ret = register_blkdev(0, rbd_dev->name);
4860 		if (ret < 0)
4861 			goto err_out_id;
4862 
4863 		rbd_dev->major = ret;
4864 		rbd_dev->minor = 0;
4865 	} else {
4866 		rbd_dev->major = rbd_major;
4867 		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
4868 	}
4869 
4870 	/* Set up the blkdev mapping. */
4871 
4872 	ret = rbd_init_disk(rbd_dev);
4873 	if (ret)
4874 		goto err_out_blkdev;
4875 
4876 	ret = rbd_dev_mapping_set(rbd_dev);
4877 	if (ret)
4878 		goto err_out_disk;
4879 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4880 
4881 	ret = rbd_bus_add_dev(rbd_dev);
4882 	if (ret)
4883 		goto err_out_mapping;
4884 
4885 	/* Everything's ready.  Announce the disk to the world. */
4886 
4887 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4888 	add_disk(rbd_dev->disk);
4889 
4890 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4891 		(unsigned long long) rbd_dev->mapping.size);
4892 
4893 	return ret;
4894 
4895 err_out_mapping:
4896 	rbd_dev_mapping_clear(rbd_dev);
4897 err_out_disk:
4898 	rbd_free_disk(rbd_dev);
4899 err_out_blkdev:
4900 	if (!single_major)
4901 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
4902 err_out_id:
4903 	rbd_dev_id_put(rbd_dev);
4904 	rbd_dev_mapping_clear(rbd_dev);
4905 
4906 	return ret;
4907 }
4908 
4909 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4910 {
4911 	struct rbd_spec *spec = rbd_dev->spec;
4912 	size_t size;
4913 
4914 	/* Record the header object name for this rbd image. */
4915 
4916 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4917 
4918 	if (rbd_dev->image_format == 1)
4919 		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4920 	else
4921 		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4922 
4923 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4924 	if (!rbd_dev->header_name)
4925 		return -ENOMEM;
4926 
4927 	if (rbd_dev->image_format == 1)
4928 		sprintf(rbd_dev->header_name, "%s%s",
4929 			spec->image_name, RBD_SUFFIX);
4930 	else
4931 		sprintf(rbd_dev->header_name, "%s%s",
4932 			RBD_HEADER_PREFIX, spec->image_id);
4933 	return 0;
4934 }
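
/*
 * Illustrative examples, assuming the usual RBD_SUFFIX ".rbd" and
 * RBD_HEADER_PREFIX "rbd_header." from rbd_types.h:
 *
 *   format 1, image name "myimage"     -> header object "myimage.rbd"
 *   format 2, image id "1028b4567890"  -> header object
 *                                         "rbd_header.1028b4567890"
 */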
4935 
4936 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4937 {
4938 	rbd_dev_unprobe(rbd_dev);
4939 	kfree(rbd_dev->header_name);
4940 	rbd_dev->header_name = NULL;
4941 	rbd_dev->image_format = 0;
4942 	kfree(rbd_dev->spec->image_id);
4943 	rbd_dev->spec->image_id = NULL;
4944 
4945 	rbd_dev_destroy(rbd_dev);
4946 }
4947 
4948 /*
4949  * Probe for the existence of the header object for the given rbd
4950  * device.  If this image is the one being mapped (i.e., not a
4951  * parent), initiate a watch on its header object before using that
4952  * object to get detailed information about the rbd image.
4953  */
4954 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4955 {
4956 	int ret;
4957 
4958 	/*
4959 	 * Get the id from the image id object.  Unless there's an
4960 	 * error, rbd_dev->spec->image_id will be filled in with
4961 	 * a dynamically-allocated string, and rbd_dev->image_format
4962 	 * will be set to either 1 or 2.
4963 	 */
4964 	ret = rbd_dev_image_id(rbd_dev);
4965 	if (ret)
4966 		return ret;
4967 	rbd_assert(rbd_dev->spec->image_id);
4968 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4969 
4970 	ret = rbd_dev_header_name(rbd_dev);
4971 	if (ret)
4972 		goto err_out_format;
4973 
4974 	if (mapping) {
4975 		ret = rbd_dev_header_watch_sync(rbd_dev);
4976 		if (ret)
4977 			goto out_header_name;
4978 	}
4979 
4980 	if (rbd_dev->image_format == 1)
4981 		ret = rbd_dev_v1_header_info(rbd_dev);
4982 	else
4983 		ret = rbd_dev_v2_header_info(rbd_dev);
4984 	if (ret)
4985 		goto err_out_watch;
4986 
4987 	ret = rbd_dev_spec_update(rbd_dev);
4988 	if (ret)
4989 		goto err_out_probe;
4990 
4991 	ret = rbd_dev_probe_parent(rbd_dev);
4992 	if (ret)
4993 		goto err_out_probe;
4994 
4995 	dout("discovered format %u image, header name is %s\n",
4996 		rbd_dev->image_format, rbd_dev->header_name);
4997 
4998 	return 0;
4999 err_out_probe:
5000 	rbd_dev_unprobe(rbd_dev);
5001 err_out_watch:
5002 	if (mapping)
5003 		rbd_dev_header_unwatch_sync(rbd_dev);
5004 out_header_name:
5005 	kfree(rbd_dev->header_name);
5006 	rbd_dev->header_name = NULL;
5007 err_out_format:
5008 	rbd_dev->image_format = 0;
5009 	kfree(rbd_dev->spec->image_id);
5010 	rbd_dev->spec->image_id = NULL;
5011 
5012 	dout("probe failed, returning %d\n", ret);
5013 
5014 	return ret;
5015 }
5016 
5017 static ssize_t do_rbd_add(struct bus_type *bus,
5018 			  const char *buf,
5019 			  size_t count)
5020 {
5021 	struct rbd_device *rbd_dev = NULL;
5022 	struct ceph_options *ceph_opts = NULL;
5023 	struct rbd_options *rbd_opts = NULL;
5024 	struct rbd_spec *spec = NULL;
5025 	struct rbd_client *rbdc;
5026 	struct ceph_osd_client *osdc;
5027 	bool read_only;
5028 	int rc = -ENOMEM;
5029 
5030 	if (!try_module_get(THIS_MODULE))
5031 		return -ENODEV;
5032 
5033 	/* parse add command */
5034 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5035 	if (rc < 0)
5036 		goto err_out_module;
5037 	read_only = rbd_opts->read_only;
5038 	kfree(rbd_opts);
5039 	rbd_opts = NULL;	/* done with this */
5040 
5041 	rbdc = rbd_get_client(ceph_opts);
5042 	if (IS_ERR(rbdc)) {
5043 		rc = PTR_ERR(rbdc);
5044 		goto err_out_args;
5045 	}
5046 
5047 	/* pick the pool */
5048 	osdc = &rbdc->client->osdc;
5049 	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5050 	if (rc < 0)
5051 		goto err_out_client;
5052 	spec->pool_id = (u64)rc;
5053 
5054 	/* The ceph file layout needs to fit pool id in 32 bits */
5055 
5056 	if (spec->pool_id > (u64)U32_MAX) {
5057 		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5058 				(unsigned long long)spec->pool_id, U32_MAX);
5059 		rc = -EIO;
5060 		goto err_out_client;
5061 	}
5062 
5063 	rbd_dev = rbd_dev_create(rbdc, spec);
5064 	if (!rbd_dev)
5065 		goto err_out_client;
5066 	rbdc = NULL;		/* rbd_dev now owns this */
5067 	spec = NULL;		/* rbd_dev now owns this */
5068 
5069 	rc = rbd_dev_image_probe(rbd_dev, true);
5070 	if (rc < 0)
5071 		goto err_out_rbd_dev;
5072 
5073 	/* If we are mapping a snapshot it must be marked read-only */
5074 
5075 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5076 		read_only = true;
5077 	rbd_dev->mapping.read_only = read_only;
5078 
5079 	rc = rbd_dev_device_setup(rbd_dev);
5080 	if (rc) {
5081 		/*
5082 		 * rbd_dev_header_unwatch_sync() can't be moved into
5083 		 * rbd_dev_image_release() without refactoring, see
5084 		 * commit 1f3ef78861ac.
5085 		 */
5086 		rbd_dev_header_unwatch_sync(rbd_dev);
5087 		rbd_dev_image_release(rbd_dev);
5088 		goto err_out_module;
5089 	}
5090 
5091 	return count;
5092 
5093 err_out_rbd_dev:
5094 	rbd_dev_destroy(rbd_dev);
5095 err_out_client:
5096 	rbd_put_client(rbdc);
5097 err_out_args:
5098 	rbd_spec_put(spec);
5099 err_out_module:
5100 	module_put(THIS_MODULE);
5101 
5102 	dout("Error adding device %s\n", buf);
5103 
5104 	return (ssize_t)rc;
5105 }
5106 
5107 static ssize_t rbd_add(struct bus_type *bus,
5108 		       const char *buf,
5109 		       size_t count)
5110 {
5111 	if (single_major)
5112 		return -EINVAL;
5113 
5114 	return do_rbd_add(bus, buf, count);
5115 }
5116 
5117 static ssize_t rbd_add_single_major(struct bus_type *bus,
5118 				    const char *buf,
5119 				    size_t count)
5120 {
5121 	return do_rbd_add(bus, buf, count);
5122 }
5123 
5124 static void rbd_dev_device_release(struct device *dev)
5125 {
5126 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5127 
5128 	rbd_free_disk(rbd_dev);
5129 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5130 	rbd_dev_mapping_clear(rbd_dev);
5131 	if (!single_major)
5132 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5133 	rbd_dev_id_put(rbd_dev);
5134 	rbd_dev_mapping_clear(rbd_dev);
5135 }
5136 
5137 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5138 {
5139 	while (rbd_dev->parent) {
5140 		struct rbd_device *first = rbd_dev;
5141 		struct rbd_device *second = first->parent;
5142 		struct rbd_device *third;
5143 
5144 		/*
5145 		 * Follow to the parent with no grandparent and
5146 		 * remove it.
5147 		 */
5148 		while (second && (third = second->parent)) {
5149 			first = second;
5150 			second = third;
5151 		}
5152 		rbd_assert(second);
5153 		rbd_dev_image_release(second);
5154 		first->parent = NULL;
5155 		first->parent_overlap = 0;
5156 
5157 		rbd_assert(first->parent_spec);
5158 		rbd_spec_put(first->parent_spec);
5159 		first->parent_spec = NULL;
5160 	}
5161 }
5162 
5163 static ssize_t do_rbd_remove(struct bus_type *bus,
5164 			     const char *buf,
5165 			     size_t count)
5166 {
5167 	struct rbd_device *rbd_dev = NULL;
5168 	struct list_head *tmp;
5169 	int dev_id;
5170 	unsigned long ul;
5171 	bool already = false;
5172 	int ret;
5173 
5174 	ret = kstrtoul(buf, 10, &ul);
5175 	if (ret)
5176 		return ret;
5177 
5178 	/* convert to int; abort if we lost anything in the conversion */
5179 	dev_id = (int)ul;
5180 	if (dev_id != ul)
5181 		return -EINVAL;
5182 
5183 	ret = -ENOENT;
5184 	spin_lock(&rbd_dev_list_lock);
5185 	list_for_each(tmp, &rbd_dev_list) {
5186 		rbd_dev = list_entry(tmp, struct rbd_device, node);
5187 		if (rbd_dev->dev_id == dev_id) {
5188 			ret = 0;
5189 			break;
5190 		}
5191 	}
5192 	if (!ret) {
5193 		spin_lock_irq(&rbd_dev->lock);
5194 		if (rbd_dev->open_count)
5195 			ret = -EBUSY;
5196 		else
5197 			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5198 							&rbd_dev->flags);
5199 		spin_unlock_irq(&rbd_dev->lock);
5200 	}
5201 	spin_unlock(&rbd_dev_list_lock);
5202 	if (ret < 0 || already)
5203 		return ret;
5204 
5205 	rbd_dev_header_unwatch_sync(rbd_dev);
5206 	/*
5207 	 * flush remaining watch callbacks - these must be complete
5208 	 * before the osd_client is shutdown
5209 	 * before the osd_client is shut down
5210 	dout("%s: flushing notifies", __func__);
5211 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5212 
5213 	/*
5214 	 * Don't free anything from rbd_dev->disk until after all
5215 	 * notifies are completely processed. Otherwise
5216 	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5217 	 * in a potential use after free of rbd_dev->disk or rbd_dev.
5218 	 */
5219 	rbd_bus_del_dev(rbd_dev);
5220 	rbd_dev_image_release(rbd_dev);
5221 	module_put(THIS_MODULE);
5222 
5223 	return count;
5224 }
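
/*
 * A hypothetical example: unmapping the device that was assigned id 1
 * (i.e. /dev/rbd1 under the default naming scheme):
 *
 *   $ echo 1 > /sys/bus/rbd/remove
 */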
5225 
5226 static ssize_t rbd_remove(struct bus_type *bus,
5227 			  const char *buf,
5228 			  size_t count)
5229 {
5230 	if (single_major)
5231 		return -EINVAL;
5232 
5233 	return do_rbd_remove(bus, buf, count);
5234 }
5235 
5236 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5237 				       const char *buf,
5238 				       size_t count)
5239 {
5240 	return do_rbd_remove(bus, buf, count);
5241 }
5242 
5243 /*
5244  * create control files in sysfs
5245  * /sys/bus/rbd/...
5246  */
5247 static int rbd_sysfs_init(void)
5248 {
5249 	int ret;
5250 
5251 	ret = device_register(&rbd_root_dev);
5252 	if (ret < 0)
5253 		return ret;
5254 
5255 	ret = bus_register(&rbd_bus_type);
5256 	if (ret < 0)
5257 		device_unregister(&rbd_root_dev);
5258 
5259 	return ret;
5260 }
5261 
5262 static void rbd_sysfs_cleanup(void)
5263 {
5264 	bus_unregister(&rbd_bus_type);
5265 	device_unregister(&rbd_root_dev);
5266 }
5267 
5268 static int rbd_slab_init(void)
5269 {
5270 	rbd_assert(!rbd_img_request_cache);
5271 	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5272 					sizeof (struct rbd_img_request),
5273 					__alignof__(struct rbd_img_request),
5274 					0, NULL);
5275 	if (!rbd_img_request_cache)
5276 		return -ENOMEM;
5277 
5278 	rbd_assert(!rbd_obj_request_cache);
5279 	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5280 					sizeof (struct rbd_obj_request),
5281 					__alignof__(struct rbd_obj_request),
5282 					0, NULL);
5283 	if (!rbd_obj_request_cache)
5284 		goto out_err;
5285 
5286 	rbd_assert(!rbd_segment_name_cache);
5287 	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5288 					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5289 	if (rbd_segment_name_cache)
5290 		return 0;
5291 out_err:
5292 	if (rbd_obj_request_cache) {
5293 		kmem_cache_destroy(rbd_obj_request_cache);
5294 		rbd_obj_request_cache = NULL;
5295 	}
5296 
5297 	kmem_cache_destroy(rbd_img_request_cache);
5298 	rbd_img_request_cache = NULL;
5299 
5300 	return -ENOMEM;
5301 }
5302 
5303 static void rbd_slab_exit(void)
5304 {
5305 	rbd_assert(rbd_segment_name_cache);
5306 	kmem_cache_destroy(rbd_segment_name_cache);
5307 	rbd_segment_name_cache = NULL;
5308 
5309 	rbd_assert(rbd_obj_request_cache);
5310 	kmem_cache_destroy(rbd_obj_request_cache);
5311 	rbd_obj_request_cache = NULL;
5312 
5313 	rbd_assert(rbd_img_request_cache);
5314 	kmem_cache_destroy(rbd_img_request_cache);
5315 	rbd_img_request_cache = NULL;
5316 }
5317 
5318 static int __init rbd_init(void)
5319 {
5320 	int rc;
5321 
5322 	if (!libceph_compatible(NULL)) {
5323 		rbd_warn(NULL, "libceph incompatibility (quitting)");
5324 		return -EINVAL;
5325 	}
5326 
5327 	rc = rbd_slab_init();
5328 	if (rc)
5329 		return rc;
5330 
5331 	if (single_major) {
5332 		rbd_major = register_blkdev(0, RBD_DRV_NAME);
5333 		if (rbd_major < 0) {
5334 			rc = rbd_major;
5335 			goto err_out_slab;
5336 		}
5337 	}
5338 
5339 	rc = rbd_sysfs_init();
5340 	if (rc)
5341 		goto err_out_blkdev;
5342 
5343 	if (single_major)
5344 		pr_info("loaded (major %d)\n", rbd_major);
5345 	else
5346 		pr_info("loaded\n");
5347 
5348 	return 0;
5349 
5350 err_out_blkdev:
5351 	if (single_major)
5352 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5353 err_out_slab:
5354 	rbd_slab_exit();
5355 	return rc;
5356 }
5357 
5358 static void __exit rbd_exit(void)
5359 {
5360 	rbd_sysfs_cleanup();
5361 	if (single_major)
5362 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5363 	rbd_slab_exit();
5364 }
5365 
5366 module_init(rbd_init);
5367 module_exit(rbd_exit);
5368 
5369 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5370 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5371 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5372 /* following authorship retained from original osdblk.c */
5373 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5374 
5375 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5376 MODULE_LICENSE("GPL");
5377