/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)
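
/*
 * RBD_FEATURES_ALL is zero: no optional image features (such as
 * RBD_FEATURE_LAYERING) are supported by this client yet, so only
 * images with no feature bits set can be mapped with full support.
 */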

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
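/*
 * For a four-byte int, the MAX_INT_FORMAT_WIDTH formula below gives
 * (5 * 4) / 2 + 1 = 11: enough for the ten decimal digits of INT_MAX
 * plus a sign character.
 */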
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	size_t		image_id_len;
	char		*image_name;
	size_t		image_name_len;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	bool                    exists;
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
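
/*
 * For example, an options string of "ro" or "read_only" in the data
 * written to the sysfs "add" file maps the image read-only;
 * "rw"/"read_write" restores the default.
 */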

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; create a
 * new one if a matching client does not already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * The client list lock is taken here, so the caller must not hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->exists = true;
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

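/*
 * Build the name of the object backing the segment that contains the
 * given byte offset.  For example, with the default 4 MB objects
 * (obj_order 22), offset 0x1400000 (20 MB) falls in segment 5 and
 * yields "<object_prefix>.000000000005".
 */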
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

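/*
 * Return how much of a request's length, starting at the given image
 * offset, fits within a single segment.  With 4 MB objects, an offset
 * of 3 MB and a length of 2 MB is clipped to the 1 MB remaining
 * before the segment boundary.
 */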
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

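/*
 * Count the segments spanned by an I/O range.  For example, with 4 MB
 * objects, an 8 MB request starting at offset 0 touches segments 0
 * and 1 and so returns 2.
 */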
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

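/*
 * Record the completion status of one request in a collection and
 * complete, in order, any contiguous run of finished requests at the
 * front of the collection.
 */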
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;
	int opcode;
	int flags;
	u64 snapid;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	if (rq_data_dir(rq) == WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
		snapid = CEPH_NOSNAP;
		payload_len = seg_len;
	} else {
		opcode = CEPH_OSD_OP_READ;
		flags = CEPH_OSD_FLAG_READ;
		snapc = NULL;
		snapid = rbd_dev->spec->snap_id;
		payload_len = 0;
	}

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Send an acknowledgement for an osd notify
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to"
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
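	/*
	 * For example, the v2 image probing code invokes the "rbd"
	 * class's "get_size" method this way, passing the snapshot
	 * id as the outbound data.
	 */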
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (!rbd_dev->exists) {
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be a single-page bio,
 * which we handle later at bio_chain_clone_range()
 */
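/*
 * For example, with 4 MB objects there are 8192 sectors per object;
 * a bio starting two sectors before an object boundary may add at
 * most 1024 more bytes here before it would have to be split.
 */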
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the bio's start sector
	 * falls, after converting the partition-relative sector to
	 * one relative to the enclosing device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->spec->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->spec->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * reload the ondisk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}

static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}

/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

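/*
 * Set up the gendisk and its request queue.  The queue limits are all
 * tied to the object size: with the default 4 MB objects, a single
 * request is capped at 8192 512-byte sectors and the reported optimal
 * I/O size is 4 MB.
 */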
1880 static int rbd_init_disk(struct rbd_device *rbd_dev)
1881 {
1882 	struct gendisk *disk;
1883 	struct request_queue *q;
1884 	u64 segment_size;
1885 
1886 	/* create gendisk info */
1887 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1888 	if (!disk)
1889 		return -ENOMEM;
1890 
1891 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1892 		 rbd_dev->dev_id);
1893 	disk->major = rbd_dev->major;
1894 	disk->first_minor = 0;
1895 	disk->fops = &rbd_bd_ops;
1896 	disk->private_data = rbd_dev;
1897 
1898 	/* init rq */
1899 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1900 	if (!q)
1901 		goto out_disk;
1902 
1903 	/* We use the default size, but let's be explicit about it. */
1904 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1905 
1906 	/* set io sizes to object size */
1907 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1908 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1909 	blk_queue_max_segment_size(q, segment_size);
1910 	blk_queue_io_min(q, segment_size);
1911 	blk_queue_io_opt(q, segment_size);
1912 
1913 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1914 	disk->queue = q;
1915 
1916 	q->queuedata = rbd_dev;
1917 
1918 	rbd_dev->disk = disk;
1919 
1920 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1921 
1922 	return 0;
1923 out_disk:
1924 	put_disk(disk);
1925 
1926 	return -ENOMEM;
1927 }
1928 
1929 /*
1930   sysfs
1931 */
1932 
1933 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1934 {
1935 	return container_of(dev, struct rbd_device, dev);
1936 }
1937 
1938 static ssize_t rbd_size_show(struct device *dev,
1939 			     struct device_attribute *attr, char *buf)
1940 {
1941 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1942 	sector_t size;
1943 
1944 	down_read(&rbd_dev->header_rwsem);
1945 	size = get_capacity(rbd_dev->disk);
1946 	up_read(&rbd_dev->header_rwsem);
1947 
1948 	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1949 }
1950 
1951 /*
1952  * Note this shows the features for whatever's mapped, which is not
1953  * necessarily the base image.
1954  */
1955 static ssize_t rbd_features_show(struct device *dev,
1956 			     struct device_attribute *attr, char *buf)
1957 {
1958 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959 
1960 	return sprintf(buf, "0x%016llx\n",
1961 			(unsigned long long) rbd_dev->mapping.features);
1962 }
1963 
1964 static ssize_t rbd_major_show(struct device *dev,
1965 			      struct device_attribute *attr, char *buf)
1966 {
1967 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968 
1969 	return sprintf(buf, "%d\n", rbd_dev->major);
1970 }
1971 
1972 static ssize_t rbd_client_id_show(struct device *dev,
1973 				  struct device_attribute *attr, char *buf)
1974 {
1975 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976 
1977 	return sprintf(buf, "client%lld\n",
1978 			ceph_client_id(rbd_dev->rbd_client->client));
1979 }
1980 
1981 static ssize_t rbd_pool_show(struct device *dev,
1982 			     struct device_attribute *attr, char *buf)
1983 {
1984 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1985 
1986 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1987 }
1988 
1989 static ssize_t rbd_pool_id_show(struct device *dev,
1990 			     struct device_attribute *attr, char *buf)
1991 {
1992 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993 
1994 	return sprintf(buf, "%llu\n",
1995 		(unsigned long long) rbd_dev->spec->pool_id);
1996 }
1997 
1998 static ssize_t rbd_name_show(struct device *dev,
1999 			     struct device_attribute *attr, char *buf)
2000 {
2001 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2002 
2003 	if (rbd_dev->spec->image_name)
2004 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2005 
2006 	return sprintf(buf, "(unknown)\n");
2007 }
2008 
2009 static ssize_t rbd_image_id_show(struct device *dev,
2010 			     struct device_attribute *attr, char *buf)
2011 {
2012 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2013 
2014 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2015 }
2016 
2017 /*
2018  * Shows the name of the currently-mapped snapshot (or
2019  * RBD_SNAP_HEAD_NAME for the base image).
2020  */
2021 static ssize_t rbd_snap_show(struct device *dev,
2022 			     struct device_attribute *attr,
2023 			     char *buf)
2024 {
2025 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2026 
2027 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2028 }
2029 
2030 /*
2031  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2032  * for the parent image.  If there is no parent, simply shows
2033  * "(no parent image)".
2034  */
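/*
 * For illustration only, reading this attribute for a mapped clone
 * might produce something like the following (the values are made
 * up; the field layout comes directly from the sprintf() calls
 * below):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b2ae8944a
 *	image_name parent-image
 *	snap_id 4
 *	snap_name snap1
 *	overlap 1073741824
 */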
2035 static ssize_t rbd_parent_show(struct device *dev,
2036 			     struct device_attribute *attr,
2037 			     char *buf)
2038 {
2039 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2040 	struct rbd_spec *spec = rbd_dev->parent_spec;
2041 	int count;
2042 	char *bufp = buf;
2043 
2044 	if (!spec)
2045 		return sprintf(buf, "(no parent image)\n");
2046 
2047 	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2048 			(unsigned long long) spec->pool_id, spec->pool_name);
2049 	if (count < 0)
2050 		return count;
2051 	bufp += count;
2052 
2053 	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2054 			spec->image_name ? spec->image_name : "(unknown)");
2055 	if (count < 0)
2056 		return count;
2057 	bufp += count;
2058 
2059 	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2060 			(unsigned long long) spec->snap_id, spec->snap_name);
2061 	if (count < 0)
2062 		return count;
2063 	bufp += count;
2064 
2065 	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2066 	if (count < 0)
2067 		return count;
2068 	bufp += count;
2069 
2070 	return (ssize_t) (bufp - buf);
2071 }
2072 
2073 static ssize_t rbd_image_refresh(struct device *dev,
2074 				 struct device_attribute *attr,
2075 				 const char *buf,
2076 				 size_t size)
2077 {
2078 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2079 	int ret;
2080 
2081 	ret = rbd_dev_refresh(rbd_dev, NULL);
2082 
2083 	return ret < 0 ? ret : size;
2084 }
2085 
2086 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2087 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2088 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2089 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2090 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2091 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2092 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2093 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2094 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2095 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2096 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
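/*
 * Each attribute above appears as a file in the device's sysfs
 * directory.  Because rbd_bus_add_dev() names each device after its
 * numeric id, a device with id 0, for example, would be accessed
 * with commands such as:
 *
 *	cat /sys/bus/rbd/devices/0/size
 *	cat /sys/bus/rbd/devices/0/current_snap
 *	echo 1 > /sys/bus/rbd/devices/0/refresh
 */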
2097 
2098 static struct attribute *rbd_attrs[] = {
2099 	&dev_attr_size.attr,
2100 	&dev_attr_features.attr,
2101 	&dev_attr_major.attr,
2102 	&dev_attr_client_id.attr,
2103 	&dev_attr_pool.attr,
2104 	&dev_attr_pool_id.attr,
2105 	&dev_attr_name.attr,
2106 	&dev_attr_image_id.attr,
2107 	&dev_attr_current_snap.attr,
2108 	&dev_attr_parent.attr,
2109 	&dev_attr_refresh.attr,
2110 	NULL
2111 };
2112 
2113 static struct attribute_group rbd_attr_group = {
2114 	.attrs = rbd_attrs,
2115 };
2116 
2117 static const struct attribute_group *rbd_attr_groups[] = {
2118 	&rbd_attr_group,
2119 	NULL
2120 };
2121 
2122 static void rbd_sysfs_dev_release(struct device *dev)
2123 {
2124 }
2125 
2126 static struct device_type rbd_device_type = {
2127 	.name		= "rbd",
2128 	.groups		= rbd_attr_groups,
2129 	.release	= rbd_sysfs_dev_release,
2130 };
2131 
2132 
2133 /*
2134  * sysfs - snapshots
2135  */
2136 
2137 static ssize_t rbd_snap_size_show(struct device *dev,
2138 				  struct device_attribute *attr,
2139 				  char *buf)
2140 {
2141 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2142 
2143 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2144 }
2145 
2146 static ssize_t rbd_snap_id_show(struct device *dev,
2147 				struct device_attribute *attr,
2148 				char *buf)
2149 {
2150 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2151 
2152 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2153 }
2154 
2155 static ssize_t rbd_snap_features_show(struct device *dev,
2156 				struct device_attribute *attr,
2157 				char *buf)
2158 {
2159 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2160 
2161 	return sprintf(buf, "0x%016llx\n",
2162 			(unsigned long long) snap->features);
2163 }
2164 
2165 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2166 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2167 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2168 
2169 static struct attribute *rbd_snap_attrs[] = {
2170 	&dev_attr_snap_size.attr,
2171 	&dev_attr_snap_id.attr,
2172 	&dev_attr_snap_features.attr,
2173 	NULL,
2174 };
2175 
2176 static struct attribute_group rbd_snap_attr_group = {
2177 	.attrs = rbd_snap_attrs,
2178 };
2179 
2180 static void rbd_snap_dev_release(struct device *dev)
2181 {
2182 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2183 	kfree(snap->name);
2184 	kfree(snap);
2185 }
2186 
2187 static const struct attribute_group *rbd_snap_attr_groups[] = {
2188 	&rbd_snap_attr_group,
2189 	NULL
2190 };
2191 
2192 static struct device_type rbd_snap_device_type = {
2193 	.groups		= rbd_snap_attr_groups,
2194 	.release	= rbd_snap_dev_release,
2195 };
2196 
2197 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2198 {
2199 	kref_get(&spec->kref);
2200 
2201 	return spec;
2202 }
2203 
2204 static void rbd_spec_free(struct kref *kref);
2205 static void rbd_spec_put(struct rbd_spec *spec)
2206 {
2207 	if (spec)
2208 		kref_put(&spec->kref, rbd_spec_free);
2209 }
2210 
2211 static struct rbd_spec *rbd_spec_alloc(void)
2212 {
2213 	struct rbd_spec *spec;
2214 
2215 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2216 	if (!spec)
2217 		return NULL;
2218 	kref_init(&spec->kref);
2219 
2220 	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */
2221 
2222 	return spec;
2223 }
2224 
2225 static void rbd_spec_free(struct kref *kref)
2226 {
2227 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2228 
2229 	kfree(spec->pool_name);
2230 	kfree(spec->image_id);
2231 	kfree(spec->image_name);
2232 	kfree(spec->snap_name);
2233 	kfree(spec);
2234 }
2235 
2236 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2237 				struct rbd_spec *spec)
2238 {
2239 	struct rbd_device *rbd_dev;
2240 
2241 	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2242 	if (!rbd_dev)
2243 		return NULL;
2244 
2245 	spin_lock_init(&rbd_dev->lock);
2246 	INIT_LIST_HEAD(&rbd_dev->node);
2247 	INIT_LIST_HEAD(&rbd_dev->snaps);
2248 	init_rwsem(&rbd_dev->header_rwsem);
2249 
2250 	rbd_dev->spec = spec;
2251 	rbd_dev->rbd_client = rbdc;
2252 
2253 	return rbd_dev;
2254 }
2255 
2256 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2257 {
2258 	rbd_spec_put(rbd_dev->parent_spec);
2259 	kfree(rbd_dev->header_name);
2260 	rbd_put_client(rbd_dev->rbd_client);
2261 	rbd_spec_put(rbd_dev->spec);
2262 	kfree(rbd_dev);
2263 }
2264 
2265 static bool rbd_snap_registered(struct rbd_snap *snap)
2266 {
2267 	bool ret = snap->dev.type == &rbd_snap_device_type;
2268 	bool reg = device_is_registered(&snap->dev);
2269 
2270 	rbd_assert(!ret ^ reg);
2271 
2272 	return ret;
2273 }
2274 
2275 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2276 {
2277 	list_del(&snap->node);
2278 	if (device_is_registered(&snap->dev))
2279 		device_unregister(&snap->dev);
2280 }
2281 
2282 static int rbd_register_snap_dev(struct rbd_snap *snap,
2283 				  struct device *parent)
2284 {
2285 	struct device *dev = &snap->dev;
2286 	int ret;
2287 
2288 	dev->type = &rbd_snap_device_type;
2289 	dev->parent = parent;
2290 	dev->release = rbd_snap_dev_release;
2291 	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2292 	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2293 
2294 	ret = device_register(dev);
2295 
2296 	return ret;
2297 }
2298 
2299 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2300 						const char *snap_name,
2301 						u64 snap_id, u64 snap_size,
2302 						u64 snap_features)
2303 {
2304 	struct rbd_snap *snap;
2305 	int ret;
2306 
2307 	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2308 	if (!snap)
2309 		return ERR_PTR(-ENOMEM);
2310 
2311 	ret = -ENOMEM;
2312 	snap->name = kstrdup(snap_name, GFP_KERNEL);
2313 	if (!snap->name)
2314 		goto err;
2315 
2316 	snap->id = snap_id;
2317 	snap->size = snap_size;
2318 	snap->features = snap_features;
2319 
2320 	return snap;
2321 
2322 err:
2323 	kfree(snap->name);
2324 	kfree(snap);
2325 
2326 	return ERR_PTR(ret);
2327 }
2328 
2329 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2330 		u64 *snap_size, u64 *snap_features)
2331 {
2332 	char *snap_name;
2333 
2334 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2335 
2336 	*snap_size = rbd_dev->header.snap_sizes[which];
2337 	*snap_features = 0;	/* No features for v1 */
2338 
2339 	/* Skip over names until we find the one we are looking for */
2340 
2341 	snap_name = rbd_dev->header.snap_names;
2342 	while (which--)
2343 		snap_name += strlen(snap_name) + 1;
2344 
2345 	return snap_name;
2346 }
2347 
2348 /*
2349  * Get the size and object order for an image snapshot, or,
2350  * if snap_id is CEPH_NOSNAP, get this information for the
2351  * base image.
2352  */
2353 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2354 				u8 *order, u64 *snap_size)
2355 {
2356 	__le64 snapid = cpu_to_le64(snap_id);
2357 	int ret;
2358 	struct {
2359 		u8 order;
2360 		__le64 size;
2361 	} __attribute__ ((packed)) size_buf = { 0 };
2362 
2363 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2364 				"rbd", "get_size",
2365 				(char *) &snapid, sizeof (snapid),
2366 				(char *) &size_buf, sizeof (size_buf),
2367 				CEPH_OSD_FLAG_READ, NULL);
2368 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2369 	if (ret < 0)
2370 		return ret;
2371 
2372 	*order = size_buf.order;
2373 	*snap_size = le64_to_cpu(size_buf.size);
2374 
2375 	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2376 		(unsigned long long) snap_id, (unsigned int) *order,
2377 		(unsigned long long) *snap_size);
2378 
2379 	return 0;
2380 }
2381 
2382 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2383 {
2384 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2385 					&rbd_dev->header.obj_order,
2386 					&rbd_dev->header.image_size);
2387 }
2388 
2389 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2390 {
2391 	void *reply_buf;
2392 	int ret;
2393 	void *p;
2394 
2395 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2396 	if (!reply_buf)
2397 		return -ENOMEM;
2398 
2399 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2400 				"rbd", "get_object_prefix",
2401 				NULL, 0,
2402 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2403 				CEPH_OSD_FLAG_READ, NULL);
2404 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2405 	if (ret < 0)
2406 		goto out;
2407 	ret = 0;    /* rbd_req_sync_exec() can return positive */
2408 
2409 	p = reply_buf;
2410 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2411 						p + RBD_OBJ_PREFIX_LEN_MAX,
2412 						NULL, GFP_NOIO);
2413 
2414 	if (IS_ERR(rbd_dev->header.object_prefix)) {
2415 		ret = PTR_ERR(rbd_dev->header.object_prefix);
2416 		rbd_dev->header.object_prefix = NULL;
2417 	} else {
2418 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2419 	}
2420 
2421 out:
2422 	kfree(reply_buf);
2423 
2424 	return ret;
2425 }
2426 
2427 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2428 		u64 *snap_features)
2429 {
2430 	__le64 snapid = cpu_to_le64(snap_id);
2431 	struct {
2432 		__le64 features;
2433 		__le64 incompat;
2434 	} features_buf = { 0 };
2435 	u64 incompat;
2436 	int ret;
2437 
2438 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2439 				"rbd", "get_features",
2440 				(char *) &snapid, sizeof (snapid),
2441 				(char *) &features_buf, sizeof (features_buf),
2442 				CEPH_OSD_FLAG_READ, NULL);
2443 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2444 	if (ret < 0)
2445 		return ret;
2446 
2447 	incompat = le64_to_cpu(features_buf.incompat);
2448 	if (incompat & ~RBD_FEATURES_ALL)
2449 		return -ENXIO;
2450 
2451 	*snap_features = le64_to_cpu(features_buf.features);
2452 
2453 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2454 		(unsigned long long) snap_id,
2455 		(unsigned long long) *snap_features,
2456 		(unsigned long long) le64_to_cpu(features_buf.incompat));
2457 
2458 	return 0;
2459 }
2460 
2461 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2462 {
2463 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2464 						&rbd_dev->header.features);
2465 }
2466 
2467 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2468 {
2469 	struct rbd_spec *parent_spec;
2470 	size_t size;
2471 	void *reply_buf = NULL;
2472 	__le64 snapid;
2473 	void *p;
2474 	void *end;
2475 	char *image_id;
2476 	u64 overlap;
2477 	size_t len = 0;
2478 	int ret;
2479 
2480 	parent_spec = rbd_spec_alloc();
2481 	if (!parent_spec)
2482 		return -ENOMEM;
2483 
2484 	size = sizeof (__le64) +				/* pool_id */
2485 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
2486 		sizeof (__le64) +				/* snap_id */
2487 		sizeof (__le64);				/* overlap */
2488 	reply_buf = kmalloc(size, GFP_KERNEL);
2489 	if (!reply_buf) {
2490 		ret = -ENOMEM;
2491 		goto out_err;
2492 	}
2493 
2494 	snapid = cpu_to_le64(CEPH_NOSNAP);
2495 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2496 				"rbd", "get_parent",
2497 				(char *) &snapid, sizeof (snapid),
2498 				(char *) reply_buf, size,
2499 				CEPH_OSD_FLAG_READ, NULL);
2500 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2501 	if (ret < 0)
2502 		goto out_err;
2503 
2504 	ret = -ERANGE;
2505 	p = reply_buf;
2506 	end = (char *) reply_buf + size;
2507 	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2508 	if (parent_spec->pool_id == CEPH_NOPOOL)
2509 		goto out;	/* No parent?  No problem. */
2510 
2511 	image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2512 	if (IS_ERR(image_id)) {
2513 		ret = PTR_ERR(image_id);
2514 		goto out_err;
2515 	}
2516 	parent_spec->image_id = image_id;
2517 	parent_spec->image_id_len = len;
2518 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2519 	ceph_decode_64_safe(&p, end, overlap, out_err);
2520 
2521 	rbd_dev->parent_overlap = overlap;
2522 	rbd_dev->parent_spec = parent_spec;
2523 	parent_spec = NULL;	/* rbd_dev now owns this */
2524 out:
2525 	ret = 0;
2526 out_err:
2527 	kfree(reply_buf);
2528 	rbd_spec_put(parent_spec);
2529 
2530 	return ret;
2531 }
2532 
2533 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2534 {
2535 	size_t image_id_size;
2536 	char *image_id;
2537 	void *p;
2538 	void *end;
2539 	size_t size;
2540 	void *reply_buf = NULL;
2541 	size_t len = 0;
2542 	char *image_name = NULL;
2543 	int ret;
2544 
2545 	rbd_assert(!rbd_dev->spec->image_name);
2546 
2547 	image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
2548 	image_id = kmalloc(image_id_size, GFP_KERNEL);
2549 	if (!image_id)
2550 		return NULL;
2551 
2552 	p = image_id;
2553 	end = (char *) image_id + image_id_size;
2554 	ceph_encode_string(&p, end, rbd_dev->spec->image_id,
2555 				(u32) rbd_dev->spec->image_id_len);
2556 
2557 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2558 	reply_buf = kmalloc(size, GFP_KERNEL);
2559 	if (!reply_buf)
2560 		goto out;
2561 
2562 	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2563 				"rbd", "dir_get_name",
2564 				image_id, image_id_size,
2565 				(char *) reply_buf, size,
2566 				CEPH_OSD_FLAG_READ, NULL);
2567 	if (ret < 0)
2568 		goto out;
2569 	p = reply_buf;
2570 	end = (char *) reply_buf + size;
2571 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2572 	if (IS_ERR(image_name))
2573 		image_name = NULL;
2574 	else
2575 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2576 out:
2577 	kfree(reply_buf);
2578 	kfree(image_id);
2579 
2580 	return image_name;
2581 }
2582 
2583 /*
2584  * When a parent image gets probed, we only have the pool, image,
2585  * and snapshot ids but not the names of any of them.  This call
2586  * is made later to fill in those names.  It has to be done after
2587  * rbd_dev_snaps_update() has completed because some of the
2588  * information (in particular, snapshot name) is not available
2589  * until then.
2590  */
2591 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2592 {
2593 	struct ceph_osd_client *osdc;
2594 	const char *name;
2595 	void *reply_buf = NULL;
2596 	int ret;
2597 
2598 	if (rbd_dev->spec->pool_name)
2599 		return 0;	/* Already have the names */
2600 
2601 	/* Look up the pool name */
2602 
2603 	osdc = &rbd_dev->rbd_client->client->osdc;
2604 	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 	if (!name)
2606 		return -EIO;	/* pool id too large (>= 2^31) */
2607 
2608 	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 	if (!rbd_dev->spec->pool_name)
2610 		return -ENOMEM;
2611 
2612 	/* Fetch the image name; tolerate failure here */
2613 
2614 	name = rbd_dev_image_name(rbd_dev);
2615 	if (name) {
2616 		rbd_dev->spec->image_name_len = strlen(name);
2617 		rbd_dev->spec->image_name = (char *) name;
2618 	} else {
2619 		pr_warning(RBD_DRV_NAME "%d "
2620 			"unable to get image name for image id %s\n",
2621 			rbd_dev->major, rbd_dev->spec->image_id);
2622 	}
2623 
2624 	/* Look up the snapshot name. */
2625 
2626 	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 	if (!name) {
2628 		ret = -EIO;
2629 		goto out_err;
2630 	}
2631 	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2632 	if (!rbd_dev->spec->snap_name) {
2633 		ret = -ENOMEM;
		goto out_err;
	}
2634 
2635 	return 0;
2636 out_err:
2637 	kfree(reply_buf);
2638 	kfree(rbd_dev->spec->pool_name);
2639 	rbd_dev->spec->pool_name = NULL;
2640 
2641 	return ret;
2642 }
2643 
2644 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2645 {
2646 	size_t size;
2647 	int ret;
2648 	void *reply_buf;
2649 	void *p;
2650 	void *end;
2651 	u64 seq;
2652 	u32 snap_count;
2653 	struct ceph_snap_context *snapc;
2654 	u32 i;
2655 
2656 	/*
2657 	 * We'll need room for the seq value (maximum snapshot id),
2658 	 * snapshot count, and array of that many snapshot ids.
2659 	 * For now we have a fixed upper limit on the number we're
2660 	 * prepared to receive.
2661 	 */
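	/*
	 * Worked example: with an 8-byte seq, a 4-byte count, and
	 * 8-byte snapshot ids, RBD_MAX_SNAP_COUNT (510) snapshots
	 * need 8 + 4 + 510 * 8 = 4092 bytes, which is why that
	 * limit keeps the largest reply within a single 4KB page.
	 */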
2662 	size = sizeof (__le64) + sizeof (__le32) +
2663 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
2664 	reply_buf = kzalloc(size, GFP_KERNEL);
2665 	if (!reply_buf)
2666 		return -ENOMEM;
2667 
2668 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2669 				"rbd", "get_snapcontext",
2670 				NULL, 0,
2671 				reply_buf, size,
2672 				CEPH_OSD_FLAG_READ, ver);
2673 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2674 	if (ret < 0)
2675 		goto out;
2676 
2677 	ret = -ERANGE;
2678 	p = reply_buf;
2679 	end = (char *) reply_buf + size;
2680 	ceph_decode_64_safe(&p, end, seq, out);
2681 	ceph_decode_32_safe(&p, end, snap_count, out);
2682 
2683 	/*
2684 	 * Make sure the reported number of snapshot ids wouldn't go
2685 	 * beyond the end of our buffer.  But before checking that,
2686 	 * make sure the computed size of the snapshot context we
2687 	 * allocate is representable in a size_t.
2688 	 */
2689 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2690 				 / sizeof (u64)) {
2691 		ret = -EINVAL;
2692 		goto out;
2693 	}
2694 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2695 		goto out;
2696 
2697 	size = sizeof (struct ceph_snap_context) +
2698 				snap_count * sizeof (snapc->snaps[0]);
2699 	snapc = kmalloc(size, GFP_KERNEL);
2700 	if (!snapc) {
2701 		ret = -ENOMEM;
2702 		goto out;
2703 	}
2704 
2705 	atomic_set(&snapc->nref, 1);
2706 	snapc->seq = seq;
2707 	snapc->num_snaps = snap_count;
2708 	for (i = 0; i < snap_count; i++)
2709 		snapc->snaps[i] = ceph_decode_64(&p);
2710 
2711 	rbd_dev->header.snapc = snapc;
2712 
2713 	dout("  snap context seq = %llu, snap_count = %u\n",
2714 		(unsigned long long) seq, (unsigned int) snap_count);
2715 
	ret = 0;
2716 out:
2717 	kfree(reply_buf);
2718 
2719 	return ret;
2720 }
2721 
2722 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2723 {
2724 	size_t size;
2725 	void *reply_buf;
2726 	__le64 snap_id;
2727 	int ret;
2728 	void *p;
2729 	void *end;
2730 	char *snap_name;
2731 
2732 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2733 	reply_buf = kmalloc(size, GFP_KERNEL);
2734 	if (!reply_buf)
2735 		return ERR_PTR(-ENOMEM);
2736 
2737 	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2738 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2739 				"rbd", "get_snapshot_name",
2740 				(char *) &snap_id, sizeof (snap_id),
2741 				reply_buf, size,
2742 				CEPH_OSD_FLAG_READ, NULL);
2743 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2744 	if (ret < 0)
2745 		goto out;
2746 
2747 	p = reply_buf;
2748 	end = (char *) reply_buf + size;
2749 	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2750 	if (IS_ERR(snap_name)) {
2751 		ret = PTR_ERR(snap_name);
2752 		goto out;
2753 	} else {
2754 		dout("  snap_id 0x%016llx snap_name = %s\n",
2755 			(unsigned long long) le64_to_cpu(snap_id), snap_name);
2756 	}
2757 	kfree(reply_buf);
2758 
2759 	return snap_name;
2760 out:
2761 	kfree(reply_buf);
2762 
2763 	return ERR_PTR(ret);
2764 }
2765 
2766 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2767 		u64 *snap_size, u64 *snap_features)
2768 {
2769 	__le64 snap_id;
2770 	u8 order;
2771 	int ret;
2772 
2773 	snap_id = rbd_dev->header.snapc->snaps[which];
2774 	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2775 	if (ret)
2776 		return ERR_PTR(ret);
2777 	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2778 	if (ret)
2779 		return ERR_PTR(ret);
2780 
2781 	return rbd_dev_v2_snap_name(rbd_dev, which);
2782 }
2783 
2784 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2785 		u64 *snap_size, u64 *snap_features)
2786 {
2787 	if (rbd_dev->image_format == 1)
2788 		return rbd_dev_v1_snap_info(rbd_dev, which,
2789 					snap_size, snap_features);
2790 	if (rbd_dev->image_format == 2)
2791 		return rbd_dev_v2_snap_info(rbd_dev, which,
2792 					snap_size, snap_features);
2793 	return ERR_PTR(-EINVAL);
2794 }
2795 
2796 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2797 {
2798 	int ret;
2799 	__u8 obj_order;
2800 
2801 	down_write(&rbd_dev->header_rwsem);
2802 
2803 	/* Grab old order first, to see if it changes */
2804 
2805 	obj_order = rbd_dev->header.obj_order;
2806 	ret = rbd_dev_v2_image_size(rbd_dev);
2807 	if (ret)
2808 		goto out;
2809 	if (rbd_dev->header.obj_order != obj_order) {
2810 		ret = -EIO;
2811 		goto out;
2812 	}
2813 	rbd_update_mapping_size(rbd_dev);
2814 
2815 	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2816 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
2817 	if (ret)
2818 		goto out;
2819 	ret = rbd_dev_snaps_update(rbd_dev);
2820 	dout("rbd_dev_snaps_update returned %d\n", ret);
2821 	if (ret)
2822 		goto out;
2823 	ret = rbd_dev_snaps_register(rbd_dev);
2824 	dout("rbd_dev_snaps_register returned %d\n", ret);
2825 out:
2826 	up_write(&rbd_dev->header_rwsem);
2827 
2828 	return ret;
2829 }
2830 
2831 /*
2832  * Scan the rbd device's current snapshot list and compare it to the
2833  * newly-received snapshot context.  Remove any existing snapshots
2834  * not present in the new snapshot context.  Add a new snapshot for
2835  * any snapshots in the snapshot context not in the current list.
2836  * And verify there are no changes to snapshots we already know
2837  * about.
2838  *
2839  * Assumes the snapshots in the snapshot context are sorted by
2840  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2841  * are also maintained in that order.)
2842  */
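/*
 * A sketch of the merge: if the current list holds {12, 8, 3}
 * (highest id first) and the new snapshot context reports
 * {12, 10, 3}, the loop below keeps 12, inserts a new snapshot 10
 * ahead of 8, removes 8 (absent from the new context), and verifies
 * that 3 is unchanged.
 */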
2843 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2844 {
2845 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2846 	const u32 snap_count = snapc->num_snaps;
2847 	struct list_head *head = &rbd_dev->snaps;
2848 	struct list_head *links = head->next;
2849 	u32 index = 0;
2850 
2851 	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2852 	while (index < snap_count || links != head) {
2853 		u64 snap_id;
2854 		struct rbd_snap *snap;
2855 		char *snap_name;
2856 		u64 snap_size = 0;
2857 		u64 snap_features = 0;
2858 
2859 		snap_id = index < snap_count ? snapc->snaps[index]
2860 					     : CEPH_NOSNAP;
2861 		snap = links != head ? list_entry(links, struct rbd_snap, node)
2862 				     : NULL;
2863 		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2864 
2865 		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2866 			struct list_head *next = links->next;
2867 
2868 			/* Existing snapshot not in the new snap context */
2869 
2870 			if (rbd_dev->spec->snap_id == snap->id)
2871 				rbd_dev->exists = false;
2872 			rbd_remove_snap_dev(snap);
2873 			dout("%ssnap id %llu has been removed\n",
2874 				rbd_dev->spec->snap_id == snap->id ?
2875 							"mapped " : "",
2876 				(unsigned long long) snap->id);
2877 
2878 			/* Done with this list entry; advance */
2879 
2880 			links = next;
2881 			continue;
2882 		}
2883 
2884 		snap_name = rbd_dev_snap_info(rbd_dev, index,
2885 					&snap_size, &snap_features);
2886 		if (IS_ERR(snap_name))
2887 			return PTR_ERR(snap_name);
2888 
2889 		dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2890 			(unsigned long long) snap_id);
2891 		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2892 			struct rbd_snap *new_snap;
2893 
2894 			/* We haven't seen this snapshot before */
2895 
2896 			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2897 					snap_id, snap_size, snap_features);
2898 			if (IS_ERR(new_snap)) {
2899 				int err = PTR_ERR(new_snap);
2900 
2901 				dout("  failed to add dev, error %d\n", err);
2902 
2903 				return err;
2904 			}
2905 
2906 			/* New goes before existing, or at end of list */
2907 
2908 			dout("  added dev%s\n", snap ? "" : " at end\n");
2909 			if (snap)
2910 				list_add_tail(&new_snap->node, &snap->node);
2911 			else
2912 				list_add_tail(&new_snap->node, head);
2913 		} else {
2914 			/* Already have this one */
2915 
2916 			dout("  already present\n");
2917 
2918 			rbd_assert(snap->size == snap_size);
2919 			rbd_assert(!strcmp(snap->name, snap_name));
2920 			rbd_assert(snap->features == snap_features);
2921 
2922 			/* Done with this list entry; advance */
2923 
2924 			links = links->next;
2925 		}
2926 
2927 		/* Advance to the next entry in the snapshot context */
2928 
2929 		index++;
2930 	}
2931 	dout("%s: done\n", __func__);
2932 
2933 	return 0;
2934 }
2935 
2936 /*
2937  * Scan the list of snapshots and register the devices for any that
2938  * have not already been registered.
2939  */
2940 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2941 {
2942 	struct rbd_snap *snap;
2943 	int ret = 0;
2944 
2945 	dout("%s called\n", __func__);
2946 	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2947 		return -EIO;
2948 
2949 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2950 		if (!rbd_snap_registered(snap)) {
2951 			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2952 			if (ret < 0)
2953 				break;
2954 		}
2955 	}
2956 	dout("%s: returning %d\n", __func__, ret);
2957 
2958 	return ret;
2959 }
2960 
2961 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2962 {
2963 	struct device *dev;
2964 	int ret;
2965 
2966 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2967 
2968 	dev = &rbd_dev->dev;
2969 	dev->bus = &rbd_bus_type;
2970 	dev->type = &rbd_device_type;
2971 	dev->parent = &rbd_root_dev;
2972 	dev->release = rbd_dev_release;
2973 	dev_set_name(dev, "%d", rbd_dev->dev_id);
2974 	ret = device_register(dev);
2975 
2976 	mutex_unlock(&ctl_mutex);
2977 
2978 	return ret;
2979 }
2980 
2981 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2982 {
2983 	device_unregister(&rbd_dev->dev);
2984 }
2985 
2986 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2987 {
2988 	int ret, rc;
2989 
2990 	do {
2991 		ret = rbd_req_sync_watch(rbd_dev);
2992 		if (ret == -ERANGE) {
2993 			rc = rbd_dev_refresh(rbd_dev, NULL);
2994 			if (rc < 0)
2995 				return rc;
2996 		}
2997 	} while (ret == -ERANGE);
2998 
2999 	return ret;
3000 }
3001 
3002 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3003 
3004 /*
3005  * Get a unique rbd identifier for the given new rbd_dev, and add
3006  * the rbd_dev to the global list.  The minimum rbd id is 1.
3007  */
3008 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3009 {
3010 	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3011 
3012 	spin_lock(&rbd_dev_list_lock);
3013 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
3014 	spin_unlock(&rbd_dev_list_lock);
3015 	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3016 		(unsigned long long) rbd_dev->dev_id);
3017 }
3018 
3019 /*
3020  * Remove an rbd_dev from the global list, and record that its
3021  * identifier is no longer in use.
3022  */
3023 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3024 {
3025 	struct list_head *tmp;
3026 	int rbd_id = rbd_dev->dev_id;
3027 	int max_id;
3028 
3029 	rbd_assert(rbd_id > 0);
3030 
3031 	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3032 		(unsigned long long) rbd_dev->dev_id);
3033 	spin_lock(&rbd_dev_list_lock);
3034 	list_del_init(&rbd_dev->node);
3035 
3036 	/*
3037 	 * If the id being "put" is not the current maximum, there
3038 	 * is nothing special we need to do.
3039 	 */
3040 	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3041 		spin_unlock(&rbd_dev_list_lock);
3042 		return;
3043 	}
3044 
3045 	/*
3046 	 * We need to update the current maximum id.  Search the
3047 	 * list to find out what it is.  We're more likely to find
3048 	 * the maximum at the end, so search the list backward.
3049 	 */
3050 	max_id = 0;
3051 	list_for_each_prev(tmp, &rbd_dev_list) {
3052 		struct rbd_device *rbd_dev;
3053 
3054 		rbd_dev = list_entry(tmp, struct rbd_device, node);
3055 		if (rbd_dev->dev_id > max_id)
3056 			max_id = rbd_dev->dev_id;
3057 	}
3058 	spin_unlock(&rbd_dev_list_lock);
3059 
3060 	/*
3061 	 * The max id could have been updated by rbd_dev_id_get(), in
3062 	 * which case it now accurately reflects the new maximum.
3063 	 * Be careful not to overwrite the maximum value in that
3064 	 * case.
3065 	 */
3066 	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3067 	dout("  max dev id has been reset\n");
3068 }
3069 
3070 /*
3071  * Skips over white space at *buf, and updates *buf to point to the
3072  * first found non-space character (if any). Returns the length of
3073  * the token (string of non-white space characters) found.  Note
3074  * that *buf must be terminated with '\0'.
3075  */
3076 static inline size_t next_token(const char **buf)
3077 {
3078 	/*
3079 	 * These are the characters that produce nonzero for
3080 	 * isspace() in the "C" and "POSIX" locales.
3081 	 */
3082 	const char *spaces = " \f\n\r\t\v";
3083 
3084 	*buf += strspn(*buf, spaces);	/* Find start of token */
3085 
3086 	return strcspn(*buf, spaces);	/* Return token length */
3087 }
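/*
 * For example, with *buf pointing at "  pool image", next_token()
 * advances *buf to the 'p' of "pool" and returns 4; the caller is
 * responsible for stepping *buf past the token itself.
 */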
3088 
3089 /*
3090  * Finds the next token in *buf, and if the provided token buffer is
3091  * big enough, copies the found token into it.  The result, if
3092  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3093  * must be terminated with '\0' on entry.
3094  *
3095  * Returns the length of the token found (not including the '\0').
3096  * Return value will be 0 if no token is found, and it will be >=
3097  * token_size if the token would not fit.
3098  *
3099  * The *buf pointer will be updated to point beyond the end of the
3100  * found token.  Note that this occurs even if the token buffer is
3101  * too small to hold it.
3102  */
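/*
 * E.g., copying the first token of "rbd0 extra" into a 16-byte
 * buffer stores "rbd0\0" and returns 4; with a 4-byte buffer nothing
 * is copied, and the return value 4 (>= token_size) signals that the
 * token was too big to fit.
 */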
3103 static inline size_t copy_token(const char **buf,
3104 				char *token,
3105 				size_t token_size)
3106 {
3107 	size_t len;
3108 
3109 	len = next_token(buf);
3110 	if (len < token_size) {
3111 		memcpy(token, *buf, len);
3112 		*(token + len) = '\0';
3113 	}
3114 	*buf += len;
3115 
3116 	return len;
3117 }
3118 
3119 /*
3120  * Finds the next token in *buf, dynamically allocates a buffer big
3121  * enough to hold a copy of it, and copies the token into the new
3122  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3123  * that a duplicate buffer is created even for a zero-length token.
3124  *
3125  * Returns a pointer to the newly-allocated duplicate, or a null
3126  * pointer if memory for the duplicate was not available.  If
3127  * the lenp argument is a non-null pointer, the length of the token
3128  * (not including the '\0') is returned in *lenp.
3129  *
3130  * If successful, the *buf pointer will be updated to point beyond
3131  * the end of the found token.
3132  *
3133  * Note: uses GFP_KERNEL for allocation.
3134  */
3135 static inline char *dup_token(const char **buf, size_t *lenp)
3136 {
3137 	char *dup;
3138 	size_t len;
3139 
3140 	len = next_token(buf);
3141 	dup = kmalloc(len + 1, GFP_KERNEL);
3142 	if (!dup)
3143 		return NULL;
3144 
3145 	memcpy(dup, *buf, len);
3146 	*(dup + len) = '\0';
3147 	*buf += len;
3148 
3149 	if (lenp)
3150 		*lenp = len;
3151 
3152 	return dup;
3153 }
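/*
 * So, for example, two successive dup_token() calls on "rbd myimage"
 * return newly-allocated copies of "rbd" and "myimage", leaving *buf
 * at the end of the input; the caller must kfree() both copies.
 */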
3154 
3155 /*
3156  * Parse the options provided for an "rbd add" (i.e., rbd image
3157  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3158  * and the data written is passed here via a NUL-terminated buffer.
3159  * Returns 0 if successful or an error code otherwise.
3160  *
3161  * The information extracted from these options is recorded in
3162  * the other parameters which return dynamically-allocated
3163  * structures:
3164  *  ceph_opts
3165  *      The address of a pointer that will refer to a ceph options
3166  *      structure.  Caller must release the returned pointer using
3167  *      ceph_destroy_options() when it is no longer needed.
3168  *  rbd_opts
3169  *	Address of an rbd options pointer.  Fully initialized by
3170  *	this function; caller must release with kfree().
3171  *  spec
3172  *	Address of an rbd image specification pointer.  Fully
3173  *	initialized by this function based on parsed options.
3174  *	Caller must release with rbd_spec_put().
3175  *
3176  * The options passed take this form:
3177  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3178  * where:
3179  *  <mon_addrs>
3180  *      A comma-separated list of one or more monitor addresses.
3181  *      A monitor address is an ip address, optionally followed
3182  *      by a port number (separated by a colon).
3183  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3184  *  <options>
3185  *      A comma-separated list of ceph and/or rbd options.
3186  *  <pool_name>
3187  *      The name of the rados pool containing the rbd image.
3188  *  <image_name>
3189  *      The name of the image in that pool to map.
3190  *  <snap_id>
3191  *      An optional snapshot id.  If provided, the mapping will
3192  *      present data from the image at the time that snapshot was
3193  *      created.  The image head is used if no snapshot id is
3194  *      provided.  Snapshot mappings are always read-only.
3195  */
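/*
 * A hypothetical request written to /sys/bus/rbd/add might therefore
 * look like the following (the monitor address, credentials, pool,
 * image, and snapshot names are all illustrative):
 *
 *	echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 */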
3196 static int rbd_add_parse_args(const char *buf,
3197 				struct ceph_options **ceph_opts,
3198 				struct rbd_options **opts,
3199 				struct rbd_spec **rbd_spec)
3200 {
3201 	size_t len;
3202 	char *options;
3203 	const char *mon_addrs;
3204 	size_t mon_addrs_size;
3205 	struct rbd_spec *spec = NULL;
3206 	struct rbd_options *rbd_opts = NULL;
3207 	struct ceph_options *copts;
3208 	int ret;
3209 
3210 	/* The first four tokens are required */
3211 
3212 	len = next_token(&buf);
3213 	if (!len)
3214 		return -EINVAL;	/* Missing monitor address(es) */
3215 	mon_addrs = buf;
3216 	mon_addrs_size = len + 1;
3217 	buf += len;
3218 
3219 	ret = -EINVAL;
3220 	options = dup_token(&buf, NULL);
3221 	if (!options)
3222 		return -ENOMEM;
3223 	if (!*options)
3224 		goto out_err;	/* Missing options */
3225 
3226 	spec = rbd_spec_alloc();
3227 	if (!spec)
3228 		goto out_mem;
3229 
3230 	spec->pool_name = dup_token(&buf, NULL);
3231 	if (!spec->pool_name)
3232 		goto out_mem;
3233 	if (!*spec->pool_name)
3234 		goto out_err;	/* Missing pool name */
3235 
3236 	spec->image_name = dup_token(&buf, &spec->image_name_len);
3237 	if (!spec->image_name)
3238 		goto out_mem;
3239 	if (!*spec->image_name)
3240 		goto out_err;	/* Missing image name */
3241 
3242 	/*
3243 	 * Snapshot name is optional; default is to use "-"
3244 	 * (indicating the head/no snapshot).
3245 	 */
3246 	len = next_token(&buf);
3247 	if (!len) {
3248 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3249 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3250 	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
3251 		ret = -ENAMETOOLONG;
3252 		goto out_err;
3253 	}
3254 	spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3255 	if (!spec->snap_name)
3256 		goto out_mem;
3257 	memcpy(spec->snap_name, buf, len);
3258 	*(spec->snap_name + len) = '\0';
3259 
3260 	/* Initialize all rbd options to the defaults */
3261 
3262 	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3263 	if (!rbd_opts)
3264 		goto out_mem;
3265 
3266 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3267 
3268 	copts = ceph_parse_options(options, mon_addrs,
3269 					mon_addrs + mon_addrs_size - 1,
3270 					parse_rbd_opts_token, rbd_opts);
3271 	if (IS_ERR(copts)) {
3272 		ret = PTR_ERR(copts);
3273 		goto out_err;
3274 	}
3275 	kfree(options);
3276 
3277 	*ceph_opts = copts;
3278 	*opts = rbd_opts;
3279 	*rbd_spec = spec;
3280 
3281 	return 0;
3282 out_mem:
3283 	ret = -ENOMEM;
3284 out_err:
3285 	kfree(rbd_opts);
3286 	rbd_spec_put(spec);
3287 	kfree(options);
3288 
3289 	return ret;
3290 }
3291 
3292 /*
3293  * An rbd format 2 image has a unique identifier, distinct from the
3294  * name given to it by the user.  Internally, that identifier is
3295  * what's used to specify the names of objects related to the image.
3296  *
3297  * A special "rbd id" object is used to map an rbd image name to its
3298  * id.  If that object doesn't exist, then there is no v2 rbd image
3299  * with the supplied name.
3300  *
3301  * This function will record the given rbd_dev's image_id field if
3302  * it can be determined, and in that case will return 0.  If any
3303  * errors occur a negative errno will be returned and the rbd_dev's
3304  * image_id field will be unchanged (and should be NULL).
3305  */
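/*
 * For example, assuming RBD_ID_PREFIX is "rbd_id." (per
 * rbd_types.h), the id of a format 2 image named "foo" is stored in
 * an object named "rbd_id.foo".
 */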
3306 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3307 {
3308 	int ret;
3309 	size_t size;
3310 	char *object_name;
3311 	void *response;
3312 	void *p;
3313 
3314 	/*
3315 	 * When probing a parent image, the image id is already
3316 	 * known (and the image name likely is not).  There's no
3317 	 * need to fetch the image id again in this case.
3318 	 */
3319 	if (rbd_dev->spec->image_id)
3320 		return 0;
3321 
3322 	/*
3323 	 * First, see if the format 2 image id file exists, and if
3324 	 * so, get the image's persistent id from it.
3325 	 */
3326 	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
3327 	object_name = kmalloc(size, GFP_NOIO);
3328 	if (!object_name)
3329 		return -ENOMEM;
3330 	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3331 	dout("rbd id object name is %s\n", object_name);
3332 
3333 	/* Response will be an encoded string, which includes a length */
3334 
3335 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3336 	response = kzalloc(size, GFP_NOIO);
3337 	if (!response) {
3338 		ret = -ENOMEM;
3339 		goto out;
3340 	}
3341 
3342 	ret = rbd_req_sync_exec(rbd_dev, object_name,
3343 				"rbd", "get_id",
3344 				NULL, 0,
3345 				response, RBD_IMAGE_ID_LEN_MAX,
3346 				CEPH_OSD_FLAG_READ, NULL);
3347 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3348 	if (ret < 0)
3349 		goto out;
3350 	ret = 0;    /* rbd_req_sync_exec() can return positive */
3351 
3352 	p = response;
3353 	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3354 						p + RBD_IMAGE_ID_LEN_MAX,
3355 						&rbd_dev->spec->image_id_len,
3356 						GFP_NOIO);
3357 	if (IS_ERR(rbd_dev->spec->image_id)) {
3358 		ret = PTR_ERR(rbd_dev->spec->image_id);
3359 		rbd_dev->spec->image_id = NULL;
3360 	} else {
3361 		dout("image_id is %s\n", rbd_dev->spec->image_id);
3362 	}
3363 out:
3364 	kfree(response);
3365 	kfree(object_name);
3366 
3367 	return ret;
3368 }
3369 
3370 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3371 {
3372 	int ret;
3373 	size_t size;
3374 
3375 	/* Version 1 images have no id; empty string is used */
3376 
3377 	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3378 	if (!rbd_dev->spec->image_id)
3379 		return -ENOMEM;
3380 	rbd_dev->spec->image_id_len = 0;
3381 
3382 	/* Record the header object name for this rbd image. */
3383 
3384 	size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3385 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3386 	if (!rbd_dev->header_name) {
3387 		ret = -ENOMEM;
3388 		goto out_err;
3389 	}
3390 	sprintf(rbd_dev->header_name, "%s%s",
3391 		rbd_dev->spec->image_name, RBD_SUFFIX);
3392 
3393 	/* Populate rbd image metadata */
3394 
3395 	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3396 	if (ret < 0)
3397 		goto out_err;
3398 
3399 	/* Version 1 images have no parent (no layering) */
3400 
3401 	rbd_dev->parent_spec = NULL;
3402 	rbd_dev->parent_overlap = 0;
3403 
3404 	rbd_dev->image_format = 1;
3405 
3406 	dout("discovered version 1 image, header name is %s\n",
3407 		rbd_dev->header_name);
3408 
3409 	return 0;
3410 
3411 out_err:
3412 	kfree(rbd_dev->header_name);
3413 	rbd_dev->header_name = NULL;
3414 	kfree(rbd_dev->spec->image_id);
3415 	rbd_dev->spec->image_id = NULL;
3416 
3417 	return ret;
3418 }
3419 
3420 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3421 {
3422 	size_t size;
3423 	int ret;
3424 	u64 ver = 0;
3425 
3426 	/*
3427 	 * Image id was filled in by the caller.  Record the header
3428 	 * object name for this rbd image.
3429 	 */
3430 	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3431 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3432 	if (!rbd_dev->header_name)
3433 		return -ENOMEM;
3434 	sprintf(rbd_dev->header_name, "%s%s",
3435 			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3436 
3437 	/* Get the size and object order for the image */
3438 
3439 	ret = rbd_dev_v2_image_size(rbd_dev);
3440 	if (ret < 0)
3441 		goto out_err;
3442 
3443 	/* Get the object prefix (a.k.a. block_name) for the image */
3444 
3445 	ret = rbd_dev_v2_object_prefix(rbd_dev);
3446 	if (ret < 0)
3447 		goto out_err;
3448 
3449 	/* Get and check the features for the image */
3450 
3451 	ret = rbd_dev_v2_features(rbd_dev);
3452 	if (ret < 0)
3453 		goto out_err;
3454 
3455 	/* If the image supports layering, get the parent info */
3456 
3457 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3458 		ret = rbd_dev_v2_parent_info(rbd_dev);
3459 		if (ret < 0)
3460 			goto out_err;
3461 	}
3462 
3463 	/* crypto and compression type aren't (yet) supported for v2 images */
3464 
3465 	rbd_dev->header.crypt_type = 0;
3466 	rbd_dev->header.comp_type = 0;
3467 
3468 	/* Get the snapshot context, plus the header version */
3469 
3470 	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3471 	if (ret)
3472 		goto out_err;
3473 	rbd_dev->header.obj_version = ver;
3474 
3475 	rbd_dev->image_format = 2;
3476 
3477 	dout("discovered version 2 image, header name is %s\n",
3478 		rbd_dev->header_name);
3479 
3480 	return 0;
3481 out_err:
3482 	rbd_dev->parent_overlap = 0;
3483 	rbd_spec_put(rbd_dev->parent_spec);
3484 	rbd_dev->parent_spec = NULL;
3485 	kfree(rbd_dev->header_name);
3486 	rbd_dev->header_name = NULL;
3487 	kfree(rbd_dev->header.object_prefix);
3488 	rbd_dev->header.object_prefix = NULL;
3489 
3490 	return ret;
3491 }
3492 
3493 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3494 {
3495 	int ret;
3496 
3497 	/* no need to lock here, as rbd_dev is not registered yet */
3498 	ret = rbd_dev_snaps_update(rbd_dev);
3499 	if (ret)
3500 		return ret;
3501 
3502 	ret = rbd_dev_probe_update_spec(rbd_dev);
3503 	if (ret)
3504 		goto err_out_snaps;
3505 
3506 	ret = rbd_dev_set_mapping(rbd_dev);
3507 	if (ret)
3508 		goto err_out_snaps;
3509 
3510 	/* generate unique id: find highest unique id, add one */
3511 	rbd_dev_id_get(rbd_dev);
3512 
3513 	/* Fill in the device name, now that we have its id. */
3514 	BUILD_BUG_ON(DEV_NAME_LEN
3515 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3516 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3517 
3518 	/* Get our block major device number. */
3519 
3520 	ret = register_blkdev(0, rbd_dev->name);
3521 	if (ret < 0)
3522 		goto err_out_id;
3523 	rbd_dev->major = ret;
3524 
3525 	/* Set up the blkdev mapping. */
3526 
3527 	ret = rbd_init_disk(rbd_dev);
3528 	if (ret)
3529 		goto err_out_blkdev;
3530 
3531 	ret = rbd_bus_add_dev(rbd_dev);
3532 	if (ret)
3533 		goto err_out_disk;
3534 
3535 	/*
3536 	 * At this point cleanup in the event of an error is the job
3537 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
3538 	 */
3539 	down_write(&rbd_dev->header_rwsem);
3540 	ret = rbd_dev_snaps_register(rbd_dev);
3541 	up_write(&rbd_dev->header_rwsem);
3542 	if (ret)
3543 		goto err_out_bus;
3544 
3545 	ret = rbd_init_watch_dev(rbd_dev);
3546 	if (ret)
3547 		goto err_out_bus;
3548 
3549 	/* Everything's ready.  Announce the disk to the world. */
3550 
3551 	add_disk(rbd_dev->disk);
3552 
3553 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3554 		(unsigned long long) rbd_dev->mapping.size);
3555 
3556 	return ret;
3557 err_out_bus:
3558 	/* this will also clean up rest of rbd_dev stuff */
3559 
3560 	rbd_bus_del_dev(rbd_dev);
3561 
3562 	return ret;
3563 err_out_disk:
3564 	rbd_free_disk(rbd_dev);
3565 err_out_blkdev:
3566 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3567 err_out_id:
3568 	rbd_dev_id_put(rbd_dev);
3569 err_out_snaps:
3570 	rbd_remove_all_snaps(rbd_dev);
3571 
3572 	return ret;
3573 }
3574 
3575 /*
3576  * Probe for the existence of the header object for the given rbd
3577  * device.  For format 2 images this includes determining the image
3578  * id.
3579  */
3580 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3581 {
3582 	int ret;
3583 
3584 	/*
3585 	 * Get the id from the image id object.  If it's not a
3586 	 * format 2 image, we'll get ENOENT back, and we'll assume
3587 	 * it's a format 1 image.
3588 	 */
3589 	ret = rbd_dev_image_id(rbd_dev);
3590 	if (ret)
3591 		ret = rbd_dev_v1_probe(rbd_dev);
3592 	else
3593 		ret = rbd_dev_v2_probe(rbd_dev);
3594 	if (ret) {
3595 		dout("probe failed, returning %d\n", ret);
3596 
3597 		return ret;
3598 	}
3599 
3600 	ret = rbd_dev_probe_finish(rbd_dev);
3601 	if (ret)
3602 		rbd_header_free(&rbd_dev->header);
3603 
3604 	return ret;
3605 }
3606 
3607 static ssize_t rbd_add(struct bus_type *bus,
3608 		       const char *buf,
3609 		       size_t count)
3610 {
3611 	struct rbd_device *rbd_dev = NULL;
3612 	struct ceph_options *ceph_opts = NULL;
3613 	struct rbd_options *rbd_opts = NULL;
3614 	struct rbd_spec *spec = NULL;
3615 	struct rbd_client *rbdc;
3616 	struct ceph_osd_client *osdc;
3617 	int rc = -ENOMEM;
3618 
3619 	if (!try_module_get(THIS_MODULE))
3620 		return -ENODEV;
3621 
3622 	/* parse add command */
3623 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3624 	if (rc < 0)
3625 		goto err_out_module;
3626 
3627 	rbdc = rbd_get_client(ceph_opts);
3628 	if (IS_ERR(rbdc)) {
3629 		rc = PTR_ERR(rbdc);
3630 		goto err_out_args;
3631 	}
3632 	ceph_opts = NULL;	/* rbd_dev client now owns this */
3633 
3634 	/* pick the pool */
3635 	osdc = &rbdc->client->osdc;
3636 	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3637 	if (rc < 0)
3638 		goto err_out_client;
3639 	spec->pool_id = (u64) rc;
3640 
3641 	rbd_dev = rbd_dev_create(rbdc, spec);
3642 	if (!rbd_dev)
3643 		goto err_out_client;
3644 	rbdc = NULL;		/* rbd_dev now owns this */
3645 	spec = NULL;		/* rbd_dev now owns this */
3646 
3647 	rbd_dev->mapping.read_only = rbd_opts->read_only;
3648 	kfree(rbd_opts);
3649 	rbd_opts = NULL;	/* done with this */
3650 
3651 	rc = rbd_dev_probe(rbd_dev);
3652 	if (rc < 0)
3653 		goto err_out_rbd_dev;
3654 
3655 	return count;
3656 err_out_rbd_dev:
3657 	rbd_dev_destroy(rbd_dev);
3658 err_out_client:
3659 	rbd_put_client(rbdc);
3660 err_out_args:
3661 	if (ceph_opts)
3662 		ceph_destroy_options(ceph_opts);
3663 	kfree(rbd_opts);
3664 	rbd_spec_put(spec);
3665 err_out_module:
3666 	module_put(THIS_MODULE);
3667 
3668 	dout("Error adding device %s\n", buf);
3669 
3670 	return (ssize_t) rc;
3671 }
3672 
3673 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3674 {
3675 	struct list_head *tmp;
3676 	struct rbd_device *rbd_dev;
3677 
3678 	spin_lock(&rbd_dev_list_lock);
3679 	list_for_each(tmp, &rbd_dev_list) {
3680 		rbd_dev = list_entry(tmp, struct rbd_device, node);
3681 		if (rbd_dev->dev_id == dev_id) {
3682 			spin_unlock(&rbd_dev_list_lock);
3683 			return rbd_dev;
3684 		}
3685 	}
3686 	spin_unlock(&rbd_dev_list_lock);
3687 	return NULL;
3688 }
3689 
3690 static void rbd_dev_release(struct device *dev)
3691 {
3692 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3693 
3694 	if (rbd_dev->watch_request) {
3695 		struct ceph_client *client = rbd_dev->rbd_client->client;
3696 
3697 		ceph_osdc_unregister_linger_request(&client->osdc,
3698 						    rbd_dev->watch_request);
3699 	}
3700 	if (rbd_dev->watch_event)
3701 		rbd_req_sync_unwatch(rbd_dev);
3702 
3704 	/* clean up and free blkdev */
3705 	rbd_free_disk(rbd_dev);
3706 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3707 
3708 	/* release allocated disk header fields */
3709 	rbd_header_free(&rbd_dev->header);
3710 
3711 	/* done with the id, and with the rbd_dev */
3712 	rbd_dev_id_put(rbd_dev);
3713 	rbd_assert(rbd_dev->rbd_client != NULL);
3714 	rbd_dev_destroy(rbd_dev);
3715 
3716 	/* release module ref */
3717 	module_put(THIS_MODULE);
3718 }
3719 
3720 static ssize_t rbd_remove(struct bus_type *bus,
3721 			  const char *buf,
3722 			  size_t count)
3723 {
3724 	struct rbd_device *rbd_dev = NULL;
3725 	int target_id, rc;
3726 	unsigned long ul;
3727 	int ret = count;
3728 
3729 	rc = strict_strtoul(buf, 10, &ul);
3730 	if (rc)
3731 		return rc;
3732 
3733 	/* convert to int; abort if we lost anything in the conversion */
3734 	target_id = (int) ul;
3735 	if (target_id != ul)
3736 		return -EINVAL;
3737 
3738 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3739 
3740 	rbd_dev = __rbd_get_dev(target_id);
3741 	if (!rbd_dev) {
3742 		ret = -ENOENT;
3743 		goto done;
3744 	}
3745 
3746 	if (rbd_dev->open_count) {
3747 		ret = -EBUSY;
3748 		goto done;
3749 	}
3750 
3751 	rbd_remove_all_snaps(rbd_dev);
3752 	rbd_bus_del_dev(rbd_dev);
3753 
3754 done:
3755 	mutex_unlock(&ctl_mutex);
3756 
3757 	return ret;
3758 }
3759 
3760 /*
3761  * create control files in sysfs
3762  * /sys/bus/rbd/...
3763  */
3764 static int rbd_sysfs_init(void)
3765 {
3766 	int ret;
3767 
3768 	ret = device_register(&rbd_root_dev);
3769 	if (ret < 0)
3770 		return ret;
3771 
3772 	ret = bus_register(&rbd_bus_type);
3773 	if (ret < 0)
3774 		device_unregister(&rbd_root_dev);
3775 
3776 	return ret;
3777 }
3778 
3779 static void rbd_sysfs_cleanup(void)
3780 {
3781 	bus_unregister(&rbd_bus_type);
3782 	device_unregister(&rbd_root_dev);
3783 }
3784 
3785 int __init rbd_init(void)
3786 {
3787 	int rc;
3788 
3789 	rc = rbd_sysfs_init();
3790 	if (rc)
3791 		return rc;
3792 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3793 	return 0;
3794 }
3795 
3796 void __exit rbd_exit(void)
3797 {
3798 	rbd_sysfs_cleanup();
3799 }
3800 
3801 module_init(rbd_init);
3802 module_exit(rbd_exit);
3803 
3804 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3805 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3806 MODULE_DESCRIPTION("rados block device");
3807 
3808 /* following authorship retained from original osdblk.c */
3809 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3810 
3811 MODULE_LICENSE("GPL");
3812