xref: /openbmc/linux/drivers/block/rbd.c (revision cf028200)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 #define RBD_DEBUG	/* Activate rbd_assert() calls */
45 
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define	SECTOR_SHIFT	9
53 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
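
/*
 * Editorial example (not part of the original driver): a minimal
 * sketch of how these symbols convert between byte and sector units.
 *
 *	u64 bytes = 1536 * 1024;			/. 1572864 ./
 *	sector_t sectors = bytes >> SECTOR_SHIFT;	/. 3072 sectors ./
 *	u64 back = (u64) sectors << SECTOR_SHIFT;	/. 1572864 again ./
 */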
54 
55 /* It might be useful to have this defined elsewhere too */
56 
57 #define	U64_MAX	((u64) (~0ULL))
58 
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61 
62 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
63 
64 #define RBD_MAX_SNAP_NAME_LEN	32
65 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
66 #define RBD_MAX_OPT_LEN		1024
67 
68 #define RBD_SNAP_HEAD_NAME	"-"
69 
70 #define RBD_IMAGE_ID_LEN_MAX	64
71 #define RBD_OBJ_PREFIX_LEN_MAX	64
72 
73 /*
74  * An RBD device name will be "rbd#", where the "rbd" comes from
75  * RBD_DRV_NAME above, and # is a unique integer identifier.
76  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
77  * enough to hold all possible device names.
78  */
79 #define DEV_NAME_LEN		32
80 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
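
/*
 * Editorial note: why (5 * sizeof (int)) / 2 + 1 is big enough.
 * Each byte of an integer contributes at most log10(256) ~= 2.41
 * decimal digits, so 2.5 digits per byte is a safe over-estimate
 * and the +1 absorbs the truncating division.  For a 4-byte int
 * this yields (5 * 4) / 2 + 1 = 11 characters, covering the ten
 * digits of INT_MAX (2147483647) with room for a terminating NUL.
 */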
81 
82 #define RBD_READ_ONLY_DEFAULT		false
83 
84 /*
85  * block device image metadata (in-memory version)
86  */
87 struct rbd_image_header {
88 	/* These four fields never change for a given rbd image */
89 	char *object_prefix;
90 	u64 features;
91 	__u8 obj_order;
92 	__u8 crypt_type;
93 	__u8 comp_type;
94 
95 	/* The remaining fields need to be updated occasionally */
96 	u64 image_size;
97 	struct ceph_snap_context *snapc;
98 	char *snap_names;
99 	u64 *snap_sizes;
100 
101 	u64 obj_version;
102 };
103 
104 struct rbd_options {
105 	bool	read_only;
106 };
107 
108 /*
109  * an instance of the client.  multiple devices may share an rbd client.
110  */
111 struct rbd_client {
112 	struct ceph_client	*client;
113 	struct kref		kref;
114 	struct list_head	node;
115 };
116 
117 /*
118  * a request completion status
119  */
120 struct rbd_req_status {
121 	int done;
122 	int rc;
123 	u64 bytes;
124 };
125 
126 /*
127  * a collection of requests
128  */
129 struct rbd_req_coll {
130 	int			total;
131 	int			num_done;
132 	struct kref		kref;
133 	struct rbd_req_status	status[];
134 };
135 
136 /*
137  * a single io request
138  */
139 struct rbd_request {
140 	struct request		*rq;		/* blk layer request */
141 	struct bio		*bio;		/* cloned bio */
142 	struct page		**pages;	/* list of used pages */
143 	u64			len;
144 	int			coll_index;
145 	struct rbd_req_coll	*coll;
146 };
147 
148 struct rbd_snap {
149 	struct	device		dev;
150 	const char		*name;
151 	u64			size;
152 	struct list_head	node;
153 	u64			id;
154 	u64			features;
155 };
156 
157 struct rbd_mapping {
158 	char                    *snap_name;
159 	u64                     snap_id;
160 	u64                     size;
161 	u64                     features;
162 	bool                    snap_exists;
163 	bool			read_only;
164 };
165 
166 /*
167  * a single device
168  */
169 struct rbd_device {
170 	int			dev_id;		/* blkdev unique id */
171 
172 	int			major;		/* blkdev assigned major */
173 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
174 
175 	u32			image_format;	/* Either 1 or 2 */
176 	struct rbd_options	rbd_opts;
177 	struct rbd_client	*rbd_client;
178 
179 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
180 
181 	spinlock_t		lock;		/* queue lock */
182 
183 	struct rbd_image_header	header;
184 	char			*image_id;
185 	size_t			image_id_len;
186 	char			*image_name;
187 	size_t			image_name_len;
188 	char			*header_name;
189 	char			*pool_name;
190 	int			pool_id;
191 
192 	struct ceph_osd_event   *watch_event;
193 	struct ceph_osd_request *watch_request;
194 
195 	/* protects updating the header */
196 	struct rw_semaphore     header_rwsem;
197 
198 	struct rbd_mapping	mapping;
199 
200 	struct list_head	node;
201 
202 	/* list of snapshots */
203 	struct list_head	snaps;
204 
205 	/* sysfs related */
206 	struct device		dev;
207 };
208 
209 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
210 
211 static LIST_HEAD(rbd_dev_list);    /* devices */
212 static DEFINE_SPINLOCK(rbd_dev_list_lock);
213 
214 static LIST_HEAD(rbd_client_list);		/* clients */
215 static DEFINE_SPINLOCK(rbd_client_list_lock);
216 
217 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
219 
220 static void rbd_dev_release(struct device *dev);
221 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
222 
223 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
224 		       size_t count);
225 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
226 			  size_t count);
227 
228 static struct bus_attribute rbd_bus_attrs[] = {
229 	__ATTR(add, S_IWUSR, NULL, rbd_add),
230 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
231 	__ATTR_NULL
232 };
233 
234 static struct bus_type rbd_bus_type = {
235 	.name		= "rbd",
236 	.bus_attrs	= rbd_bus_attrs,
237 };
238 
239 static void rbd_root_dev_release(struct device *dev)
240 {
241 }
242 
243 static struct device rbd_root_dev = {
244 	.init_name =    "rbd",
245 	.release =      rbd_root_dev_release,
246 };
247 
248 #ifdef RBD_DEBUG
249 #define rbd_assert(expr)	do {					\
250 		if (unlikely(!(expr))) {				\
251 			printk(KERN_ERR "\nAssertion failure in %s() "	\
252 						"at line %d:\n\n"	\
253 					"\trbd_assert(%s);\n\n",	\
254 					__func__, __LINE__, #expr);	\
255 			BUG();						\
256 		} } while (0)
257 #else /* !RBD_DEBUG */
258 #  define rbd_assert(expr)	((void) 0)
259 #endif /* !RBD_DEBUG */
260 
261 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
262 {
263 	return get_device(&rbd_dev->dev);
264 }
265 
266 static void rbd_put_dev(struct rbd_device *rbd_dev)
267 {
268 	put_device(&rbd_dev->dev);
269 }
270 
271 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
272 
273 static int rbd_open(struct block_device *bdev, fmode_t mode)
274 {
275 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
276 
277 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
278 		return -EROFS;
279 
280 	rbd_get_dev(rbd_dev);
281 	set_device_ro(bdev, rbd_dev->mapping.read_only);
282 
283 	return 0;
284 }
285 
286 static int rbd_release(struct gendisk *disk, fmode_t mode)
287 {
288 	struct rbd_device *rbd_dev = disk->private_data;
289 
290 	rbd_put_dev(rbd_dev);
291 
292 	return 0;
293 }
294 
295 static const struct block_device_operations rbd_bd_ops = {
296 	.owner			= THIS_MODULE,
297 	.open			= rbd_open,
298 	.release		= rbd_release,
299 };
300 
301 /*
302  * Initialize an rbd client instance.
303  * We own *ceph_opts.
304  */
305 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
306 {
307 	struct rbd_client *rbdc;
308 	int ret = -ENOMEM;
309 
310 	dout("rbd_client_create\n");
311 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
312 	if (!rbdc)
313 		goto out_opt;
314 
315 	kref_init(&rbdc->kref);
316 	INIT_LIST_HEAD(&rbdc->node);
317 
318 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
319 
320 	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
321 	if (IS_ERR(rbdc->client))
322 		goto out_mutex;
323 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
324 
325 	ret = ceph_open_session(rbdc->client);
326 	if (ret < 0)
327 		goto out_err;
328 
329 	spin_lock(&rbd_client_list_lock);
330 	list_add_tail(&rbdc->node, &rbd_client_list);
331 	spin_unlock(&rbd_client_list_lock);
332 
333 	mutex_unlock(&ctl_mutex);
334 
335 	dout("rbd_client_create created %p\n", rbdc);
336 	return rbdc;
337 
338 out_err:
339 	ceph_destroy_client(rbdc->client);
340 out_mutex:
341 	mutex_unlock(&ctl_mutex);
342 	kfree(rbdc);
343 out_opt:
344 	if (ceph_opts)
345 		ceph_destroy_options(ceph_opts);
346 	return ERR_PTR(ret);
347 }
348 
349 /*
350  * Find a ceph client with specific addr and configuration.  If
351  * found, bump its reference count.
352  */
353 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
354 {
355 	struct rbd_client *client_node;
356 	bool found = false;
357 
358 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
359 		return NULL;
360 
361 	spin_lock(&rbd_client_list_lock);
362 	list_for_each_entry(client_node, &rbd_client_list, node) {
363 		if (!ceph_compare_options(ceph_opts, client_node->client)) {
364 			kref_get(&client_node->kref);
365 			found = true;
366 			break;
367 		}
368 	}
369 	spin_unlock(&rbd_client_list_lock);
370 
371 	return found ? client_node : NULL;
372 }
373 
374 /*
375  * mount options
376  */
377 enum {
378 	Opt_last_int,
379 	/* int args above */
380 	Opt_last_string,
381 	/* string args above */
382 	Opt_read_only,
383 	Opt_read_write,
384 	/* Boolean args above */
385 	Opt_last_bool,
386 };
387 
388 static match_table_t rbd_opts_tokens = {
389 	/* int args above */
390 	/* string args above */
391 	{Opt_read_only, "mapping.read_only"},
392 	{Opt_read_only, "ro"},		/* Alternate spelling */
393 	{Opt_read_write, "read_write"},
394 	{Opt_read_write, "rw"},		/* Alternate spelling */
395 	/* Boolean args above */
396 	{-1, NULL}
397 };
398 
399 static int parse_rbd_opts_token(char *c, void *private)
400 {
401 	struct rbd_options *rbd_opts = private;
402 	substring_t argstr[MAX_OPT_ARGS];
403 	int token, intval, ret;
404 
405 	token = match_token(c, rbd_opts_tokens, argstr);
406 	if (token < 0)
407 		return -EINVAL;
408 
409 	if (token < Opt_last_int) {
410 		ret = match_int(&argstr[0], &intval);
411 		if (ret < 0) {
412 			pr_err("bad mount option arg (not int) "
413 			       "at '%s'\n", c);
414 			return ret;
415 		}
416 		dout("got int token %d val %d\n", token, intval);
417 	} else if (token > Opt_last_int && token < Opt_last_string) {
418 		dout("got string token %d val %s\n", token,
419 		     argstr[0].from);
420 	} else if (token > Opt_last_string && token < Opt_last_bool) {
421 		dout("got Boolean token %d\n", token);
422 	} else {
423 		dout("got token %d\n", token);
424 	}
425 
426 	switch (token) {
427 	case Opt_read_only:
428 		rbd_opts->read_only = true;
429 		break;
430 	case Opt_read_write:
431 		rbd_opts->read_only = false;
432 		break;
433 	default:
434 		rbd_assert(false);
435 		break;
436 	}
437 	return 0;
438 }
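
/*
 * Editorial example (values illustrative, not from this file): the
 * options field of a sysfs "add" request is comma-separated, and any
 * token ceph_parse_options() does not recognize is handed to
 * parse_rbd_opts_token() above.  Mapping an image read-only could
 * look like:
 *
 *	echo "1.2.3.4:6789 name=admin,ro rbd foo" > /sys/bus/rbd/add
 *
 * Here "ro" arrives as a single token, matches Opt_read_only in
 * rbd_opts_tokens, and sets rbd_opts->read_only = true.
 */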
439 
440 /*
441  * Get a ceph client with specific addr and configuration, if one does
442  * not exist create it.
443  */
444 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
445 				size_t mon_addr_len, char *options)
446 {
447 	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
448 	struct ceph_options *ceph_opts;
449 	struct rbd_client *rbdc;
450 
451 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
452 
453 	ceph_opts = ceph_parse_options(options, mon_addr,
454 					mon_addr + mon_addr_len,
455 					parse_rbd_opts_token, rbd_opts);
456 	if (IS_ERR(ceph_opts))
457 		return PTR_ERR(ceph_opts);
458 
459 	rbdc = rbd_client_find(ceph_opts);
460 	if (rbdc) {
461 		/* using an existing client */
462 		ceph_destroy_options(ceph_opts);
463 	} else {
464 		rbdc = rbd_client_create(ceph_opts);
465 		if (IS_ERR(rbdc))
466 			return PTR_ERR(rbdc);
467 	}
468 	rbd_dev->rbd_client = rbdc;
469 
470 	return 0;
471 }
472 
473 /*
474  * Destroy ceph client
475  *
476  * Caller must hold rbd_client_list_lock.
477  */
478 static void rbd_client_release(struct kref *kref)
479 {
480 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
481 
482 	dout("rbd_client_release %p\n", rbdc);
483 	spin_lock(&rbd_client_list_lock);
484 	list_del(&rbdc->node);
485 	spin_unlock(&rbd_client_list_lock);
486 
487 	ceph_destroy_client(rbdc->client);
488 	kfree(rbdc);
489 }
490 
491 /*
492  * Drop reference to ceph client node. If it's not referenced anymore, release
493  * it.
494  */
495 static void rbd_put_client(struct rbd_device *rbd_dev)
496 {
497 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
498 	rbd_dev->rbd_client = NULL;
499 }
500 
501 /*
502  * Destroy requests collection
503  */
504 static void rbd_coll_release(struct kref *kref)
505 {
506 	struct rbd_req_coll *coll =
507 		container_of(kref, struct rbd_req_coll, kref);
508 
509 	dout("rbd_coll_release %p\n", coll);
510 	kfree(coll);
511 }
512 
513 static bool rbd_image_format_valid(u32 image_format)
514 {
515 	return image_format == 1 || image_format == 2;
516 }
517 
518 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
519 {
520 	size_t size;
521 	u32 snap_count;
522 
523 	/* The header has to start with the magic rbd header text */
524 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
525 		return false;
526 
527 	/*
528 	 * The size of a snapshot header has to fit in a size_t, and
529 	 * that limits the number of snapshots.
530 	 */
531 	snap_count = le32_to_cpu(ondisk->snap_count);
532 	size = SIZE_MAX - sizeof (struct ceph_snap_context);
533 	if (snap_count > size / sizeof (__le64))
534 		return false;
535 
536 	/*
537 	 * Not only that, but the size of the entire snapshot
538 	 * header must also be representable in a size_t.
539 	 */
540 	size -= snap_count * sizeof (__le64);
541 	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
542 		return false;
543 
544 	return true;
545 }
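
/*
 * Editorial note: a worked instance of the bounds checked above,
 * assuming (illustratively) sizeof (struct ceph_snap_context) == 40
 * on a 64-bit build.  The snapshot id array may then occupy at most
 * SIZE_MAX - 40 bytes, i.e. snap_count <= (SIZE_MAX - 40) / 8, and
 * whatever remains after the ids caps snap_names_len.
 */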
546 
547 /*
548  * Create a new header structure, translate header format from the on-disk
549  * header.
550  */
551 static int rbd_header_from_disk(struct rbd_image_header *header,
552 				 struct rbd_image_header_ondisk *ondisk)
553 {
554 	u32 snap_count;
555 	size_t len;
556 	size_t size;
557 	u32 i;
558 
559 	memset(header, 0, sizeof (*header));
560 
561 	snap_count = le32_to_cpu(ondisk->snap_count);
562 
563 	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
564 	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
565 	if (!header->object_prefix)
566 		return -ENOMEM;
567 	memcpy(header->object_prefix, ondisk->object_prefix, len);
568 	header->object_prefix[len] = '\0';
569 
570 	if (snap_count) {
571 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
572 
573 		/* Save a copy of the snapshot names */
574 
575 		if (snap_names_len > (u64) SIZE_MAX)
576 			return -EIO;
577 		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
578 		if (!header->snap_names)
579 			goto out_err;
580 		/*
581 		 * Note that rbd_dev_v1_header_read() guarantees
582 		 * the ondisk buffer we're working with has
583 		 * snap_names_len bytes beyond the end of the
584 		 * snapshot id array, so this memcpy() is safe.
585 		 */
586 		memcpy(header->snap_names, &ondisk->snaps[snap_count],
587 			snap_names_len);
588 
589 		/* Record each snapshot's size */
590 
591 		size = snap_count * sizeof (*header->snap_sizes);
592 		header->snap_sizes = kmalloc(size, GFP_KERNEL);
593 		if (!header->snap_sizes)
594 			goto out_err;
595 		for (i = 0; i < snap_count; i++)
596 			header->snap_sizes[i] =
597 				le64_to_cpu(ondisk->snaps[i].image_size);
598 	} else {
599 		WARN_ON(ondisk->snap_names_len);
600 		header->snap_names = NULL;
601 		header->snap_sizes = NULL;
602 	}
603 
604 	header->features = 0;	/* No features support in v1 images */
605 	header->obj_order = ondisk->options.order;
606 	header->crypt_type = ondisk->options.crypt_type;
607 	header->comp_type = ondisk->options.comp_type;
608 
609 	/* Allocate and fill in the snapshot context */
610 
611 	header->image_size = le64_to_cpu(ondisk->image_size);
612 	size = sizeof (struct ceph_snap_context);
613 	size += snap_count * sizeof (header->snapc->snaps[0]);
614 	header->snapc = kzalloc(size, GFP_KERNEL);
615 	if (!header->snapc)
616 		goto out_err;
617 
618 	atomic_set(&header->snapc->nref, 1);
619 	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
620 	header->snapc->num_snaps = snap_count;
621 	for (i = 0; i < snap_count; i++)
622 		header->snapc->snaps[i] =
623 			le64_to_cpu(ondisk->snaps[i].id);
624 
625 	return 0;
626 
627 out_err:
628 	kfree(header->snap_sizes);
629 	header->snap_sizes = NULL;
630 	kfree(header->snap_names);
631 	header->snap_names = NULL;
632 	kfree(header->object_prefix);
633 	header->object_prefix = NULL;
634 
635 	return -ENOMEM;
636 }
637 
638 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
639 {
640 
641 	struct rbd_snap *snap;
642 
643 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
644 		if (!strcmp(snap_name, snap->name)) {
645 			rbd_dev->mapping.snap_id = snap->id;
646 			rbd_dev->mapping.size = snap->size;
647 			rbd_dev->mapping.features = snap->features;
648 
649 			return 0;
650 		}
651 	}
652 
653 	return -ENOENT;
654 }
655 
656 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
657 {
658 	int ret;
659 
660 	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
661 		    sizeof (RBD_SNAP_HEAD_NAME))) {
662 		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
663 		rbd_dev->mapping.size = rbd_dev->header.image_size;
664 		rbd_dev->mapping.features = rbd_dev->header.features;
665 		rbd_dev->mapping.snap_exists = false;
666 		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
667 		ret = 0;
668 	} else {
669 		ret = snap_by_name(rbd_dev, snap_name);
670 		if (ret < 0)
671 			goto done;
672 		rbd_dev->mapping.snap_exists = true;
673 		rbd_dev->mapping.read_only = true;
674 	}
675 	rbd_dev->mapping.snap_name = snap_name;
676 done:
677 	return ret;
678 }
679 
680 static void rbd_header_free(struct rbd_image_header *header)
681 {
682 	kfree(header->object_prefix);
683 	header->object_prefix = NULL;
684 	kfree(header->snap_sizes);
685 	header->snap_sizes = NULL;
686 	kfree(header->snap_names);
687 	header->snap_names = NULL;
688 	ceph_put_snap_context(header->snapc);
689 	header->snapc = NULL;
690 }
691 
692 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
693 {
694 	char *name;
695 	u64 segment;
696 	int ret;
697 
698 	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
699 	if (!name)
700 		return NULL;
701 	segment = offset >> rbd_dev->header.obj_order;
702 	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
703 			rbd_dev->header.object_prefix, segment);
704 	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
705 		pr_err("error formatting segment name for #%llu (%d)\n",
706 			segment, ret);
707 		kfree(name);
708 		name = NULL;
709 	}
710 
711 	return name;
712 }
713 
714 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
715 {
716 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
717 
718 	return offset & (segment_size - 1);
719 }
720 
721 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
722 				u64 offset, u64 length)
723 {
724 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
725 
726 	offset &= segment_size - 1;
727 
728 	rbd_assert(length <= U64_MAX - offset);
729 	if (offset + length > segment_size)
730 		length = segment_size - offset;
731 
732 	return length;
733 }
734 
735 static int rbd_get_num_segments(struct rbd_image_header *header,
736 				u64 ofs, u64 len)
737 {
738 	u64 start_seg;
739 	u64 end_seg;
740 
741 	if (!len)
742 		return 0;
743 	if (len - 1 > U64_MAX - ofs)
744 		return -ERANGE;
745 
746 	start_seg = ofs >> header->obj_order;
747 	end_seg = (ofs + len - 1) >> header->obj_order;
748 
749 	return end_seg - start_seg + 1;
750 }
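
/*
 * Editorial example: with the default 4 MiB objects (obj_order ==
 * 22), a 3 MiB request at image offset 10 MiB spans two segments:
 *
 *	start_seg = (10 << 20) >> 22                   = 2
 *	end_seg   = ((10 << 20) + (3 << 20) - 1) >> 22 = 3
 *	num_segs  = 3 - 2 + 1                          = 2
 *
 * rbd_segment_length() then truncates the first piece to the 2 MiB
 * remaining in segment 2, and the second piece covers the rest.
 */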
751 
752 /*
753  * returns the size of an object in the image
754  */
755 static u64 rbd_obj_bytes(struct rbd_image_header *header)
756 {
757 	return (u64) 1 << header->obj_order;
758 }
759 
760 /*
761  * bio helpers
762  */
763 
764 static void bio_chain_put(struct bio *chain)
765 {
766 	struct bio *tmp;
767 
768 	while (chain) {
769 		tmp = chain;
770 		chain = chain->bi_next;
771 		bio_put(tmp);
772 	}
773 }
774 
775 /*
776  * zeros a bio chain, starting at specific offset
777  */
778 static void zero_bio_chain(struct bio *chain, int start_ofs)
779 {
780 	struct bio_vec *bv;
781 	unsigned long flags;
782 	void *buf;
783 	int i;
784 	int pos = 0;
785 
786 	while (chain) {
787 		bio_for_each_segment(bv, chain, i) {
788 			if (pos + bv->bv_len > start_ofs) {
789 				int remainder = max(start_ofs - pos, 0);
790 				buf = bvec_kmap_irq(bv, &flags);
791 				memset(buf + remainder, 0,
792 				       bv->bv_len - remainder);
793 				bvec_kunmap_irq(buf, &flags);
794 			}
795 			pos += bv->bv_len;
796 		}
797 
798 		chain = chain->bi_next;
799 	}
800 }
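
/*
 * Editorial note: this backs the short-read handling in rbd_req_cb()
 * below.  If a read returns fewer bytes than requested (or -ENOENT
 * for an object that was never written), the tail of the bio chain
 * is zero-filled from start_ofs onward, so the block layer always
 * sees a full-length, well-defined buffer.
 */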
801 
802 /*
803  * bio_chain_clone - clone a chain of bios up to a certain length.
804  * might return a bio_pair that will need to be released.
805  */
806 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
807 				   struct bio_pair **bp,
808 				   int len, gfp_t gfpmask)
809 {
810 	struct bio *old_chain = *old;
811 	struct bio *new_chain = NULL;
812 	struct bio *tail;
813 	int total = 0;
814 
815 	if (*bp) {
816 		bio_pair_release(*bp);
817 		*bp = NULL;
818 	}
819 
820 	while (old_chain && (total < len)) {
821 		struct bio *tmp;
822 
823 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
824 		if (!tmp)
825 			goto err_out;
826 		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */
827 
828 		if (total + old_chain->bi_size > len) {
829 			struct bio_pair *bp;
830 
831 			/*
832 			 * this split can only happen with a single-page bio;
833 			 * bio_split() will BUG_ON if that is not the case
834 			 */
835 			dout("bio_chain_clone split! total=%d remaining=%d "
836 			     "bi_size=%u\n",
837 			     total, len - total, old_chain->bi_size);
838 
839 			/* split the bio. We'll release it either in the next
840 			   call, or it will have to be released outside */
841 			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
842 			if (!bp)
843 				goto err_out;
844 
845 			__bio_clone(tmp, &bp->bio1);
846 
847 			*next = &bp->bio2;
848 		} else {
849 			__bio_clone(tmp, old_chain);
850 			*next = old_chain->bi_next;
851 		}
852 
853 		tmp->bi_bdev = NULL;
854 		tmp->bi_next = NULL;
855 		if (new_chain)
856 			tail->bi_next = tmp;
857 		else
858 			new_chain = tmp;
859 		tail = tmp;
860 		old_chain = old_chain->bi_next;
861 
862 		total += tmp->bi_size;
863 	}
864 
865 	rbd_assert(total == len);
866 
867 	*old = old_chain;
868 
869 	return new_chain;
870 
871 err_out:
872 	dout("bio_chain_clone with err\n");
873 	bio_chain_put(new_chain);
874 	return NULL;
875 }
876 
877 /*
878  * helpers for osd request op vectors.
879  */
880 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
881 					int opcode, u32 payload_len)
882 {
883 	struct ceph_osd_req_op *ops;
884 
885 	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
886 	if (!ops)
887 		return NULL;
888 
889 	ops[0].op = opcode;
890 
891 	/*
892 	 * op extent offset and length will be set later on
893 	 * in calc_raw_layout()
894 	 */
895 	ops[0].payload_len = payload_len;
896 
897 	return ops;
898 }
899 
900 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
901 {
902 	kfree(ops);
903 }
904 
905 static void rbd_coll_end_req_index(struct request *rq,
906 				   struct rbd_req_coll *coll,
907 				   int index,
908 				   int ret, u64 len)
909 {
910 	struct request_queue *q;
911 	int min, max, i;
912 
913 	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
914 	     coll, index, ret, (unsigned long long) len);
915 
916 	if (!rq)
917 		return;
918 
919 	if (!coll) {
920 		blk_end_request(rq, ret, len);
921 		return;
922 	}
923 
924 	q = rq->q;
925 
926 	spin_lock_irq(q->queue_lock);
927 	coll->status[index].done = 1;
928 	coll->status[index].rc = ret;
929 	coll->status[index].bytes = len;
930 	max = min = coll->num_done;
931 	while (max < coll->total && coll->status[max].done)
932 		max++;
933 
934 	for (i = min; i < max; i++) {
935 		__blk_end_request(rq, coll->status[i].rc,
936 				  coll->status[i].bytes);
937 		coll->num_done++;
938 		kref_put(&coll->kref, rbd_coll_release);
939 	}
940 	spin_unlock_irq(q->queue_lock);
941 }
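
/*
 * Editorial example: completions are handed back to the block layer
 * strictly in index order.  Suppose total == 3 and segment 1 finishes
 * first: num_done stays 0 because status[0].done is still clear.
 * When segment 0 later completes, the scan above advances max to 2
 * and both requests are ended in one pass; segment 2 is ended
 * whenever it arrives.
 */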
942 
943 static void rbd_coll_end_req(struct rbd_request *req,
944 			     int ret, u64 len)
945 {
946 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
947 }
948 
949 /*
950  * Send ceph osd request
951  */
952 static int rbd_do_request(struct request *rq,
953 			  struct rbd_device *rbd_dev,
954 			  struct ceph_snap_context *snapc,
955 			  u64 snapid,
956 			  const char *object_name, u64 ofs, u64 len,
957 			  struct bio *bio,
958 			  struct page **pages,
959 			  int num_pages,
960 			  int flags,
961 			  struct ceph_osd_req_op *ops,
962 			  struct rbd_req_coll *coll,
963 			  int coll_index,
964 			  void (*rbd_cb)(struct ceph_osd_request *req,
965 					 struct ceph_msg *msg),
966 			  struct ceph_osd_request **linger_req,
967 			  u64 *ver)
968 {
969 	struct ceph_osd_request *req;
970 	struct ceph_file_layout *layout;
971 	int ret;
972 	u64 bno;
973 	struct timespec mtime = CURRENT_TIME;
974 	struct rbd_request *req_data;
975 	struct ceph_osd_request_head *reqhead;
976 	struct ceph_osd_client *osdc;
977 
978 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
979 	if (!req_data) {
980 		if (coll)
981 			rbd_coll_end_req_index(rq, coll, coll_index,
982 					       -ENOMEM, len);
983 		return -ENOMEM;
984 	}
985 
986 	if (coll) {
987 		req_data->coll = coll;
988 		req_data->coll_index = coll_index;
989 	}
990 
991 	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
992 		(unsigned long long) ofs, (unsigned long long) len);
993 
994 	osdc = &rbd_dev->rbd_client->client->osdc;
995 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
996 					false, GFP_NOIO, pages, bio);
997 	if (!req) {
998 		ret = -ENOMEM;
999 		goto done_pages;
1000 	}
1001 
1002 	req->r_callback = rbd_cb;
1003 
1004 	req_data->rq = rq;
1005 	req_data->bio = bio;
1006 	req_data->pages = pages;
1007 	req_data->len = len;
1008 
1009 	req->r_priv = req_data;
1010 
1011 	reqhead = req->r_request->front.iov_base;
1012 	reqhead->snapid = cpu_to_le64(snapid);
1013 
1014 	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1015 	req->r_oid_len = strlen(req->r_oid);
1016 
1017 	layout = &req->r_file_layout;
1018 	memset(layout, 0, sizeof(*layout));
1019 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 	layout->fl_stripe_count = cpu_to_le32(1);
1021 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022 	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1023 	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024 				   req, ops);
1025 	rbd_assert(ret == 0);
1026 
1027 	ceph_osdc_build_request(req, ofs, &len,
1028 				ops,
1029 				snapc,
1030 				&mtime,
1031 				req->r_oid, req->r_oid_len);
1032 
1033 	if (linger_req) {
1034 		ceph_osdc_set_request_linger(osdc, req);
1035 		*linger_req = req;
1036 	}
1037 
1038 	ret = ceph_osdc_start_request(osdc, req, false);
1039 	if (ret < 0)
1040 		goto done_err;
1041 
1042 	if (!rbd_cb) {
1043 		ret = ceph_osdc_wait_request(osdc, req);
1044 		if (ver)
1045 			*ver = le64_to_cpu(req->r_reassert_version.version);
1046 		dout("reassert_ver=%llu\n",
1047 			(unsigned long long)
1048 				le64_to_cpu(req->r_reassert_version.version));
1049 		ceph_osdc_put_request(req);
1050 	}
1051 	return ret;
1052 
1053 done_err:
1054 	bio_chain_put(req_data->bio);
1055 	ceph_osdc_put_request(req);
1056 done_pages:
1057 	rbd_coll_end_req(req_data, ret, len);
1058 	kfree(req_data);
1059 	return ret;
1060 }
1061 
1062 /*
1063  * Ceph osd op callback
1064  */
1065 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1066 {
1067 	struct rbd_request *req_data = req->r_priv;
1068 	struct ceph_osd_reply_head *replyhead;
1069 	struct ceph_osd_op *op;
1070 	__s32 rc;
1071 	u64 bytes;
1072 	int read_op;
1073 
1074 	/* parse reply */
1075 	replyhead = msg->front.iov_base;
1076 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1077 	op = (void *)(replyhead + 1);
1078 	rc = le32_to_cpu(replyhead->result);
1079 	bytes = le64_to_cpu(op->extent.length);
1080 	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1081 
1082 	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1083 		(unsigned long long) bytes, read_op, (int) rc);
1084 
1085 	if (rc == -ENOENT && read_op) {
1086 		zero_bio_chain(req_data->bio, 0);
1087 		rc = 0;
1088 	} else if (rc == 0 && read_op && bytes < req_data->len) {
1089 		zero_bio_chain(req_data->bio, bytes);
1090 		bytes = req_data->len;
1091 	}
1092 
1093 	rbd_coll_end_req(req_data, rc, bytes);
1094 
1095 	if (req_data->bio)
1096 		bio_chain_put(req_data->bio);
1097 
1098 	ceph_osdc_put_request(req);
1099 	kfree(req_data);
1100 }
1101 
1102 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1103 {
1104 	ceph_osdc_put_request(req);
1105 }
1106 
1107 /*
1108  * Do a synchronous ceph osd operation
1109  */
1110 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1111 			   struct ceph_snap_context *snapc,
1112 			   u64 snapid,
1113 			   int flags,
1114 			   struct ceph_osd_req_op *ops,
1115 			   const char *object_name,
1116 			   u64 ofs, u64 inbound_size,
1117 			   char *inbound,
1118 			   struct ceph_osd_request **linger_req,
1119 			   u64 *ver)
1120 {
1121 	int ret;
1122 	struct page **pages;
1123 	int num_pages;
1124 
1125 	rbd_assert(ops != NULL);
1126 
1127 	num_pages = calc_pages_for(ofs, inbound_size);
1128 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1129 	if (IS_ERR(pages))
1130 		return PTR_ERR(pages);
1131 
1132 	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1133 			  object_name, ofs, inbound_size, NULL,
1134 			  pages, num_pages,
1135 			  flags,
1136 			  ops,
1137 			  NULL, 0,
1138 			  NULL,
1139 			  linger_req, ver);
1140 	if (ret < 0)
1141 		goto done;
1142 
1143 	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1144 		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1145 
1146 done:
1147 	ceph_release_page_vector(pages, num_pages);
1148 	return ret;
1149 }
1150 
1151 /*
1152  * Do an asynchronous ceph osd operation
1153  */
1154 static int rbd_do_op(struct request *rq,
1155 		     struct rbd_device *rbd_dev,
1156 		     struct ceph_snap_context *snapc,
1157 		     u64 snapid,
1158 		     int opcode, int flags,
1159 		     u64 ofs, u64 len,
1160 		     struct bio *bio,
1161 		     struct rbd_req_coll *coll,
1162 		     int coll_index)
1163 {
1164 	char *seg_name;
1165 	u64 seg_ofs;
1166 	u64 seg_len;
1167 	int ret;
1168 	struct ceph_osd_req_op *ops;
1169 	u32 payload_len;
1170 
1171 	seg_name = rbd_segment_name(rbd_dev, ofs);
1172 	if (!seg_name)
1173 		return -ENOMEM;
1174 	seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175 	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1176 
1177 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1178 
1179 	ret = -ENOMEM;
1180 	ops = rbd_create_rw_ops(1, opcode, payload_len);
1181 	if (!ops)
1182 		goto done;
1183 
1184 	/* we've taken care of segment sizes earlier when we
1185 	   cloned the bios. We should never have a segment
1186 	   truncated at this point */
1187 	rbd_assert(seg_len == len);
1188 
1189 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1190 			     seg_name, seg_ofs, seg_len,
1191 			     bio,
1192 			     NULL, 0,
1193 			     flags,
1194 			     ops,
1195 			     coll, coll_index,
1196 			     rbd_req_cb, 0, NULL);
1197 
1198 	rbd_destroy_ops(ops);
1199 done:
1200 	kfree(seg_name);
1201 	return ret;
1202 }
1203 
1204 /*
1205  * Request async osd write
1206  */
1207 static int rbd_req_write(struct request *rq,
1208 			 struct rbd_device *rbd_dev,
1209 			 struct ceph_snap_context *snapc,
1210 			 u64 ofs, u64 len,
1211 			 struct bio *bio,
1212 			 struct rbd_req_coll *coll,
1213 			 int coll_index)
1214 {
1215 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1216 			 CEPH_OSD_OP_WRITE,
1217 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218 			 ofs, len, bio, coll, coll_index);
1219 }
1220 
1221 /*
1222  * Request async osd read
1223  */
1224 static int rbd_req_read(struct request *rq,
1225 			 struct rbd_device *rbd_dev,
1226 			 u64 snapid,
1227 			 u64 ofs, u64 len,
1228 			 struct bio *bio,
1229 			 struct rbd_req_coll *coll,
1230 			 int coll_index)
1231 {
1232 	return rbd_do_op(rq, rbd_dev, NULL,
1233 			 snapid,
1234 			 CEPH_OSD_OP_READ,
1235 			 CEPH_OSD_FLAG_READ,
1236 			 ofs, len, bio, coll, coll_index);
1237 }
1238 
1239 /*
1240  * Request sync osd read
1241  */
1242 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1243 			  u64 snapid,
1244 			  const char *object_name,
1245 			  u64 ofs, u64 len,
1246 			  char *buf,
1247 			  u64 *ver)
1248 {
1249 	struct ceph_osd_req_op *ops;
1250 	int ret;
1251 
1252 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1253 	if (!ops)
1254 		return -ENOMEM;
1255 
1256 	ret = rbd_req_sync_op(rbd_dev, NULL,
1257 			       snapid,
1258 			       CEPH_OSD_FLAG_READ,
1259 			       ops, object_name, ofs, len, buf, NULL, ver);
1260 	rbd_destroy_ops(ops);
1261 
1262 	return ret;
1263 }
1264 
1265 /*
1266  * Request sync osd notify ack
1267  */
1268 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1269 				   u64 ver,
1270 				   u64 notify_id)
1271 {
1272 	struct ceph_osd_req_op *ops;
1273 	int ret;
1274 
1275 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1276 	if (!ops)
1277 		return -ENOMEM;
1278 
1279 	ops[0].watch.ver = cpu_to_le64(ver);
1280 	ops[0].watch.cookie = notify_id;
1281 	ops[0].watch.flag = 0;
1282 
1283 	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1284 			  rbd_dev->header_name, 0, 0, NULL,
1285 			  NULL, 0,
1286 			  CEPH_OSD_FLAG_READ,
1287 			  ops,
1288 			  NULL, 0,
1289 			  rbd_simple_req_cb, 0, NULL);
1290 
1291 	rbd_destroy_ops(ops);
1292 	return ret;
1293 }
1294 
1295 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296 {
1297 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1298 	u64 hver;
1299 	int rc;
1300 
1301 	if (!rbd_dev)
1302 		return;
1303 
1304 	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305 		rbd_dev->header_name, (unsigned long long) notify_id,
1306 		(unsigned int) opcode);
1307 	rc = rbd_refresh_header(rbd_dev, &hver);
1308 	if (rc)
1309 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310 			   "update snaps: %d\n", rbd_dev->major, rc);
1311 
1312 	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1313 }
1314 
1315 /*
1316  * Request sync osd watch
1317  */
1318 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1319 {
1320 	struct ceph_osd_req_op *ops;
1321 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1322 	int ret;
1323 
1324 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1325 	if (!ops)
1326 		return -ENOMEM;
1327 
1328 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1329 				     (void *)rbd_dev, &rbd_dev->watch_event);
1330 	if (ret < 0)
1331 		goto fail;
1332 
1333 	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1334 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1335 	ops[0].watch.flag = 1;
1336 
1337 	ret = rbd_req_sync_op(rbd_dev, NULL,
1338 			      CEPH_NOSNAP,
1339 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1340 			      ops,
1341 			      rbd_dev->header_name,
1342 			      0, 0, NULL,
1343 			      &rbd_dev->watch_request, NULL);
1344 
1345 	if (ret < 0)
1346 		goto fail_event;
1347 
1348 	rbd_destroy_ops(ops);
1349 	return 0;
1350 
1351 fail_event:
1352 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1353 	rbd_dev->watch_event = NULL;
1354 fail:
1355 	rbd_destroy_ops(ops);
1356 	return ret;
1357 }
1358 
1359 /*
1360  * Request sync osd unwatch
1361  */
1362 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1363 {
1364 	struct ceph_osd_req_op *ops;
1365 	int ret;
1366 
1367 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1368 	if (!ops)
1369 		return -ENOMEM;
1370 
1371 	ops[0].watch.ver = 0;
1372 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1373 	ops[0].watch.flag = 0;
1374 
1375 	ret = rbd_req_sync_op(rbd_dev, NULL,
1376 			      CEPH_NOSNAP,
1377 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378 			      ops,
1379 			      rbd_dev->header_name,
1380 			      0, 0, NULL, NULL, NULL);
1381 
1382 
1383 	rbd_destroy_ops(ops);
1384 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1385 	rbd_dev->watch_event = NULL;
1386 	return ret;
1387 }
1388 
1389 /*
1390  * Synchronous osd object method call
1391  */
1392 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1393 			     const char *object_name,
1394 			     const char *class_name,
1395 			     const char *method_name,
1396 			     const char *outbound,
1397 			     size_t outbound_size,
1398 			     char *inbound,
1399 			     size_t inbound_size,
1400 			     int flags,
1401 			     u64 *ver)
1402 {
1403 	struct ceph_osd_req_op *ops;
1404 	int class_name_len = strlen(class_name);
1405 	int method_name_len = strlen(method_name);
1406 	int payload_size;
1407 	int ret;
1408 
1409 	/*
1410 	 * Any input parameters required by the method we're calling
1411 	 * will be sent along with the class and method names as
1412 	 * part of the message payload.  That data and its size are
1413 	 * supplied via the indata and indata_len fields (named from
1414 	 * the perspective of the server side) in the OSD request
1415 	 * operation.
1416 	 */
1417 	payload_size = class_name_len + method_name_len + outbound_size;
1418 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1419 	if (!ops)
1420 		return -ENOMEM;
1421 
1422 	ops[0].cls.class_name = class_name;
1423 	ops[0].cls.class_len = (__u8) class_name_len;
1424 	ops[0].cls.method_name = method_name;
1425 	ops[0].cls.method_len = (__u8) method_name_len;
1426 	ops[0].cls.argc = 0;
1427 	ops[0].cls.indata = outbound;
1428 	ops[0].cls.indata_len = outbound_size;
1429 
1430 	ret = rbd_req_sync_op(rbd_dev, NULL,
1431 			       CEPH_NOSNAP,
1432 			       flags, ops,
1433 			       object_name, 0, inbound_size, inbound,
1434 			       NULL, ver);
1435 
1436 	rbd_destroy_ops(ops);
1437 
1438 	dout("cls_exec returned %d\n", ret);
1439 	return ret;
1440 }
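
/*
 * Editorial example (illustrative values): invoking the "get_size"
 * method of the "rbd" class with an 8-byte snapshot id as input
 * gives
 *
 *	payload_size = strlen("rbd") + strlen("get_size") + 8
 *		     = 3 + 8 + 8 = 19 bytes
 *
 * which becomes the payload_len of the CEPH_OSD_OP_CALL op; see
 * _rbd_dev_v2_snap_size() below for such a caller.
 */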
1441 
1442 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1443 {
1444 	struct rbd_req_coll *coll =
1445 			kzalloc(sizeof(struct rbd_req_coll) +
1446 			        sizeof(struct rbd_req_status) * num_reqs,
1447 				GFP_ATOMIC);
1448 
1449 	if (!coll)
1450 		return NULL;
1451 	coll->total = num_reqs;
1452 	kref_init(&coll->kref);
1453 	return coll;
1454 }
1455 
1456 /*
1457  * block device queue callback
1458  */
1459 static void rbd_rq_fn(struct request_queue *q)
1460 {
1461 	struct rbd_device *rbd_dev = q->queuedata;
1462 	struct request *rq;
1463 	struct bio_pair *bp = NULL;
1464 
1465 	while ((rq = blk_fetch_request(q))) {
1466 		struct bio *bio;
1467 		struct bio *rq_bio, *next_bio = NULL;
1468 		bool do_write;
1469 		unsigned int size;
1470 		u64 op_size = 0;
1471 		u64 ofs;
1472 		int num_segs, cur_seg = 0;
1473 		struct rbd_req_coll *coll;
1474 		struct ceph_snap_context *snapc;
1475 
1476 		dout("fetched request\n");
1477 
1478 		/* filter out block requests we don't understand */
1479 		if (rq->cmd_type != REQ_TYPE_FS) {
1480 			__blk_end_request_all(rq, 0);
1481 			continue;
1482 		}
1483 
1484 		/* deduce our operation (read, write) */
1485 		do_write = (rq_data_dir(rq) == WRITE);
1486 
1487 		size = blk_rq_bytes(rq);
1488 		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1489 		rq_bio = rq->bio;
1490 		if (do_write && rbd_dev->mapping.read_only) {
1491 			__blk_end_request_all(rq, -EROFS);
1492 			continue;
1493 		}
1494 
1495 		spin_unlock_irq(q->queue_lock);
1496 
1497 		down_read(&rbd_dev->header_rwsem);
1498 
1499 		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1500 				!rbd_dev->mapping.snap_exists) {
1501 			up_read(&rbd_dev->header_rwsem);
1502 			dout("request for non-existent snapshot\n");
1503 			spin_lock_irq(q->queue_lock);
1504 			__blk_end_request_all(rq, -ENXIO);
1505 			continue;
1506 		}
1507 
1508 		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1509 
1510 		up_read(&rbd_dev->header_rwsem);
1511 
1512 		dout("%s 0x%x bytes at 0x%llx\n",
1513 		     do_write ? "write" : "read",
1514 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1515 
1516 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1517 		if (num_segs <= 0) {
1518 			spin_lock_irq(q->queue_lock);
1519 			__blk_end_request_all(rq, num_segs);
1520 			ceph_put_snap_context(snapc);
1521 			continue;
1522 		}
1523 		coll = rbd_alloc_coll(num_segs);
1524 		if (!coll) {
1525 			spin_lock_irq(q->queue_lock);
1526 			__blk_end_request_all(rq, -ENOMEM);
1527 			ceph_put_snap_context(snapc);
1528 			continue;
1529 		}
1530 
1531 		do {
1532 			/* a bio clone to be passed down to OSD req */
1533 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 			op_size = rbd_segment_length(rbd_dev, ofs, size);
1535 			kref_get(&coll->kref);
1536 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537 					      op_size, GFP_ATOMIC);
1538 			if (!bio) {
1539 				rbd_coll_end_req_index(rq, coll, cur_seg,
1540 						       -ENOMEM, op_size);
1541 				goto next_seg;
1542 			}
1543 
1544 
1545 			/* init OSD command: write or read */
1546 			if (do_write)
1547 				rbd_req_write(rq, rbd_dev,
1548 					      snapc,
1549 					      ofs,
1550 					      op_size, bio,
1551 					      coll, cur_seg);
1552 			else
1553 				rbd_req_read(rq, rbd_dev,
1554 					     rbd_dev->mapping.snap_id,
1555 					     ofs,
1556 					     op_size, bio,
1557 					     coll, cur_seg);
1558 
1559 next_seg:
1560 			size -= op_size;
1561 			ofs += op_size;
1562 
1563 			cur_seg++;
1564 			rq_bio = next_bio;
1565 		} while (size > 0);
1566 		kref_put(&coll->kref, rbd_coll_release);
1567 
1568 		if (bp)
1569 			bio_pair_release(bp);
1570 		spin_lock_irq(q->queue_lock);
1571 
1572 		ceph_put_snap_context(snapc);
1573 	}
1574 }
1575 
1576 /*
1577  * a queue callback. Makes sure that we don't create a bio that spans across
1578  * multiple osd objects. One exception would be a single-page bio,
1579  * which we handle later in bio_chain_clone()
1580  */
1581 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582 			  struct bio_vec *bvec)
1583 {
1584 	struct rbd_device *rbd_dev = q->queuedata;
1585 	unsigned int chunk_sectors;
1586 	sector_t sector;
1587 	unsigned int bio_sectors;
1588 	int max;
1589 
1590 	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1591 	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1592 	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1593 
1594 	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1595 				 + bio_sectors)) << SECTOR_SHIFT;
1596 	if (max < 0)
1597 		max = 0; /* bio_add cannot handle a negative return */
1598 	if (max <= bvec->bv_len && bio_sectors == 0)
1599 		return bvec->bv_len;
1600 	return max;
1601 }
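
/*
 * Editorial example: with 4 MiB objects, chunk_sectors == 8192.  A
 * bio starting at sector 8000 that already carries 128 sectors has
 * (8000 & 8191) + 128 = 8128 sectors in its chunk, so max becomes
 * (8192 - 8128) << SECTOR_SHIFT = 32 KiB; a bvec that would push the
 * bio past the object boundary is refused.
 */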
1602 
1603 static void rbd_free_disk(struct rbd_device *rbd_dev)
1604 {
1605 	struct gendisk *disk = rbd_dev->disk;
1606 
1607 	if (!disk)
1608 		return;
1609 
1610 	if (disk->flags & GENHD_FL_UP)
1611 		del_gendisk(disk);
1612 	if (disk->queue)
1613 		blk_cleanup_queue(disk->queue);
1614 	put_disk(disk);
1615 }
1616 
1617 /*
1618  * Read the complete header for the given rbd device.
1619  *
1620  * Returns a pointer to a dynamically-allocated buffer containing
1621  * the complete and validated header.  Caller can pass the address
1622  * of a variable that will be filled in with the version of the
1623  * header object at the time it was read.
1624  *
1625  * Returns a pointer-coded errno if a failure occurs.
1626  */
1627 static struct rbd_image_header_ondisk *
1628 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1629 {
1630 	struct rbd_image_header_ondisk *ondisk = NULL;
1631 	u32 snap_count = 0;
1632 	u64 names_size = 0;
1633 	u32 want_count;
1634 	int ret;
1635 
1636 	/*
1637 	 * The complete header will include an array of its 64-bit
1638 	 * snapshot ids, followed by the names of those snapshots as
1639 	 * a contiguous block of NUL-terminated strings.  Note that
1640 	 * the number of snapshots could change by the time we read
1641 	 * it in, in which case we re-read it.
1642 	 */
1643 	do {
1644 		size_t size;
1645 
1646 		kfree(ondisk);
1647 
1648 		size = sizeof (*ondisk);
1649 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1650 		size += names_size;
1651 		ondisk = kmalloc(size, GFP_KERNEL);
1652 		if (!ondisk)
1653 			return ERR_PTR(-ENOMEM);
1654 
1655 		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1656 				       rbd_dev->header_name,
1657 				       0, size,
1658 				       (char *) ondisk, version);
1659 
1660 		if (ret < 0)
1661 			goto out_err;
1662 		if (WARN_ON((size_t) ret < size)) {
1663 			ret = -ENXIO;
1664 			pr_warning("short header read for image %s"
1665 					" (want %zu got %d)\n",
1666 				rbd_dev->image_name, size, ret);
1667 			goto out_err;
1668 		}
1669 		if (!rbd_dev_ondisk_valid(ondisk)) {
1670 			ret = -ENXIO;
1671 			pr_warning("invalid header for image %s\n",
1672 				rbd_dev->image_name);
1673 			goto out_err;
1674 		}
1675 
1676 		names_size = le64_to_cpu(ondisk->snap_names_len);
1677 		want_count = snap_count;
1678 		snap_count = le32_to_cpu(ondisk->snap_count);
1679 	} while (snap_count != want_count);
1680 
1681 	return ondisk;
1682 
1683 out_err:
1684 	kfree(ondisk);
1685 
1686 	return ERR_PTR(ret);
1687 }
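
/*
 * Editorial note: the loop above converges because each pass re-reads
 * the header with the snap_count and names_size learned from the
 * previous pass; once two consecutive reads agree on snap_count the
 * buffer is known to be large enough and the result is returned.
 */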
1688 
1689 /*
1690  * reload the on-disk header
1691  */
1692 static int rbd_read_header(struct rbd_device *rbd_dev,
1693 			   struct rbd_image_header *header)
1694 {
1695 	struct rbd_image_header_ondisk *ondisk;
1696 	u64 ver = 0;
1697 	int ret;
1698 
1699 	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1700 	if (IS_ERR(ondisk))
1701 		return PTR_ERR(ondisk);
1702 	ret = rbd_header_from_disk(header, ondisk);
1703 	if (ret >= 0)
1704 		header->obj_version = ver;
1705 	kfree(ondisk);
1706 
1707 	return ret;
1708 }
1709 
1710 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1711 {
1712 	struct rbd_snap *snap;
1713 	struct rbd_snap *next;
1714 
1715 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716 		__rbd_remove_snap_dev(snap);
1717 }
1718 
1719 /*
1720  * re-read the on-disk header and update the in-core copy, including snaps
1721  */
1722 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1723 {
1724 	int ret;
1725 	struct rbd_image_header h;
1726 
1727 	ret = rbd_read_header(rbd_dev, &h);
1728 	if (ret < 0)
1729 		return ret;
1730 
1731 	down_write(&rbd_dev->header_rwsem);
1732 
1733 	/* resized? */
1734 	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1735 		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1736 
1737 		if (size != (sector_t) rbd_dev->mapping.size) {
1738 			dout("setting size to %llu sectors",
1739 				(unsigned long long) size);
1740 			rbd_dev->mapping.size = (u64) size;
1741 			set_capacity(rbd_dev->disk, size);
1742 		}
1743 	}
1744 
1745 	/* rbd_dev->header.object_prefix shouldn't change */
1746 	kfree(rbd_dev->header.snap_sizes);
1747 	kfree(rbd_dev->header.snap_names);
1748 	/* osd requests may still refer to snapc */
1749 	ceph_put_snap_context(rbd_dev->header.snapc);
1750 
1751 	if (hver)
1752 		*hver = h.obj_version;
1753 	rbd_dev->header.obj_version = h.obj_version;
1754 	rbd_dev->header.image_size = h.image_size;
1755 	rbd_dev->header.snapc = h.snapc;
1756 	rbd_dev->header.snap_names = h.snap_names;
1757 	rbd_dev->header.snap_sizes = h.snap_sizes;
1758 	/* Free the extra copy of the object prefix */
1759 	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1760 	kfree(h.object_prefix);
1761 
1762 	ret = rbd_dev_snaps_update(rbd_dev);
1763 	if (!ret)
1764 		ret = rbd_dev_snaps_register(rbd_dev);
1765 
1766 	up_write(&rbd_dev->header_rwsem);
1767 
1768 	return ret;
1769 }
1770 
1771 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1772 {
1773 	int ret;
1774 
1775 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1776 	ret = __rbd_refresh_header(rbd_dev, hver);
1777 	mutex_unlock(&ctl_mutex);
1778 
1779 	return ret;
1780 }
1781 
1782 static int rbd_init_disk(struct rbd_device *rbd_dev)
1783 {
1784 	struct gendisk *disk;
1785 	struct request_queue *q;
1786 	u64 segment_size;
1787 
1788 	/* create gendisk info */
1789 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1790 	if (!disk)
1791 		return -ENOMEM;
1792 
1793 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1794 		 rbd_dev->dev_id);
1795 	disk->major = rbd_dev->major;
1796 	disk->first_minor = 0;
1797 	disk->fops = &rbd_bd_ops;
1798 	disk->private_data = rbd_dev;
1799 
1800 	/* init rq */
1801 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1802 	if (!q)
1803 		goto out_disk;
1804 
1805 	/* We use the default size, but let's be explicit about it. */
1806 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1807 
1808 	/* set io sizes to object size */
1809 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1810 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1811 	blk_queue_max_segment_size(q, segment_size);
1812 	blk_queue_io_min(q, segment_size);
1813 	blk_queue_io_opt(q, segment_size);
1814 
1815 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1816 	disk->queue = q;
1817 
1818 	q->queuedata = rbd_dev;
1819 
1820 	rbd_dev->disk = disk;
1821 
1822 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1823 
1824 	return 0;
1825 out_disk:
1826 	put_disk(disk);
1827 
1828 	return -ENOMEM;
1829 }
1830 
1831 /*
1832   sysfs
1833 */
1834 
1835 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1836 {
1837 	return container_of(dev, struct rbd_device, dev);
1838 }
1839 
1840 static ssize_t rbd_size_show(struct device *dev,
1841 			     struct device_attribute *attr, char *buf)
1842 {
1843 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844 	sector_t size;
1845 
1846 	down_read(&rbd_dev->header_rwsem);
1847 	size = get_capacity(rbd_dev->disk);
1848 	up_read(&rbd_dev->header_rwsem);
1849 
1850 	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1851 }
1852 
1853 /*
1854  * Note this shows the features for whatever's mapped, which is not
1855  * necessarily the base image.
1856  */
1857 static ssize_t rbd_features_show(struct device *dev,
1858 			     struct device_attribute *attr, char *buf)
1859 {
1860 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861 
1862 	return sprintf(buf, "0x%016llx\n",
1863 			(unsigned long long) rbd_dev->mapping.features);
1864 }
1865 
1866 static ssize_t rbd_major_show(struct device *dev,
1867 			      struct device_attribute *attr, char *buf)
1868 {
1869 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870 
1871 	return sprintf(buf, "%d\n", rbd_dev->major);
1872 }
1873 
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875 				  struct device_attribute *attr, char *buf)
1876 {
1877 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878 
1879 	return sprintf(buf, "client%lld\n",
1880 			ceph_client_id(rbd_dev->rbd_client->client));
1881 }
1882 
1883 static ssize_t rbd_pool_show(struct device *dev,
1884 			     struct device_attribute *attr, char *buf)
1885 {
1886 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 
1888 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1889 }
1890 
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892 			     struct device_attribute *attr, char *buf)
1893 {
1894 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 
1896 	return sprintf(buf, "%d\n", rbd_dev->pool_id);
1897 }
1898 
1899 static ssize_t rbd_name_show(struct device *dev,
1900 			     struct device_attribute *attr, char *buf)
1901 {
1902 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903 
1904 	return sprintf(buf, "%s\n", rbd_dev->image_name);
1905 }
1906 
1907 static ssize_t rbd_image_id_show(struct device *dev,
1908 			     struct device_attribute *attr, char *buf)
1909 {
1910 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911 
1912 	return sprintf(buf, "%s\n", rbd_dev->image_id);
1913 }
1914 
1915 /*
1916  * Shows the name of the currently-mapped snapshot (or
1917  * RBD_SNAP_HEAD_NAME for the base image).
1918  */
1919 static ssize_t rbd_snap_show(struct device *dev,
1920 			     struct device_attribute *attr,
1921 			     char *buf)
1922 {
1923 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924 
1925 	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1926 }
1927 
1928 static ssize_t rbd_image_refresh(struct device *dev,
1929 				 struct device_attribute *attr,
1930 				 const char *buf,
1931 				 size_t size)
1932 {
1933 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934 	int ret;
1935 
1936 	ret = rbd_refresh_header(rbd_dev, NULL);
1937 
1938 	return ret < 0 ? ret : size;
1939 }
1940 
1941 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1942 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1943 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1944 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1945 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1946 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1947 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1948 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1949 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1950 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1951 
1952 static struct attribute *rbd_attrs[] = {
1953 	&dev_attr_size.attr,
1954 	&dev_attr_features.attr,
1955 	&dev_attr_major.attr,
1956 	&dev_attr_client_id.attr,
1957 	&dev_attr_pool.attr,
1958 	&dev_attr_pool_id.attr,
1959 	&dev_attr_name.attr,
1960 	&dev_attr_image_id.attr,
1961 	&dev_attr_current_snap.attr,
1962 	&dev_attr_refresh.attr,
1963 	NULL
1964 };
1965 
1966 static struct attribute_group rbd_attr_group = {
1967 	.attrs = rbd_attrs,
1968 };
1969 
1970 static const struct attribute_group *rbd_attr_groups[] = {
1971 	&rbd_attr_group,
1972 	NULL
1973 };
1974 
1975 static void rbd_sysfs_dev_release(struct device *dev)
1976 {
1977 }
1978 
1979 static struct device_type rbd_device_type = {
1980 	.name		= "rbd",
1981 	.groups		= rbd_attr_groups,
1982 	.release	= rbd_sysfs_dev_release,
1983 };
1984 
1985 
1986 /*
1987   sysfs - snapshots
1988 */
1989 
1990 static ssize_t rbd_snap_size_show(struct device *dev,
1991 				  struct device_attribute *attr,
1992 				  char *buf)
1993 {
1994 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1995 
1996 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1997 }
1998 
1999 static ssize_t rbd_snap_id_show(struct device *dev,
2000 				struct device_attribute *attr,
2001 				char *buf)
2002 {
2003 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2004 
2005 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2006 }
2007 
2008 static ssize_t rbd_snap_features_show(struct device *dev,
2009 				struct device_attribute *attr,
2010 				char *buf)
2011 {
2012 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2013 
2014 	return sprintf(buf, "0x%016llx\n",
2015 			(unsigned long long) snap->features);
2016 }
2017 
2018 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2019 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2020 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2021 
2022 static struct attribute *rbd_snap_attrs[] = {
2023 	&dev_attr_snap_size.attr,
2024 	&dev_attr_snap_id.attr,
2025 	&dev_attr_snap_features.attr,
2026 	NULL,
2027 };
2028 
2029 static struct attribute_group rbd_snap_attr_group = {
2030 	.attrs = rbd_snap_attrs,
2031 };
2032 
2033 static void rbd_snap_dev_release(struct device *dev)
2034 {
2035 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2036 	kfree(snap->name);
2037 	kfree(snap);
2038 }
2039 
2040 static const struct attribute_group *rbd_snap_attr_groups[] = {
2041 	&rbd_snap_attr_group,
2042 	NULL
2043 };
2044 
2045 static struct device_type rbd_snap_device_type = {
2046 	.groups		= rbd_snap_attr_groups,
2047 	.release	= rbd_snap_dev_release,
2048 };
2049 
2050 static bool rbd_snap_registered(struct rbd_snap *snap)
2051 {
2052 	bool ret = snap->dev.type == &rbd_snap_device_type;
2053 	bool reg = device_is_registered(&snap->dev);
2054 
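	/*
	 * The device type is assigned when the snapshot device is
	 * registered, so the two must agree:  (!ret ^ reg) is true
	 * exactly when ret == reg.
	 */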
2055 	rbd_assert(!ret ^ reg);
2056 
2057 	return ret;
2058 }
2059 
2060 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2061 {
2062 	list_del(&snap->node);
2063 	if (device_is_registered(&snap->dev))
2064 		device_unregister(&snap->dev);
2065 }
2066 
2067 static int rbd_register_snap_dev(struct rbd_snap *snap,
2068 				  struct device *parent)
2069 {
2070 	struct device *dev = &snap->dev;
2071 	int ret;
2072 
2073 	dev->type = &rbd_snap_device_type;
2074 	dev->parent = parent;
2075 	dev->release = rbd_snap_dev_release;
2076 	dev_set_name(dev, "snap_%s", snap->name);
2077 	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2078 
2079 	ret = device_register(dev);
2080 
2081 	return ret;
2082 }
2083 
2084 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2085 						const char *snap_name,
2086 						u64 snap_id, u64 snap_size,
2087 						u64 snap_features)
2088 {
2089 	struct rbd_snap *snap;
2090 	int ret;
2091 
2092 	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2093 	if (!snap)
2094 		return ERR_PTR(-ENOMEM);
2095 
2096 	ret = -ENOMEM;
2097 	snap->name = kstrdup(snap_name, GFP_KERNEL);
2098 	if (!snap->name)
2099 		goto err;
2100 
2101 	snap->id = snap_id;
2102 	snap->size = snap_size;
2103 	snap->features = snap_features;
2104 
2105 	return snap;
2106 
2107 err:
2108 	kfree(snap->name);
2109 	kfree(snap);
2110 
2111 	return ERR_PTR(ret);
2112 }
2113 
2114 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2115 		u64 *snap_size, u64 *snap_features)
2116 {
2117 	char *snap_name;
2118 
2119 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2120 
2121 	*snap_size = rbd_dev->header.snap_sizes[which];
2122 	*snap_features = 0;	/* No features for v1 */
2123 
2124 	/* Skip over names until we find the one we are looking for */
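	/* (snap_names is one buffer of consecutive NUL-terminated names) */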
2125 
2126 	snap_name = rbd_dev->header.snap_names;
2127 	while (which--)
2128 		snap_name += strlen(snap_name) + 1;
2129 
2130 	return snap_name;
2131 }
2132 
2133 /*
2134  * Get the size and object order for an image snapshot, or if
2135  * snap_id is CEPH_NOSNAP, gets this information for the base
2136  * image.
2137  */
2138 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2139 				u8 *order, u64 *snap_size)
2140 {
2141 	__le64 snapid = cpu_to_le64(snap_id);
2142 	int ret;
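	/* This matches the layout of the "get_size" method's reply */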
2143 	struct {
2144 		u8 order;
2145 		__le64 size;
2146 	} __attribute__ ((packed)) size_buf = { 0 };
2147 
2148 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2149 				"rbd", "get_size",
2150 				(char *) &snapid, sizeof (snapid),
2151 				(char *) &size_buf, sizeof (size_buf),
2152 				CEPH_OSD_FLAG_READ, NULL);
2153 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2154 	if (ret < 0)
2155 		return ret;
2156 
2157 	*order = size_buf.order;
2158 	*snap_size = le64_to_cpu(size_buf.size);
2159 
2160 	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2161 		(unsigned long long) snap_id, (unsigned int) *order,
2162 		(unsigned long long) *snap_size);
2163 
2164 	return 0;
2165 }
2166 
2167 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2168 {
2169 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2170 					&rbd_dev->header.obj_order,
2171 					&rbd_dev->header.image_size);
2172 }
2173 
2174 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2175 {
2176 	void *reply_buf;
2177 	int ret;
2178 	void *p;
2179 
2180 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2181 	if (!reply_buf)
2182 		return -ENOMEM;
2183 
2184 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2185 				"rbd", "get_object_prefix",
2186 				NULL, 0,
2187 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2188 				CEPH_OSD_FLAG_READ, NULL);
2189 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2190 	if (ret < 0)
2191 		goto out;
2192 
2193 	p = reply_buf;
2194 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2195 						p + RBD_OBJ_PREFIX_LEN_MAX,
2196 						NULL, GFP_NOIO);
2197 
2198 	if (IS_ERR(rbd_dev->header.object_prefix)) {
2199 		ret = PTR_ERR(rbd_dev->header.object_prefix);
2200 		rbd_dev->header.object_prefix = NULL;
2201 	} else {
2202 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2203 	}
2204 
2205 out:
2206 	kfree(reply_buf);
2207 
2208 	return ret;
2209 }
2210 
2211 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2212 		u64 *snap_features)
2213 {
2214 	__le64 snapid = cpu_to_le64(snap_id);
2215 	struct {
2216 		__le64 features;
2217 		__le64 incompat;
2218 	} features_buf = { 0 };
2219 	int ret;
2220 
2221 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2222 				"rbd", "get_features",
2223 				(char *) &snapid, sizeof (snapid),
2224 				(char *) &features_buf, sizeof (features_buf),
2225 				CEPH_OSD_FLAG_READ, NULL);
2226 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227 	if (ret < 0)
2228 		return ret;
2229 	*snap_features = le64_to_cpu(features_buf.features);
2230 
2231 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2232 		(unsigned long long) snap_id,
2233 		(unsigned long long) *snap_features,
2234 		(unsigned long long) le64_to_cpu(features_buf.incompat));
2235 
2236 	return 0;
2237 }
2238 
2239 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2240 {
2241 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2242 						&rbd_dev->header.features);
2243 }
2244 
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246 {
2247 	size_t size;
2248 	int ret;
2249 	void *reply_buf;
2250 	void *p;
2251 	void *end;
2252 	u64 seq;
2253 	u32 snap_count;
2254 	struct ceph_snap_context *snapc;
2255 	u32 i;
2256 
2257 	/*
2258 	 * We'll need room for the seq value (maximum snapshot id),
2259 	 * snapshot count, and array of that many snapshot ids.
2260 	 * For now we have a fixed upper limit on the number we're
2261 	 * prepared to receive.
2262 	 */
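	/*
	 * Reply layout:
	 *	__le64 seq;
	 *	__le32 snap_count;
	 *	__le64 snaps[snap_count];
	 */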
2263 	size = sizeof (__le64) + sizeof (__le32) +
2264 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265 	reply_buf = kzalloc(size, GFP_KERNEL);
2266 	if (!reply_buf)
2267 		return -ENOMEM;
2268 
2269 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270 				"rbd", "get_snapcontext",
2271 				NULL, 0,
2272 				reply_buf, size,
2273 				CEPH_OSD_FLAG_READ, ver);
2274 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2275 	if (ret < 0)
2276 		goto out;
2277 
2278 	ret = -ERANGE;
2279 	p = reply_buf;
2280 	end = (char *) reply_buf + size;
2281 	ceph_decode_64_safe(&p, end, seq, out);
2282 	ceph_decode_32_safe(&p, end, snap_count, out);
2283 
2284 	/*
2285 	 * Make sure the reported number of snapshot ids wouldn't go
2286 	 * beyond the end of our buffer.  But before checking that,
2287 	 * make sure the computed size of the snapshot context we
2288 	 * allocate is representable in a size_t.
2289 	 */
2290 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2291 				 / sizeof (u64)) {
2292 		ret = -EINVAL;
2293 		goto out;
2294 	}
2295 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2296 		goto out;
2297 
2298 	size = sizeof (struct ceph_snap_context) +
2299 				snap_count * sizeof (snapc->snaps[0]);
2300 	snapc = kmalloc(size, GFP_KERNEL);
2301 	if (!snapc) {
2302 		ret = -ENOMEM;
2303 		goto out;
2304 	}
2305 
2306 	atomic_set(&snapc->nref, 1);
2307 	snapc->seq = seq;
2308 	snapc->num_snaps = snap_count;
2309 	for (i = 0; i < snap_count; i++)
2310 		snapc->snaps[i] = ceph_decode_64(&p);
2311 
2312 	rbd_dev->header.snapc = snapc;
2313 
	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);

	ret = 0;
out:
	kfree(reply_buf);

	return ret;
2321 }
2322 
2323 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2324 {
2325 	size_t size;
2326 	void *reply_buf;
2327 	__le64 snap_id;
2328 	int ret;
2329 	void *p;
2330 	void *end;
2331 	size_t snap_name_len;
2332 	char *snap_name;
2333 
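	/* The reply is an encoded string: a __le32 length, then the name */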
2334 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2335 	reply_buf = kmalloc(size, GFP_KERNEL);
2336 	if (!reply_buf)
2337 		return ERR_PTR(-ENOMEM);
2338 
2339 	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2340 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2341 				"rbd", "get_snapshot_name",
2342 				(char *) &snap_id, sizeof (snap_id),
2343 				reply_buf, size,
2344 				CEPH_OSD_FLAG_READ, NULL);
2345 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2346 	if (ret < 0)
2347 		goto out;
2348 
2349 	p = reply_buf;
2350 	end = (char *) reply_buf + size;
2351 	snap_name_len = 0;
2352 	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2353 				GFP_KERNEL);
2354 	if (IS_ERR(snap_name)) {
2355 		ret = PTR_ERR(snap_name);
2356 		goto out;
2357 	} else {
2358 		dout("  snap_id 0x%016llx snap_name = %s\n",
2359 			(unsigned long long) le64_to_cpu(snap_id), snap_name);
2360 	}
2361 	kfree(reply_buf);
2362 
2363 	return snap_name;
2364 out:
2365 	kfree(reply_buf);
2366 
2367 	return ERR_PTR(ret);
2368 }
2369 
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371 		u64 *snap_size, u64 *snap_features)
2372 {
	u64 snap_id;
2374 	u8 order;
2375 	int ret;
2376 
2377 	snap_id = rbd_dev->header.snapc->snaps[which];
2378 	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2379 	if (ret)
2380 		return ERR_PTR(ret);
2381 	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2382 	if (ret)
2383 		return ERR_PTR(ret);
2384 
2385 	return rbd_dev_v2_snap_name(rbd_dev, which);
2386 }
2387 
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389 		u64 *snap_size, u64 *snap_features)
2390 {
2391 	if (rbd_dev->image_format == 1)
2392 		return rbd_dev_v1_snap_info(rbd_dev, which,
2393 					snap_size, snap_features);
2394 	if (rbd_dev->image_format == 2)
2395 		return rbd_dev_v2_snap_info(rbd_dev, which,
2396 					snap_size, snap_features);
2397 	return ERR_PTR(-EINVAL);
2398 }
2399 
2400 /*
2401  * Scan the rbd device's current snapshot list and compare it to the
2402  * newly-received snapshot context.  Remove any existing snapshots
2403  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshot in the snapshot context that is not in the current
 * list, and verify that there are no changes to snapshots we already
 * know about.
2407  *
2408  * Assumes the snapshots in the snapshot context are sorted by
2409  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2410  * are also maintained in that order.)
2411  */
2412 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2413 {
2414 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2415 	const u32 snap_count = snapc->num_snaps;
2416 	struct list_head *head = &rbd_dev->snaps;
2417 	struct list_head *links = head->next;
2418 	u32 index = 0;
2419 
2420 	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
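	/*
	 * Walk the snapshot id array and the device's snapshot list
	 * in parallel; both are ordered with the highest id first.
	 * An id only in the list is stale; an id only in the context
	 * is new.
	 */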
2421 	while (index < snap_count || links != head) {
2422 		u64 snap_id;
2423 		struct rbd_snap *snap;
2424 		char *snap_name;
2425 		u64 snap_size = 0;
2426 		u64 snap_features = 0;
2427 
2428 		snap_id = index < snap_count ? snapc->snaps[index]
2429 					     : CEPH_NOSNAP;
2430 		snap = links != head ? list_entry(links, struct rbd_snap, node)
2431 				     : NULL;
2432 		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2433 
2434 		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2435 			struct list_head *next = links->next;
2436 
2437 			/* Existing snapshot not in the new snap context */
2438 
			if (rbd_dev->mapping.snap_id == snap->id)
				rbd_dev->mapping.snap_exists = false;
			/*
			 * Log this before removing the device:
			 * device_unregister() may drop the last
			 * reference to snap and free it.
			 */
			dout("%ssnap id %llu has been removed\n",
				rbd_dev->mapping.snap_id == snap->id ?
								"mapped " : "",
				(unsigned long long) snap->id);
			__rbd_remove_snap_dev(snap);
2446 
2447 			/* Done with this list entry; advance */
2448 
2449 			links = next;
2450 			continue;
2451 		}
2452 
2453 		snap_name = rbd_dev_snap_info(rbd_dev, index,
2454 					&snap_size, &snap_features);
2455 		if (IS_ERR(snap_name))
2456 			return PTR_ERR(snap_name);
2457 
		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
2459 			(unsigned long long) snap_id);
2460 		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2461 			struct rbd_snap *new_snap;
2462 
2463 			/* We haven't seen this snapshot before */
2464 
2465 			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2466 					snap_id, snap_size, snap_features);
2467 			if (IS_ERR(new_snap)) {
2468 				int err = PTR_ERR(new_snap);
2469 
2470 				dout("  failed to add dev, error %d\n", err);
2471 
2472 				return err;
2473 			}
2474 
2475 			/* New goes before existing, or at end of list */
2476 
			dout("  added dev%s\n", snap ? "" : " at end");
2478 			if (snap)
2479 				list_add_tail(&new_snap->node, &snap->node);
2480 			else
2481 				list_add_tail(&new_snap->node, head);
2482 		} else {
2483 			/* Already have this one */
2484 
2485 			dout("  already present\n");
2486 
2487 			rbd_assert(snap->size == snap_size);
2488 			rbd_assert(!strcmp(snap->name, snap_name));
2489 			rbd_assert(snap->features == snap_features);
2490 
2491 			/* Done with this list entry; advance */
2492 
2493 			links = links->next;
2494 		}
2495 
2496 		/* Advance to the next entry in the snapshot context */
2497 
2498 		index++;
2499 	}
2500 	dout("%s: done\n", __func__);
2501 
2502 	return 0;
2503 }
2504 
2505 /*
2506  * Scan the list of snapshots and register the devices for any that
2507  * have not already been registered.
2508  */
2509 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2510 {
2511 	struct rbd_snap *snap;
2512 	int ret = 0;
2513 
2514 	dout("%s called\n", __func__);
2515 	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2516 		return -EIO;
2517 
2518 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2519 		if (!rbd_snap_registered(snap)) {
2520 			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2521 			if (ret < 0)
2522 				break;
2523 		}
2524 	}
2525 	dout("%s: returning %d\n", __func__, ret);
2526 
2527 	return ret;
2528 }
2529 
2530 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2531 {
2532 	struct device *dev;
2533 	int ret;
2534 
2535 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2536 
2537 	dev = &rbd_dev->dev;
2538 	dev->bus = &rbd_bus_type;
2539 	dev->type = &rbd_device_type;
2540 	dev->parent = &rbd_root_dev;
2541 	dev->release = rbd_dev_release;
2542 	dev_set_name(dev, "%d", rbd_dev->dev_id);
2543 	ret = device_register(dev);
2544 
2545 	mutex_unlock(&ctl_mutex);
2546 
2547 	return ret;
2548 }
2549 
2550 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2551 {
2552 	device_unregister(&rbd_dev->dev);
2553 }
2554 
2555 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2556 {
2557 	int ret, rc;
2558 
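	/*
	 * A watch request that fails with -ERANGE indicates our
	 * header is out of date; refresh it and try again.
	 */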
2559 	do {
2560 		ret = rbd_req_sync_watch(rbd_dev);
2561 		if (ret == -ERANGE) {
2562 			rc = rbd_refresh_header(rbd_dev, NULL);
2563 			if (rc < 0)
2564 				return rc;
2565 		}
2566 	} while (ret == -ERANGE);
2567 
2568 	return ret;
2569 }
2570 
2571 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2572 
2573 /*
2574  * Get a unique rbd identifier for the given new rbd_dev, and add
2575  * the rbd_dev to the global list.  The minimum rbd id is 1.
2576  */
2577 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2578 {
2579 	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2580 
2581 	spin_lock(&rbd_dev_list_lock);
2582 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2583 	spin_unlock(&rbd_dev_list_lock);
2584 	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2585 		(unsigned long long) rbd_dev->dev_id);
2586 }
2587 
2588 /*
2589  * Remove an rbd_dev from the global list, and record that its
2590  * identifier is no longer in use.
2591  */
2592 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2593 {
2594 	struct list_head *tmp;
2595 	int rbd_id = rbd_dev->dev_id;
2596 	int max_id;
2597 
2598 	rbd_assert(rbd_id > 0);
2599 
2600 	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2601 		(unsigned long long) rbd_dev->dev_id);
2602 	spin_lock(&rbd_dev_list_lock);
2603 	list_del_init(&rbd_dev->node);
2604 
2605 	/*
2606 	 * If the id being "put" is not the current maximum, there
2607 	 * is nothing special we need to do.
2608 	 */
2609 	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2610 		spin_unlock(&rbd_dev_list_lock);
2611 		return;
2612 	}
2613 
2614 	/*
2615 	 * We need to update the current maximum id.  Search the
2616 	 * list to find out what it is.  We're more likely to find
2617 	 * the maximum at the end, so search the list backward.
2618 	 */
2619 	max_id = 0;
2620 	list_for_each_prev(tmp, &rbd_dev_list) {
2621 		struct rbd_device *rbd_dev;
2622 
2623 		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
2626 	}
2627 	spin_unlock(&rbd_dev_list_lock);
2628 
2629 	/*
2630 	 * The max id could have been updated by rbd_dev_id_get(), in
2631 	 * which case it now accurately reflects the new maximum.
2632 	 * Be careful not to overwrite the maximum value in that
2633 	 * case.
2634 	 */
2635 	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2636 	dout("  max dev id has been reset\n");
2637 }
2638 
2639 /*
2640  * Skips over white space at *buf, and updates *buf to point to the
2641  * first found non-space character (if any). Returns the length of
2642  * the token (string of non-white space characters) found.  Note
2643  * that *buf must be terminated with '\0'.
2644  */
2645 static inline size_t next_token(const char **buf)
2646 {
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
2656 }
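
/*
 * Example: with *buf pointing at "  rbd foo", next_token() advances
 * *buf to point at "rbd foo" and returns 3, the length of "rbd".
 */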
2657 
2658 /*
2659  * Finds the next token in *buf, and if the provided token buffer is
2660  * big enough, copies the found token into it.  The result, if
2661  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2662  * must be terminated with '\0' on entry.
2663  *
2664  * Returns the length of the token found (not including the '\0').
2665  * Return value will be 0 if no token is found, and it will be >=
2666  * token_size if the token would not fit.
2667  *
2668  * The *buf pointer will be updated to point beyond the end of the
2669  * found token.  Note that this occurs even if the token buffer is
2670  * too small to hold it.
2671  */
2672 static inline size_t copy_token(const char **buf,
2673 				char *token,
2674 				size_t token_size)
2675 {
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
2686 }
2687 
2688 /*
2689  * Finds the next token in *buf, dynamically allocates a buffer big
2690  * enough to hold a copy of it, and copies the token into the new
2691  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2692  * that a duplicate buffer is created even for a zero-length token.
2693  *
2694  * Returns a pointer to the newly-allocated duplicate, or a null
2695  * pointer if memory for the duplicate was not available.  If
2696  * the lenp argument is a non-null pointer, the length of the token
2697  * (not including the '\0') is returned in *lenp.
2698  *
2699  * If successful, the *buf pointer will be updated to point beyond
2700  * the end of the found token.
2701  *
2702  * Note: uses GFP_KERNEL for allocation.
2703  */
2704 static inline char *dup_token(const char **buf, size_t *lenp)
2705 {
2706 	char *dup;
2707 	size_t len;
2708 
2709 	len = next_token(buf);
2710 	dup = kmalloc(len + 1, GFP_KERNEL);
2711 	if (!dup)
2712 		return NULL;
2713 
2714 	memcpy(dup, *buf, len);
2715 	*(dup + len) = '\0';
2716 	*buf += len;
2717 
2718 	if (lenp)
2719 		*lenp = len;
2720 
2721 	return dup;
2722 }
2723 
2724 /*
 * This fills in the pool_name, image_name, and image_name_len fields
 * of the given rbd_dev, based on the
2727  * list of monitor addresses and other options provided via
2728  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2729  * copy of the snapshot name to map if successful, or a
2730  * pointer-coded error otherwise.
2731  *
2732  * Note: rbd_dev is assumed to have been initially zero-filled.
2733  */
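/*
 * For example, a write to /sys/bus/rbd/add might look like this
 * (the address and names are only illustrative):
 *
 *	1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * i.e. monitor address(es), options, pool name, image name, and an
 * optional snapshot name, separated by whitespace.
 */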
2734 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2735 				const char *buf,
2736 				const char **mon_addrs,
2737 				size_t *mon_addrs_size,
2738 				char *options,
2739 				size_t options_size)
2740 {
2741 	size_t len;
2742 	char *err_ptr = ERR_PTR(-EINVAL);
2743 	char *snap_name;
2744 
2745 	/* The first four tokens are required */
2746 
2747 	len = next_token(&buf);
2748 	if (!len)
2749 		return err_ptr;
2750 	*mon_addrs_size = len + 1;
2751 	*mon_addrs = buf;
2752 
2753 	buf += len;
2754 
2755 	len = copy_token(&buf, options, options_size);
2756 	if (!len || len >= options_size)
2757 		return err_ptr;
2758 
2759 	err_ptr = ERR_PTR(-ENOMEM);
2760 	rbd_dev->pool_name = dup_token(&buf, NULL);
2761 	if (!rbd_dev->pool_name)
2762 		goto out_err;
2763 
2764 	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2765 	if (!rbd_dev->image_name)
2766 		goto out_err;
2767 
2768 	/* Snapshot name is optional */
2769 	len = next_token(&buf);
2770 	if (!len) {
2771 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2772 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2773 	}
2774 	snap_name = kmalloc(len + 1, GFP_KERNEL);
2775 	if (!snap_name)
2776 		goto out_err;
2777 	memcpy(snap_name, buf, len);
2778 	*(snap_name + len) = '\0';
2779 
	dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2781 
2782 	return snap_name;
2783 
2784 out_err:
2785 	kfree(rbd_dev->image_name);
2786 	rbd_dev->image_name = NULL;
2787 	rbd_dev->image_name_len = 0;
2788 	kfree(rbd_dev->pool_name);
2789 	rbd_dev->pool_name = NULL;
2790 
2791 	return err_ptr;
2792 }
2793 
2794 /*
2795  * An rbd format 2 image has a unique identifier, distinct from the
2796  * name given to it by the user.  Internally, that identifier is
2797  * what's used to specify the names of objects related to the image.
2798  *
2799  * A special "rbd id" object is used to map an rbd image name to its
2800  * id.  If that object doesn't exist, then there is no v2 rbd image
2801  * with the supplied name.
2802  *
2803  * This function will record the given rbd_dev's image_id field if
2804  * it can be determined, and in that case will return 0.  If any
2805  * errors occur a negative errno will be returned and the rbd_dev's
2806  * image_id field will be unchanged (and should be NULL).
2807  */
2808 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2809 {
2810 	int ret;
2811 	size_t size;
2812 	char *object_name;
2813 	void *response;
2814 	void *p;
2815 
2816 	/*
2817 	 * First, see if the format 2 image id file exists, and if
2818 	 * so, get the image's persistent id from it.
2819 	 */
2820 	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2821 	object_name = kmalloc(size, GFP_NOIO);
2822 	if (!object_name)
2823 		return -ENOMEM;
2824 	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2825 	dout("rbd id object name is %s\n", object_name);
2826 
2827 	/* Response will be an encoded string, which includes a length */
2828 
2829 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2830 	response = kzalloc(size, GFP_NOIO);
2831 	if (!response) {
2832 		ret = -ENOMEM;
2833 		goto out;
2834 	}
2835 
2836 	ret = rbd_req_sync_exec(rbd_dev, object_name,
2837 				"rbd", "get_id",
2838 				NULL, 0,
2839 				response, RBD_IMAGE_ID_LEN_MAX,
2840 				CEPH_OSD_FLAG_READ, NULL);
2841 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2842 	if (ret < 0)
2843 		goto out;
2844 
2845 	p = response;
2846 	rbd_dev->image_id = ceph_extract_encoded_string(&p,
2847 						p + RBD_IMAGE_ID_LEN_MAX,
2848 						&rbd_dev->image_id_len,
2849 						GFP_NOIO);
2850 	if (IS_ERR(rbd_dev->image_id)) {
2851 		ret = PTR_ERR(rbd_dev->image_id);
2852 		rbd_dev->image_id = NULL;
2853 	} else {
2854 		dout("image_id is %s\n", rbd_dev->image_id);
2855 	}
2856 out:
2857 	kfree(response);
2858 	kfree(object_name);
2859 
2860 	return ret;
2861 }
2862 
2863 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2864 {
2865 	int ret;
2866 	size_t size;
2867 
2868 	/* Version 1 images have no id; empty string is used */
2869 
2870 	rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2871 	if (!rbd_dev->image_id)
2872 		return -ENOMEM;
2873 	rbd_dev->image_id_len = 0;
2874 
2875 	/* Record the header object name for this rbd image. */
2876 
2877 	size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2878 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879 	if (!rbd_dev->header_name) {
2880 		ret = -ENOMEM;
2881 		goto out_err;
2882 	}
2883 	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2884 
2885 	/* Populate rbd image metadata */
2886 
2887 	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2888 	if (ret < 0)
2889 		goto out_err;
2890 	rbd_dev->image_format = 1;
2891 
2892 	dout("discovered version 1 image, header name is %s\n",
2893 		rbd_dev->header_name);
2894 
2895 	return 0;
2896 
2897 out_err:
2898 	kfree(rbd_dev->header_name);
2899 	rbd_dev->header_name = NULL;
2900 	kfree(rbd_dev->image_id);
2901 	rbd_dev->image_id = NULL;
2902 
2903 	return ret;
2904 }
2905 
2906 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2907 {
2908 	size_t size;
2909 	int ret;
2910 	u64 ver = 0;
2911 
2912 	/*
2913 	 * Image id was filled in by the caller.  Record the header
2914 	 * object name for this rbd image.
2915 	 */
2916 	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2917 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918 	if (!rbd_dev->header_name)
2919 		return -ENOMEM;
2920 	sprintf(rbd_dev->header_name, "%s%s",
2921 			RBD_HEADER_PREFIX, rbd_dev->image_id);
2922 
2923 	/* Get the size and object order for the image */
2924 
2925 	ret = rbd_dev_v2_image_size(rbd_dev);
2926 	if (ret < 0)
2927 		goto out_err;
2928 
2929 	/* Get the object prefix (a.k.a. block_name) for the image */
2930 
2931 	ret = rbd_dev_v2_object_prefix(rbd_dev);
2932 	if (ret < 0)
2933 		goto out_err;
2934 
2935 	/* Get the features for the image */
2936 
2937 	ret = rbd_dev_v2_features(rbd_dev);
2938 	if (ret < 0)
2939 		goto out_err;
2940 
2941 	/* crypto and compression type aren't (yet) supported for v2 images */
2942 
2943 	rbd_dev->header.crypt_type = 0;
2944 	rbd_dev->header.comp_type = 0;
2945 
2946 	/* Get the snapshot context, plus the header version */
2947 
2948 	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2949 	if (ret)
2950 		goto out_err;
2951 	rbd_dev->header.obj_version = ver;
2952 
2953 	rbd_dev->image_format = 2;
2954 
2955 	dout("discovered version 2 image, header name is %s\n",
2956 		rbd_dev->header_name);
2957 
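	/*
	 * The image checks out, but mapping format 2 images is not
	 * supported yet; fail the probe so it cannot be mapped.
	 */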
2958 	return -ENOTSUPP;
2959 out_err:
2960 	kfree(rbd_dev->header_name);
2961 	rbd_dev->header_name = NULL;
2962 	kfree(rbd_dev->header.object_prefix);
2963 	rbd_dev->header.object_prefix = NULL;
2964 
2965 	return ret;
2966 }
2967 
2968 /*
2969  * Probe for the existence of the header object for the given rbd
2970  * device.  For format 2 images this includes determining the image
2971  * id.
2972  */
2973 static int rbd_dev_probe(struct rbd_device *rbd_dev)
2974 {
2975 	int ret;
2976 
2977 	/*
2978 	 * Get the id from the image id object.  If it's not a
2979 	 * format 2 image, we'll get ENOENT back, and we'll assume
2980 	 * it's a format 1 image.
2981 	 */
2982 	ret = rbd_dev_image_id(rbd_dev);
2983 	if (ret)
2984 		ret = rbd_dev_v1_probe(rbd_dev);
2985 	else
2986 		ret = rbd_dev_v2_probe(rbd_dev);
2987 	if (ret)
2988 		dout("probe failed, returning %d\n", ret);
2989 
2990 	return ret;
2991 }
2992 
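/*
 * Process a write to /sys/bus/rbd/add: parse the request, connect
 * to the monitors, probe the image, and set up the block device.
 */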
2993 static ssize_t rbd_add(struct bus_type *bus,
2994 		       const char *buf,
2995 		       size_t count)
2996 {
2997 	char *options;
2998 	struct rbd_device *rbd_dev = NULL;
2999 	const char *mon_addrs = NULL;
3000 	size_t mon_addrs_size = 0;
3001 	struct ceph_osd_client *osdc;
3002 	int rc = -ENOMEM;
3003 	char *snap_name;
3004 
3005 	if (!try_module_get(THIS_MODULE))
3006 		return -ENODEV;
3007 
3008 	options = kmalloc(count, GFP_KERNEL);
3009 	if (!options)
3010 		goto err_out_mem;
3011 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3012 	if (!rbd_dev)
3013 		goto err_out_mem;
3014 
3015 	/* static rbd_device initialization */
3016 	spin_lock_init(&rbd_dev->lock);
3017 	INIT_LIST_HEAD(&rbd_dev->node);
3018 	INIT_LIST_HEAD(&rbd_dev->snaps);
3019 	init_rwsem(&rbd_dev->header_rwsem);
3020 
3021 	/* parse add command */
3022 	snap_name = rbd_add_parse_args(rbd_dev, buf,
3023 				&mon_addrs, &mon_addrs_size, options, count);
3024 	if (IS_ERR(snap_name)) {
3025 		rc = PTR_ERR(snap_name);
3026 		goto err_out_mem;
3027 	}
3028 
3029 	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3030 	if (rc < 0)
3031 		goto err_out_args;
3032 
3033 	/* pick the pool */
3034 	osdc = &rbd_dev->rbd_client->client->osdc;
3035 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3036 	if (rc < 0)
3037 		goto err_out_client;
3038 	rbd_dev->pool_id = rc;
3039 
3040 	rc = rbd_dev_probe(rbd_dev);
3041 	if (rc < 0)
3042 		goto err_out_client;
3043 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3044 
3045 	/* no need to lock here, as rbd_dev is not registered yet */
3046 	rc = rbd_dev_snaps_update(rbd_dev);
3047 	if (rc)
3048 		goto err_out_header;
3049 
3050 	rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3051 	if (rc)
3052 		goto err_out_header;
3053 
3054 	/* generate unique id: find highest unique id, add one */
3055 	rbd_dev_id_get(rbd_dev);
3056 
3057 	/* Fill in the device name, now that we have its id. */
3058 	BUILD_BUG_ON(DEV_NAME_LEN
3059 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3060 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3061 
3062 	/* Get our block major device number. */
3063 
3064 	rc = register_blkdev(0, rbd_dev->name);
3065 	if (rc < 0)
3066 		goto err_out_id;
3067 	rbd_dev->major = rc;
3068 
3069 	/* Set up the blkdev mapping. */
3070 
3071 	rc = rbd_init_disk(rbd_dev);
3072 	if (rc)
3073 		goto err_out_blkdev;
3074 
3075 	rc = rbd_bus_add_dev(rbd_dev);
3076 	if (rc)
3077 		goto err_out_disk;
3078 
3079 	/*
3080 	 * At this point cleanup in the event of an error is the job
3081 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
3082 	 */
3083 
3084 	down_write(&rbd_dev->header_rwsem);
3085 	rc = rbd_dev_snaps_register(rbd_dev);
3086 	up_write(&rbd_dev->header_rwsem);
3087 	if (rc)
3088 		goto err_out_bus;
3089 
3090 	rc = rbd_init_watch_dev(rbd_dev);
3091 	if (rc)
3092 		goto err_out_bus;
3093 
3094 	/* Everything's ready.  Announce the disk to the world. */
3095 
3096 	add_disk(rbd_dev->disk);
3097 
3098 	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099 		(unsigned long long) rbd_dev->mapping.size);
3100 
3101 	return count;
3102 
3103 err_out_bus:
3104 	/* this will also clean up rest of rbd_dev stuff */
3105 
3106 	rbd_bus_del_dev(rbd_dev);
3107 	kfree(options);
3108 	return rc;
3109 
3110 err_out_disk:
3111 	rbd_free_disk(rbd_dev);
3112 err_out_blkdev:
3113 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3114 err_out_id:
3115 	rbd_dev_id_put(rbd_dev);
3116 err_out_header:
3117 	rbd_header_free(&rbd_dev->header);
3118 err_out_client:
3119 	kfree(rbd_dev->header_name);
3120 	rbd_put_client(rbd_dev);
3121 	kfree(rbd_dev->image_id);
3122 err_out_args:
	kfree(snap_name);
3124 	kfree(rbd_dev->image_name);
3125 	kfree(rbd_dev->pool_name);
3126 err_out_mem:
3127 	kfree(rbd_dev);
3128 	kfree(options);
3129 
3130 	dout("Error adding device %s\n", buf);
3131 	module_put(THIS_MODULE);
3132 
3133 	return (ssize_t) rc;
3134 }
3135 
3136 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3137 {
3138 	struct list_head *tmp;
3139 	struct rbd_device *rbd_dev;
3140 
3141 	spin_lock(&rbd_dev_list_lock);
3142 	list_for_each(tmp, &rbd_dev_list) {
3143 		rbd_dev = list_entry(tmp, struct rbd_device, node);
3144 		if (rbd_dev->dev_id == dev_id) {
3145 			spin_unlock(&rbd_dev_list_lock);
3146 			return rbd_dev;
3147 		}
3148 	}
3149 	spin_unlock(&rbd_dev_list_lock);
3150 	return NULL;
3151 }
3152 
3153 static void rbd_dev_release(struct device *dev)
3154 {
3155 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3156 
3157 	if (rbd_dev->watch_request) {
3158 		struct ceph_client *client = rbd_dev->rbd_client->client;
3159 
3160 		ceph_osdc_unregister_linger_request(&client->osdc,
3161 						    rbd_dev->watch_request);
3162 	}
3163 	if (rbd_dev->watch_event)
3164 		rbd_req_sync_unwatch(rbd_dev);
3165 
3166 	rbd_put_client(rbd_dev);
3167 
3168 	/* clean up and free blkdev */
3169 	rbd_free_disk(rbd_dev);
3170 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3171 
3172 	/* release allocated disk header fields */
3173 	rbd_header_free(&rbd_dev->header);
3174 
3175 	/* done with the id, and with the rbd_dev */
3176 	kfree(rbd_dev->mapping.snap_name);
3177 	kfree(rbd_dev->image_id);
3178 	kfree(rbd_dev->header_name);
3179 	kfree(rbd_dev->pool_name);
3180 	kfree(rbd_dev->image_name);
3181 	rbd_dev_id_put(rbd_dev);
3182 	kfree(rbd_dev);
3183 
3184 	/* release module ref */
3185 	module_put(THIS_MODULE);
3186 }
3187 
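/*
 * Process a write to /sys/bus/rbd/remove: the input is the numeric
 * device id (the <id> in "rbd<id>") of the device to tear down.
 */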
3188 static ssize_t rbd_remove(struct bus_type *bus,
3189 			  const char *buf,
3190 			  size_t count)
3191 {
3192 	struct rbd_device *rbd_dev = NULL;
3193 	int target_id, rc;
3194 	unsigned long ul;
3195 	int ret = count;
3196 
3197 	rc = strict_strtoul(buf, 10, &ul);
3198 	if (rc)
3199 		return rc;
3200 
3201 	/* convert to int; abort if we lost anything in the conversion */
3202 	target_id = (int) ul;
3203 	if (target_id != ul)
3204 		return -EINVAL;
3205 
3206 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3207 
3208 	rbd_dev = __rbd_get_dev(target_id);
3209 	if (!rbd_dev) {
3210 		ret = -ENOENT;
3211 		goto done;
3212 	}
3213 
3214 	__rbd_remove_all_snaps(rbd_dev);
3215 	rbd_bus_del_dev(rbd_dev);
3216 
3217 done:
3218 	mutex_unlock(&ctl_mutex);
3219 
3220 	return ret;
3221 }
3222 
3223 /*
3224  * create control files in sysfs
3225  * /sys/bus/rbd/...
3226  */
3227 static int rbd_sysfs_init(void)
3228 {
3229 	int ret;
3230 
3231 	ret = device_register(&rbd_root_dev);
3232 	if (ret < 0)
3233 		return ret;
3234 
3235 	ret = bus_register(&rbd_bus_type);
3236 	if (ret < 0)
3237 		device_unregister(&rbd_root_dev);
3238 
3239 	return ret;
3240 }
3241 
3242 static void rbd_sysfs_cleanup(void)
3243 {
3244 	bus_unregister(&rbd_bus_type);
3245 	device_unregister(&rbd_root_dev);
3246 }
3247 
3248 int __init rbd_init(void)
3249 {
3250 	int rc;
3251 
3252 	rc = rbd_sysfs_init();
3253 	if (rc)
3254 		return rc;
3255 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3256 	return 0;
3257 }
3258 
3259 void __exit rbd_exit(void)
3260 {
3261 	rbd_sysfs_cleanup();
3262 }
3263 
3264 module_init(rbd_init);
3265 module_exit(rbd_exit);
3266 
3267 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3268 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3269 MODULE_DESCRIPTION("rados block device");
3270 
3271 /* following authorship retained from original osdblk.c */
3272 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3273 
3274 MODULE_LICENSE("GPL");
3275