xref: /openbmc/linux/drivers/block/rbd.c (revision 565d76cb)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
/* Short and long driver names. */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Size limits for the fixed-size name buffers and the option string. */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot. */
#define RBD_SNAP_HEAD_NAME	"-"

/* Size of the buffer holding the block device name (e.g. "rbd3"). */
#define DEV_NAME_LEN		32

/* Value given to rbd_options.notify_timeout when no option overrides it. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64 	u64 image_size;
65 	char block_name[32];
66 	__u8 obj_order;
67 	__u8 crypt_type;
68 	__u8 comp_type;
69 	struct rw_semaphore snap_rwsem;
70 	struct ceph_snap_context *snapc;
71 	size_t snap_names_len;
72 	u64 snap_seq;
73 	u32 total_snaps;
74 
75 	char *snap_names;
76 	u64 *snap_sizes;
77 
78 	u64 obj_version;
79 };
80 
/* rbd-specific options parsed out of the option string. */
struct rbd_options {
	int	notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84 
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89 	struct ceph_client	*client;
90 	struct rbd_options	*rbd_opts;
91 	struct kref		kref;
92 	struct list_head	node;
93 };
94 
95 /*
96  * a single io request
97  */
98 struct rbd_request {
99 	struct request		*rq;		/* blk layer request */
100 	struct bio		*bio;		/* cloned bio */
101 	struct page		**pages;	/* list of used pages */
102 	u64			len;
103 };
104 
105 struct rbd_snap {
106 	struct	device		dev;
107 	const char		*name;
108 	size_t			size;
109 	struct list_head	node;
110 	u64			id;
111 };
112 
113 /*
114  * a single device
115  */
116 struct rbd_device {
117 	int			id;		/* blkdev unique id */
118 
119 	int			major;		/* blkdev assigned major */
120 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
121 	struct request_queue	*q;
122 
123 	struct ceph_client	*client;
124 	struct rbd_client	*rbd_client;
125 
126 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
127 
128 	spinlock_t		lock;		/* queue lock */
129 
130 	struct rbd_image_header	header;
131 	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
132 	int			obj_len;
133 	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
134 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
135 	int			poolid;
136 
137 	struct ceph_osd_event   *watch_event;
138 	struct ceph_osd_request *watch_request;
139 
140 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
141 	u32 cur_snap;	/* index+1 of current snapshot within snap context
142 			   0 - for the head */
143 	int read_only;
144 
145 	struct list_head	node;
146 
147 	/* list of snapshots */
148 	struct list_head	snaps;
149 
150 	/* sysfs related */
151 	struct device		dev;
152 };
153 
154 static struct bus_type rbd_bus_type = {
155 	.name		= "rbd",
156 };
157 
158 static spinlock_t node_lock;      /* protects client get/put */
159 
160 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
161 static LIST_HEAD(rbd_dev_list);    /* devices */
162 static LIST_HEAD(rbd_client_list);      /* clients */
163 
164 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
165 static void rbd_dev_release(struct device *dev);
166 static ssize_t rbd_snap_rollback(struct device *dev,
167 				 struct device_attribute *attr,
168 				 const char *buf,
169 				 size_t size);
170 static ssize_t rbd_snap_add(struct device *dev,
171 			    struct device_attribute *attr,
172 			    const char *buf,
173 			    size_t count);
174 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
175 				  struct rbd_snap *snap);;
176 
177 
178 static struct rbd_device *dev_to_rbd(struct device *dev)
179 {
180 	return container_of(dev, struct rbd_device, dev);
181 }
182 
183 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
184 {
185 	return get_device(&rbd_dev->dev);
186 }
187 
188 static void rbd_put_dev(struct rbd_device *rbd_dev)
189 {
190 	put_device(&rbd_dev->dev);
191 }
192 
193 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
194 
195 static int rbd_open(struct block_device *bdev, fmode_t mode)
196 {
197 	struct gendisk *disk = bdev->bd_disk;
198 	struct rbd_device *rbd_dev = disk->private_data;
199 
200 	rbd_get_dev(rbd_dev);
201 
202 	set_device_ro(bdev, rbd_dev->read_only);
203 
204 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
205 		return -EROFS;
206 
207 	return 0;
208 }
209 
210 static int rbd_release(struct gendisk *disk, fmode_t mode)
211 {
212 	struct rbd_device *rbd_dev = disk->private_data;
213 
214 	rbd_put_dev(rbd_dev);
215 
216 	return 0;
217 }
218 
219 static const struct block_device_operations rbd_bd_ops = {
220 	.owner			= THIS_MODULE,
221 	.open			= rbd_open,
222 	.release		= rbd_release,
223 };
224 
225 /*
226  * Initialize an rbd client instance.
227  * We own *opt.
228  */
229 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
230 					    struct rbd_options *rbd_opts)
231 {
232 	struct rbd_client *rbdc;
233 	int ret = -ENOMEM;
234 
235 	dout("rbd_client_create\n");
236 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
237 	if (!rbdc)
238 		goto out_opt;
239 
240 	kref_init(&rbdc->kref);
241 	INIT_LIST_HEAD(&rbdc->node);
242 
243 	rbdc->client = ceph_create_client(opt, rbdc);
244 	if (IS_ERR(rbdc->client))
245 		goto out_rbdc;
246 	opt = NULL; /* Now rbdc->client is responsible for opt */
247 
248 	ret = ceph_open_session(rbdc->client);
249 	if (ret < 0)
250 		goto out_err;
251 
252 	rbdc->rbd_opts = rbd_opts;
253 
254 	spin_lock(&node_lock);
255 	list_add_tail(&rbdc->node, &rbd_client_list);
256 	spin_unlock(&node_lock);
257 
258 	dout("rbd_client_create created %p\n", rbdc);
259 	return rbdc;
260 
261 out_err:
262 	ceph_destroy_client(rbdc->client);
263 out_rbdc:
264 	kfree(rbdc);
265 out_opt:
266 	if (opt)
267 		ceph_destroy_options(opt);
268 	return ERR_PTR(ret);
269 }
270 
271 /*
272  * Find a ceph client with specific addr and configuration.
273  */
274 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
275 {
276 	struct rbd_client *client_node;
277 
278 	if (opt->flags & CEPH_OPT_NOSHARE)
279 		return NULL;
280 
281 	list_for_each_entry(client_node, &rbd_client_list, node)
282 		if (ceph_compare_options(opt, client_node->client) == 0)
283 			return client_node;
284 	return NULL;
285 }
286 
287 /*
288  * mount options
289  */
290 enum {
291 	Opt_notify_timeout,
292 	Opt_last_int,
293 	/* int args above */
294 	Opt_last_string,
295 	/* string args above */
296 };
297 
298 static match_table_t rbdopt_tokens = {
299 	{Opt_notify_timeout, "notify_timeout=%d"},
300 	/* int args above */
301 	/* string args above */
302 	{-1, NULL}
303 };
304 
305 static int parse_rbd_opts_token(char *c, void *private)
306 {
307 	struct rbd_options *rbdopt = private;
308 	substring_t argstr[MAX_OPT_ARGS];
309 	int token, intval, ret;
310 
311 	token = match_token((char *)c, rbdopt_tokens, argstr);
312 	if (token < 0)
313 		return -EINVAL;
314 
315 	if (token < Opt_last_int) {
316 		ret = match_int(&argstr[0], &intval);
317 		if (ret < 0) {
318 			pr_err("bad mount option arg (not int) "
319 			       "at '%s'\n", c);
320 			return ret;
321 		}
322 		dout("got int token %d val %d\n", token, intval);
323 	} else if (token > Opt_last_int && token < Opt_last_string) {
324 		dout("got string token %d val %s\n", token,
325 		     argstr[0].from);
326 	} else {
327 		dout("got token %d\n", token);
328 	}
329 
330 	switch (token) {
331 	case Opt_notify_timeout:
332 		rbdopt->notify_timeout = intval;
333 		break;
334 	default:
335 		BUG_ON(token);
336 	}
337 	return 0;
338 }
339 
340 /*
341  * Get a ceph client with specific addr and configuration, if one does
342  * not exist create it.
343  */
344 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
345 			  char *options)
346 {
347 	struct rbd_client *rbdc;
348 	struct ceph_options *opt;
349 	int ret;
350 	struct rbd_options *rbd_opts;
351 
352 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
353 	if (!rbd_opts)
354 		return -ENOMEM;
355 
356 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
357 
358 	ret = ceph_parse_options(&opt, options, mon_addr,
359 				 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
360 	if (ret < 0)
361 		goto done_err;
362 
363 	spin_lock(&node_lock);
364 	rbdc = __rbd_client_find(opt);
365 	if (rbdc) {
366 		ceph_destroy_options(opt);
367 
368 		/* using an existing client */
369 		kref_get(&rbdc->kref);
370 		rbd_dev->rbd_client = rbdc;
371 		rbd_dev->client = rbdc->client;
372 		spin_unlock(&node_lock);
373 		return 0;
374 	}
375 	spin_unlock(&node_lock);
376 
377 	rbdc = rbd_client_create(opt, rbd_opts);
378 	if (IS_ERR(rbdc)) {
379 		ret = PTR_ERR(rbdc);
380 		goto done_err;
381 	}
382 
383 	rbd_dev->rbd_client = rbdc;
384 	rbd_dev->client = rbdc->client;
385 	return 0;
386 done_err:
387 	kfree(rbd_opts);
388 	return ret;
389 }
390 
391 /*
392  * Destroy ceph client
393  */
394 static void rbd_client_release(struct kref *kref)
395 {
396 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
397 
398 	dout("rbd_release_client %p\n", rbdc);
399 	spin_lock(&node_lock);
400 	list_del(&rbdc->node);
401 	spin_unlock(&node_lock);
402 
403 	ceph_destroy_client(rbdc->client);
404 	kfree(rbdc->rbd_opts);
405 	kfree(rbdc);
406 }
407 
408 /*
409  * Drop reference to ceph client node. If it's not referenced anymore, release
410  * it.
411  */
412 static void rbd_put_client(struct rbd_device *rbd_dev)
413 {
414 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
415 	rbd_dev->rbd_client = NULL;
416 	rbd_dev->client = NULL;
417 }
418 
419 
420 /*
421  * Create a new header structure, translate header format from the on-disk
422  * header.
423  */
424 static int rbd_header_from_disk(struct rbd_image_header *header,
425 				 struct rbd_image_header_ondisk *ondisk,
426 				 int allocated_snaps,
427 				 gfp_t gfp_flags)
428 {
429 	int i;
430 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
431 	int ret = -ENOMEM;
432 
433 	init_rwsem(&header->snap_rwsem);
434 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
435 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
436 				snap_count *
437 				 sizeof(struct rbd_image_snap_ondisk),
438 				gfp_flags);
439 	if (!header->snapc)
440 		return -ENOMEM;
441 	if (snap_count) {
442 		header->snap_names = kmalloc(header->snap_names_len,
443 					     GFP_KERNEL);
444 		if (!header->snap_names)
445 			goto err_snapc;
446 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
447 					     GFP_KERNEL);
448 		if (!header->snap_sizes)
449 			goto err_names;
450 	} else {
451 		header->snap_names = NULL;
452 		header->snap_sizes = NULL;
453 	}
454 	memcpy(header->block_name, ondisk->block_name,
455 	       sizeof(ondisk->block_name));
456 
457 	header->image_size = le64_to_cpu(ondisk->image_size);
458 	header->obj_order = ondisk->options.order;
459 	header->crypt_type = ondisk->options.crypt_type;
460 	header->comp_type = ondisk->options.comp_type;
461 
462 	atomic_set(&header->snapc->nref, 1);
463 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
464 	header->snapc->num_snaps = snap_count;
465 	header->total_snaps = snap_count;
466 
467 	if (snap_count &&
468 	    allocated_snaps == snap_count) {
469 		for (i = 0; i < snap_count; i++) {
470 			header->snapc->snaps[i] =
471 				le64_to_cpu(ondisk->snaps[i].id);
472 			header->snap_sizes[i] =
473 				le64_to_cpu(ondisk->snaps[i].image_size);
474 		}
475 
476 		/* copy snapshot names */
477 		memcpy(header->snap_names, &ondisk->snaps[i],
478 			header->snap_names_len);
479 	}
480 
481 	return 0;
482 
483 err_names:
484 	kfree(header->snap_names);
485 err_snapc:
486 	kfree(header->snapc);
487 	return ret;
488 }
489 
490 static int snap_index(struct rbd_image_header *header, int snap_num)
491 {
492 	return header->total_snaps - snap_num;
493 }
494 
495 static u64 cur_snap_id(struct rbd_device *rbd_dev)
496 {
497 	struct rbd_image_header *header = &rbd_dev->header;
498 
499 	if (!rbd_dev->cur_snap)
500 		return 0;
501 
502 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
503 }
504 
505 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
506 			u64 *seq, u64 *size)
507 {
508 	int i;
509 	char *p = header->snap_names;
510 
511 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
512 		if (strcmp(snap_name, p) == 0)
513 			break;
514 	}
515 	if (i == header->total_snaps)
516 		return -ENOENT;
517 	if (seq)
518 		*seq = header->snapc->snaps[i];
519 
520 	if (size)
521 		*size = header->snap_sizes[i];
522 
523 	return i;
524 }
525 
526 static int rbd_header_set_snap(struct rbd_device *dev,
527 			       const char *snap_name,
528 			       u64 *size)
529 {
530 	struct rbd_image_header *header = &dev->header;
531 	struct ceph_snap_context *snapc = header->snapc;
532 	int ret = -ENOENT;
533 
534 	down_write(&header->snap_rwsem);
535 
536 	if (!snap_name ||
537 	    !*snap_name ||
538 	    strcmp(snap_name, "-") == 0 ||
539 	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
540 		if (header->total_snaps)
541 			snapc->seq = header->snap_seq;
542 		else
543 			snapc->seq = 0;
544 		dev->cur_snap = 0;
545 		dev->read_only = 0;
546 		if (size)
547 			*size = header->image_size;
548 	} else {
549 		ret = snap_by_name(header, snap_name, &snapc->seq, size);
550 		if (ret < 0)
551 			goto done;
552 
553 		dev->cur_snap = header->total_snaps - ret;
554 		dev->read_only = 1;
555 	}
556 
557 	ret = 0;
558 done:
559 	up_write(&header->snap_rwsem);
560 	return ret;
561 }
562 
563 static void rbd_header_free(struct rbd_image_header *header)
564 {
565 	kfree(header->snapc);
566 	kfree(header->snap_names);
567 	kfree(header->snap_sizes);
568 }
569 
570 /*
571  * get the actual striped segment name, offset and length
572  */
573 static u64 rbd_get_segment(struct rbd_image_header *header,
574 			   const char *block_name,
575 			   u64 ofs, u64 len,
576 			   char *seg_name, u64 *segofs)
577 {
578 	u64 seg = ofs >> header->obj_order;
579 
580 	if (seg_name)
581 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
582 			 "%s.%012llx", block_name, seg);
583 
584 	ofs = ofs & ((1 << header->obj_order) - 1);
585 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
586 
587 	if (segofs)
588 		*segofs = ofs;
589 
590 	return len;
591 }
592 
593 /*
594  * bio helpers
595  */
596 
597 static void bio_chain_put(struct bio *chain)
598 {
599 	struct bio *tmp;
600 
601 	while (chain) {
602 		tmp = chain;
603 		chain = chain->bi_next;
604 		bio_put(tmp);
605 	}
606 }
607 
608 /*
609  * zeros a bio chain, starting at specific offset
610  */
611 static void zero_bio_chain(struct bio *chain, int start_ofs)
612 {
613 	struct bio_vec *bv;
614 	unsigned long flags;
615 	void *buf;
616 	int i;
617 	int pos = 0;
618 
619 	while (chain) {
620 		bio_for_each_segment(bv, chain, i) {
621 			if (pos + bv->bv_len > start_ofs) {
622 				int remainder = max(start_ofs - pos, 0);
623 				buf = bvec_kmap_irq(bv, &flags);
624 				memset(buf + remainder, 0,
625 				       bv->bv_len - remainder);
626 				bvec_kunmap_irq(buf, &flags);
627 			}
628 			pos += bv->bv_len;
629 		}
630 
631 		chain = chain->bi_next;
632 	}
633 }
634 
635 /*
636  * bio_chain_clone - clone a chain of bios up to a certain length.
637  * might return a bio_pair that will need to be released.
638  */
639 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
640 				   struct bio_pair **bp,
641 				   int len, gfp_t gfpmask)
642 {
643 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
644 	int total = 0;
645 
646 	if (*bp) {
647 		bio_pair_release(*bp);
648 		*bp = NULL;
649 	}
650 
651 	while (old_chain && (total < len)) {
652 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
653 		if (!tmp)
654 			goto err_out;
655 
656 		if (total + old_chain->bi_size > len) {
657 			struct bio_pair *bp;
658 
659 			/*
660 			 * this split can only happen with a single paged bio,
661 			 * split_bio will BUG_ON if this is not the case
662 			 */
663 			dout("bio_chain_clone split! total=%d remaining=%d"
664 			     "bi_size=%d\n",
665 			     (int)total, (int)len-total,
666 			     (int)old_chain->bi_size);
667 
668 			/* split the bio. We'll release it either in the next
669 			   call, or it will have to be released outside */
670 			bp = bio_split(old_chain, (len - total) / 512ULL);
671 			if (!bp)
672 				goto err_out;
673 
674 			__bio_clone(tmp, &bp->bio1);
675 
676 			*next = &bp->bio2;
677 		} else {
678 			__bio_clone(tmp, old_chain);
679 			*next = old_chain->bi_next;
680 		}
681 
682 		tmp->bi_bdev = NULL;
683 		gfpmask &= ~__GFP_WAIT;
684 		tmp->bi_next = NULL;
685 
686 		if (!new_chain) {
687 			new_chain = tail = tmp;
688 		} else {
689 			tail->bi_next = tmp;
690 			tail = tmp;
691 		}
692 		old_chain = old_chain->bi_next;
693 
694 		total += tmp->bi_size;
695 	}
696 
697 	BUG_ON(total < len);
698 
699 	if (tail)
700 		tail->bi_next = NULL;
701 
702 	*old = old_chain;
703 
704 	return new_chain;
705 
706 err_out:
707 	dout("bio_chain_clone with err\n");
708 	bio_chain_put(new_chain);
709 	return NULL;
710 }
711 
712 /*
713  * helpers for osd request op vectors.
714  */
715 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
716 			    int num_ops,
717 			    int opcode,
718 			    u32 payload_len)
719 {
720 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
721 		       GFP_NOIO);
722 	if (!*ops)
723 		return -ENOMEM;
724 	(*ops)[0].op = opcode;
725 	/*
726 	 * op extent offset and length will be set later on
727 	 * in calc_raw_layout()
728 	 */
729 	(*ops)[0].payload_len = payload_len;
730 	return 0;
731 }
732 
/*
 * Free an op vector obtained from rbd_create_rw_ops().  As with kfree(),
 * a NULL pointer is accepted.
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
737 
738 /*
739  * Send ceph osd request
740  */
741 static int rbd_do_request(struct request *rq,
742 			  struct rbd_device *dev,
743 			  struct ceph_snap_context *snapc,
744 			  u64 snapid,
745 			  const char *obj, u64 ofs, u64 len,
746 			  struct bio *bio,
747 			  struct page **pages,
748 			  int num_pages,
749 			  int flags,
750 			  struct ceph_osd_req_op *ops,
751 			  int num_reply,
752 			  void (*rbd_cb)(struct ceph_osd_request *req,
753 					 struct ceph_msg *msg),
754 			  struct ceph_osd_request **linger_req,
755 			  u64 *ver)
756 {
757 	struct ceph_osd_request *req;
758 	struct ceph_file_layout *layout;
759 	int ret;
760 	u64 bno;
761 	struct timespec mtime = CURRENT_TIME;
762 	struct rbd_request *req_data;
763 	struct ceph_osd_request_head *reqhead;
764 	struct rbd_image_header *header = &dev->header;
765 
766 	ret = -ENOMEM;
767 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768 	if (!req_data)
769 		goto done;
770 
771 	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
772 
773 	down_read(&header->snap_rwsem);
774 
775 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
776 				      snapc,
777 				      ops,
778 				      false,
779 				      GFP_NOIO, pages, bio);
780 	if (IS_ERR(req)) {
781 		up_read(&header->snap_rwsem);
782 		ret = PTR_ERR(req);
783 		goto done_pages;
784 	}
785 
786 	req->r_callback = rbd_cb;
787 
788 	req_data->rq = rq;
789 	req_data->bio = bio;
790 	req_data->pages = pages;
791 	req_data->len = len;
792 
793 	req->r_priv = req_data;
794 
795 	reqhead = req->r_request->front.iov_base;
796 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
797 
798 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
799 	req->r_oid_len = strlen(req->r_oid);
800 
801 	layout = &req->r_file_layout;
802 	memset(layout, 0, sizeof(*layout));
803 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
804 	layout->fl_stripe_count = cpu_to_le32(1);
805 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
806 	layout->fl_pg_preferred = cpu_to_le32(-1);
807 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
808 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
809 			     ofs, &len, &bno, req, ops);
810 
811 	ceph_osdc_build_request(req, ofs, &len,
812 				ops,
813 				snapc,
814 				&mtime,
815 				req->r_oid, req->r_oid_len);
816 	up_read(&header->snap_rwsem);
817 
818 	if (linger_req) {
819 		ceph_osdc_set_request_linger(&dev->client->osdc, req);
820 		*linger_req = req;
821 	}
822 
823 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
824 	if (ret < 0)
825 		goto done_err;
826 
827 	if (!rbd_cb) {
828 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829 		if (ver)
830 			*ver = le64_to_cpu(req->r_reassert_version.version);
831 		dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
832 		ceph_osdc_put_request(req);
833 	}
834 	return ret;
835 
836 done_err:
837 	bio_chain_put(req_data->bio);
838 	ceph_osdc_put_request(req);
839 done_pages:
840 	kfree(req_data);
841 done:
842 	if (rq)
843 		blk_end_request(rq, ret, len);
844 	return ret;
845 }
846 
847 /*
848  * Ceph osd op callback
849  */
850 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
851 {
852 	struct rbd_request *req_data = req->r_priv;
853 	struct ceph_osd_reply_head *replyhead;
854 	struct ceph_osd_op *op;
855 	__s32 rc;
856 	u64 bytes;
857 	int read_op;
858 
859 	/* parse reply */
860 	replyhead = msg->front.iov_base;
861 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
862 	op = (void *)(replyhead + 1);
863 	rc = le32_to_cpu(replyhead->result);
864 	bytes = le64_to_cpu(op->extent.length);
865 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
866 
867 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
868 
869 	if (rc == -ENOENT && read_op) {
870 		zero_bio_chain(req_data->bio, 0);
871 		rc = 0;
872 	} else if (rc == 0 && read_op && bytes < req_data->len) {
873 		zero_bio_chain(req_data->bio, bytes);
874 		bytes = req_data->len;
875 	}
876 
877 	blk_end_request(req_data->rq, rc, bytes);
878 
879 	if (req_data->bio)
880 		bio_chain_put(req_data->bio);
881 
882 	ceph_osdc_put_request(req);
883 	kfree(req_data);
884 }
885 
886 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
887 {
888 	ceph_osdc_put_request(req);
889 }
890 
891 /*
892  * Do a synchronous ceph osd operation
893  */
894 static int rbd_req_sync_op(struct rbd_device *dev,
895 			   struct ceph_snap_context *snapc,
896 			   u64 snapid,
897 			   int opcode,
898 			   int flags,
899 			   struct ceph_osd_req_op *orig_ops,
900 			   int num_reply,
901 			   const char *obj,
902 			   u64 ofs, u64 len,
903 			   char *buf,
904 			   struct ceph_osd_request **linger_req,
905 			   u64 *ver)
906 {
907 	int ret;
908 	struct page **pages;
909 	int num_pages;
910 	struct ceph_osd_req_op *ops = orig_ops;
911 	u32 payload_len;
912 
913 	num_pages = calc_pages_for(ofs , len);
914 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
915 	if (IS_ERR(pages))
916 		return PTR_ERR(pages);
917 
918 	if (!orig_ops) {
919 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
920 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
921 		if (ret < 0)
922 			goto done;
923 
924 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
925 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
926 			if (ret < 0)
927 				goto done_ops;
928 		}
929 	}
930 
931 	ret = rbd_do_request(NULL, dev, snapc, snapid,
932 			  obj, ofs, len, NULL,
933 			  pages, num_pages,
934 			  flags,
935 			  ops,
936 			  2,
937 			  NULL,
938 			  linger_req, ver);
939 	if (ret < 0)
940 		goto done_ops;
941 
942 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
943 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
944 
945 done_ops:
946 	if (!orig_ops)
947 		rbd_destroy_ops(ops);
948 done:
949 	ceph_release_page_vector(pages, num_pages);
950 	return ret;
951 }
952 
953 /*
954  * Do an asynchronous ceph osd operation
955  */
956 static int rbd_do_op(struct request *rq,
957 		     struct rbd_device *rbd_dev ,
958 		     struct ceph_snap_context *snapc,
959 		     u64 snapid,
960 		     int opcode, int flags, int num_reply,
961 		     u64 ofs, u64 len,
962 		     struct bio *bio)
963 {
964 	char *seg_name;
965 	u64 seg_ofs;
966 	u64 seg_len;
967 	int ret;
968 	struct ceph_osd_req_op *ops;
969 	u32 payload_len;
970 
971 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
972 	if (!seg_name)
973 		return -ENOMEM;
974 
975 	seg_len = rbd_get_segment(&rbd_dev->header,
976 				  rbd_dev->header.block_name,
977 				  ofs, len,
978 				  seg_name, &seg_ofs);
979 
980 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
981 
982 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
983 	if (ret < 0)
984 		goto done;
985 
986 	/* we've taken care of segment sizes earlier when we
987 	   cloned the bios. We should never have a segment
988 	   truncated at this point */
989 	BUG_ON(seg_len < len);
990 
991 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
992 			     seg_name, seg_ofs, seg_len,
993 			     bio,
994 			     NULL, 0,
995 			     flags,
996 			     ops,
997 			     num_reply,
998 			     rbd_req_cb, 0, NULL);
999 done:
1000 	kfree(seg_name);
1001 	return ret;
1002 }
1003 
1004 /*
1005  * Request async osd write
1006  */
1007 static int rbd_req_write(struct request *rq,
1008 			 struct rbd_device *rbd_dev,
1009 			 struct ceph_snap_context *snapc,
1010 			 u64 ofs, u64 len,
1011 			 struct bio *bio)
1012 {
1013 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1014 			 CEPH_OSD_OP_WRITE,
1015 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1016 			 2,
1017 			 ofs, len, bio);
1018 }
1019 
1020 /*
1021  * Request async osd read
1022  */
1023 static int rbd_req_read(struct request *rq,
1024 			 struct rbd_device *rbd_dev,
1025 			 u64 snapid,
1026 			 u64 ofs, u64 len,
1027 			 struct bio *bio)
1028 {
1029 	return rbd_do_op(rq, rbd_dev, NULL,
1030 			 (snapid ? snapid : CEPH_NOSNAP),
1031 			 CEPH_OSD_OP_READ,
1032 			 CEPH_OSD_FLAG_READ,
1033 			 2,
1034 			 ofs, len, bio);
1035 }
1036 
1037 /*
1038  * Request sync osd read
1039  */
1040 static int rbd_req_sync_read(struct rbd_device *dev,
1041 			  struct ceph_snap_context *snapc,
1042 			  u64 snapid,
1043 			  const char *obj,
1044 			  u64 ofs, u64 len,
1045 			  char *buf,
1046 			  u64 *ver)
1047 {
1048 	return rbd_req_sync_op(dev, NULL,
1049 			       (snapid ? snapid : CEPH_NOSNAP),
1050 			       CEPH_OSD_OP_READ,
1051 			       CEPH_OSD_FLAG_READ,
1052 			       NULL,
1053 			       1, obj, ofs, len, buf, NULL, ver);
1054 }
1055 
1056 /*
1057  * Request sync osd watch
1058  */
1059 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1060 				   u64 ver,
1061 				   u64 notify_id,
1062 				   const char *obj)
1063 {
1064 	struct ceph_osd_req_op *ops;
1065 	struct page **pages = NULL;
1066 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1067 	if (ret < 0)
1068 		return ret;
1069 
1070 	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1071 	ops[0].watch.cookie = notify_id;
1072 	ops[0].watch.flag = 0;
1073 
1074 	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1075 			  obj, 0, 0, NULL,
1076 			  pages, 0,
1077 			  CEPH_OSD_FLAG_READ,
1078 			  ops,
1079 			  1,
1080 			  rbd_simple_req_cb, 0, NULL);
1081 
1082 	rbd_destroy_ops(ops);
1083 	return ret;
1084 }
1085 
1086 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1087 {
1088 	struct rbd_device *dev = (struct rbd_device *)data;
1089 	if (!dev)
1090 		return;
1091 
1092 	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1093 		notify_id, (int)opcode);
1094 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1095 	__rbd_update_snaps(dev);
1096 	mutex_unlock(&ctl_mutex);
1097 
1098 	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1099 }
1100 
1101 /*
1102  * Request sync osd watch
1103  */
1104 static int rbd_req_sync_watch(struct rbd_device *dev,
1105 			      const char *obj,
1106 			      u64 ver)
1107 {
1108 	struct ceph_osd_req_op *ops;
1109 	struct ceph_osd_client *osdc = &dev->client->osdc;
1110 
1111 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1112 	if (ret < 0)
1113 		return ret;
1114 
1115 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1116 				     (void *)dev, &dev->watch_event);
1117 	if (ret < 0)
1118 		goto fail;
1119 
1120 	ops[0].watch.ver = cpu_to_le64(ver);
1121 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1122 	ops[0].watch.flag = 1;
1123 
1124 	ret = rbd_req_sync_op(dev, NULL,
1125 			      CEPH_NOSNAP,
1126 			      0,
1127 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1128 			      ops,
1129 			      1, obj, 0, 0, NULL,
1130 			      &dev->watch_request, NULL);
1131 
1132 	if (ret < 0)
1133 		goto fail_event;
1134 
1135 	rbd_destroy_ops(ops);
1136 	return 0;
1137 
1138 fail_event:
1139 	ceph_osdc_cancel_event(dev->watch_event);
1140 	dev->watch_event = NULL;
1141 fail:
1142 	rbd_destroy_ops(ops);
1143 	return ret;
1144 }
1145 
/* Context handed to the notify event created in rbd_req_sync_notify(). */
struct rbd_notify_info {
	struct rbd_device *dev;	/* device the notify was issued for */
};
1149 
1150 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1151 {
1152 	struct rbd_device *dev = (struct rbd_device *)data;
1153 	if (!dev)
1154 		return;
1155 
1156 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1157 		notify_id, (int)opcode);
1158 }
1159 
1160 /*
1161  * Request sync osd notify
1162  */
1163 static int rbd_req_sync_notify(struct rbd_device *dev,
1164 		          const char *obj)
1165 {
1166 	struct ceph_osd_req_op *ops;
1167 	struct ceph_osd_client *osdc = &dev->client->osdc;
1168 	struct ceph_osd_event *event;
1169 	struct rbd_notify_info info;
1170 	int payload_len = sizeof(u32) + sizeof(u32);
1171 	int ret;
1172 
1173 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1174 	if (ret < 0)
1175 		return ret;
1176 
1177 	info.dev = dev;
1178 
1179 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1180 				     (void *)&info, &event);
1181 	if (ret < 0)
1182 		goto fail;
1183 
1184 	ops[0].watch.ver = 1;
1185 	ops[0].watch.flag = 1;
1186 	ops[0].watch.cookie = event->cookie;
1187 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1188 	ops[0].watch.timeout = 12;
1189 
1190 	ret = rbd_req_sync_op(dev, NULL,
1191 			       CEPH_NOSNAP,
1192 			       0,
1193 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1194 			       ops,
1195 			       1, obj, 0, 0, NULL, NULL, NULL);
1196 	if (ret < 0)
1197 		goto fail_event;
1198 
1199 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1200 	dout("ceph_osdc_wait_event returned %d\n", ret);
1201 	rbd_destroy_ops(ops);
1202 	return 0;
1203 
1204 fail_event:
1205 	ceph_osdc_cancel_event(event);
1206 fail:
1207 	rbd_destroy_ops(ops);
1208 	return ret;
1209 }
1210 
1211 /*
1212  * Request sync osd rollback
1213  */
1214 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1215 				     u64 snapid,
1216 				     const char *obj)
1217 {
1218 	struct ceph_osd_req_op *ops;
1219 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1220 	if (ret < 0)
1221 		return ret;
1222 
1223 	ops[0].snap.snapid = snapid;
1224 
1225 	ret = rbd_req_sync_op(dev, NULL,
1226 			       CEPH_NOSNAP,
1227 			       0,
1228 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1229 			       ops,
1230 			       1, obj, 0, 0, NULL, NULL, NULL);
1231 
1232 	rbd_destroy_ops(ops);
1233 
1234 	return ret;
1235 }
1236 
1237 /*
1238  * Request sync osd read
1239  */
1240 static int rbd_req_sync_exec(struct rbd_device *dev,
1241 			     const char *obj,
1242 			     const char *cls,
1243 			     const char *method,
1244 			     const char *data,
1245 			     int len,
1246 			     u64 *ver)
1247 {
1248 	struct ceph_osd_req_op *ops;
1249 	int cls_len = strlen(cls);
1250 	int method_len = strlen(method);
1251 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1252 				    cls_len + method_len + len);
1253 	if (ret < 0)
1254 		return ret;
1255 
1256 	ops[0].cls.class_name = cls;
1257 	ops[0].cls.class_len = (__u8)cls_len;
1258 	ops[0].cls.method_name = method;
1259 	ops[0].cls.method_len = (__u8)method_len;
1260 	ops[0].cls.argc = 0;
1261 	ops[0].cls.indata = data;
1262 	ops[0].cls.indata_len = len;
1263 
1264 	ret = rbd_req_sync_op(dev, NULL,
1265 			       CEPH_NOSNAP,
1266 			       0,
1267 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268 			       ops,
1269 			       1, obj, 0, 0, NULL, NULL, ver);
1270 
1271 	rbd_destroy_ops(ops);
1272 
1273 	dout("cls_exec returned %d\n", ret);
1274 	return ret;
1275 }
1276 
1277 /*
1278  * block device queue callback
1279  */
1280 static void rbd_rq_fn(struct request_queue *q)
1281 {
1282 	struct rbd_device *rbd_dev = q->queuedata;
1283 	struct request *rq;
1284 	struct bio_pair *bp = NULL;
1285 
1286 	rq = blk_fetch_request(q);
1287 
1288 	while (1) {
1289 		struct bio *bio;
1290 		struct bio *rq_bio, *next_bio = NULL;
1291 		bool do_write;
1292 		int size, op_size = 0;
1293 		u64 ofs;
1294 
1295 		/* peek at request from block layer */
1296 		if (!rq)
1297 			break;
1298 
1299 		dout("fetched request\n");
1300 
1301 		/* filter out block requests we don't understand */
1302 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1303 			__blk_end_request_all(rq, 0);
1304 			goto next;
1305 		}
1306 
1307 		/* deduce our operation (read, write) */
1308 		do_write = (rq_data_dir(rq) == WRITE);
1309 
1310 		size = blk_rq_bytes(rq);
1311 		ofs = blk_rq_pos(rq) * 512ULL;
1312 		rq_bio = rq->bio;
1313 		if (do_write && rbd_dev->read_only) {
1314 			__blk_end_request_all(rq, -EROFS);
1315 			goto next;
1316 		}
1317 
1318 		spin_unlock_irq(q->queue_lock);
1319 
1320 		dout("%s 0x%x bytes at 0x%llx\n",
1321 		     do_write ? "write" : "read",
1322 		     size, blk_rq_pos(rq) * 512ULL);
1323 
1324 		do {
1325 			/* a bio clone to be passed down to OSD req */
1326 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1327 			op_size = rbd_get_segment(&rbd_dev->header,
1328 						  rbd_dev->header.block_name,
1329 						  ofs, size,
1330 						  NULL, NULL);
1331 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1332 					      op_size, GFP_ATOMIC);
1333 			if (!bio) {
1334 				spin_lock_irq(q->queue_lock);
1335 				__blk_end_request_all(rq, -ENOMEM);
1336 				goto next;
1337 			}
1338 
1339 			/* init OSD command: write or read */
1340 			if (do_write)
1341 				rbd_req_write(rq, rbd_dev,
1342 					      rbd_dev->header.snapc,
1343 					      ofs,
1344 					      op_size, bio);
1345 			else
1346 				rbd_req_read(rq, rbd_dev,
1347 					     cur_snap_id(rbd_dev),
1348 					     ofs,
1349 					     op_size, bio);
1350 
1351 			size -= op_size;
1352 			ofs += op_size;
1353 
1354 			rq_bio = next_bio;
1355 		} while (size > 0);
1356 
1357 		if (bp)
1358 			bio_pair_release(bp);
1359 
1360 		spin_lock_irq(q->queue_lock);
1361 next:
1362 		rq = blk_fetch_request(q);
1363 	}
1364 }
1365 
1366 /*
1367  * a queue callback. Makes sure that we don't create a bio that spans across
1368  * multiple osd objects. One exception would be with a single page bios,
1369  * which we handle later at bio_chain_clone
1370  */
1371 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1372 			  struct bio_vec *bvec)
1373 {
1374 	struct rbd_device *rbd_dev = q->queuedata;
1375 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1376 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1377 	unsigned int bio_sectors = bmd->bi_size >> 9;
1378 	int max;
1379 
1380 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1381 				 + bio_sectors)) << 9;
1382 	if (max < 0)
1383 		max = 0; /* bio_add cannot handle a negative return */
1384 	if (max <= bvec->bv_len && bio_sectors == 0)
1385 		return bvec->bv_len;
1386 	return max;
1387 }
1388 
1389 static void rbd_free_disk(struct rbd_device *rbd_dev)
1390 {
1391 	struct gendisk *disk = rbd_dev->disk;
1392 
1393 	if (!disk)
1394 		return;
1395 
1396 	rbd_header_free(&rbd_dev->header);
1397 
1398 	if (disk->flags & GENHD_FL_UP)
1399 		del_gendisk(disk);
1400 	if (disk->queue)
1401 		blk_cleanup_queue(disk->queue);
1402 	put_disk(disk);
1403 }
1404 
1405 /*
1406  * reload the ondisk the header
1407  */
1408 static int rbd_read_header(struct rbd_device *rbd_dev,
1409 			   struct rbd_image_header *header)
1410 {
1411 	ssize_t rc;
1412 	struct rbd_image_header_ondisk *dh;
1413 	int snap_count = 0;
1414 	u64 snap_names_len = 0;
1415 	u64 ver;
1416 
1417 	while (1) {
1418 		int len = sizeof(*dh) +
1419 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1420 			  snap_names_len;
1421 
1422 		rc = -ENOMEM;
1423 		dh = kmalloc(len, GFP_KERNEL);
1424 		if (!dh)
1425 			return -ENOMEM;
1426 
1427 		rc = rbd_req_sync_read(rbd_dev,
1428 				       NULL, CEPH_NOSNAP,
1429 				       rbd_dev->obj_md_name,
1430 				       0, len,
1431 				       (char *)dh, &ver);
1432 		if (rc < 0)
1433 			goto out_dh;
1434 
1435 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1436 		if (rc < 0)
1437 			goto out_dh;
1438 
1439 		if (snap_count != header->total_snaps) {
1440 			snap_count = header->total_snaps;
1441 			snap_names_len = header->snap_names_len;
1442 			rbd_header_free(header);
1443 			kfree(dh);
1444 			continue;
1445 		}
1446 		break;
1447 	}
1448 	header->obj_version = ver;
1449 
1450 out_dh:
1451 	kfree(dh);
1452 	return rc;
1453 }
1454 
1455 /*
1456  * create a snapshot
1457  */
1458 static int rbd_header_add_snap(struct rbd_device *dev,
1459 			       const char *snap_name,
1460 			       gfp_t gfp_flags)
1461 {
1462 	int name_len = strlen(snap_name);
1463 	u64 new_snapid;
1464 	int ret;
1465 	void *data, *data_start, *data_end;
1466 	u64 ver;
1467 
1468 	/* we should create a snapshot only if we're pointing at the head */
1469 	if (dev->cur_snap)
1470 		return -EINVAL;
1471 
1472 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1473 				      &new_snapid);
1474 	dout("created snapid=%lld\n", new_snapid);
1475 	if (ret < 0)
1476 		return ret;
1477 
1478 	data = kmalloc(name_len + 16, gfp_flags);
1479 	if (!data)
1480 		return -ENOMEM;
1481 
1482 	data_start = data;
1483 	data_end = data + name_len + 16;
1484 
1485 	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1486 	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1487 
1488 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1489 				data_start, data - data_start, &ver);
1490 
1491 	kfree(data_start);
1492 
1493 	if (ret < 0)
1494 		return ret;
1495 
1496 	dev->header.snapc->seq =  new_snapid;
1497 
1498 	return 0;
1499 bad:
1500 	return -ERANGE;
1501 }
1502 
1503 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1504 {
1505 	struct rbd_snap *snap;
1506 
1507 	while (!list_empty(&rbd_dev->snaps)) {
1508 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1509 		__rbd_remove_snap_dev(rbd_dev, snap);
1510 	}
1511 }
1512 
1513 /*
1514  * only read the first part of the ondisk header, without the snaps info
1515  */
1516 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1517 {
1518 	int ret;
1519 	struct rbd_image_header h;
1520 	u64 snap_seq;
1521 	int follow_seq = 0;
1522 
1523 	ret = rbd_read_header(rbd_dev, &h);
1524 	if (ret < 0)
1525 		return ret;
1526 
1527 	down_write(&rbd_dev->header.snap_rwsem);
1528 
1529 	snap_seq = rbd_dev->header.snapc->seq;
1530 	if (rbd_dev->header.total_snaps &&
1531 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1532 		/* pointing at the head, will need to follow that
1533 		   if head moves */
1534 		follow_seq = 1;
1535 
1536 	kfree(rbd_dev->header.snapc);
1537 	kfree(rbd_dev->header.snap_names);
1538 	kfree(rbd_dev->header.snap_sizes);
1539 
1540 	rbd_dev->header.total_snaps = h.total_snaps;
1541 	rbd_dev->header.snapc = h.snapc;
1542 	rbd_dev->header.snap_names = h.snap_names;
1543 	rbd_dev->header.snap_names_len = h.snap_names_len;
1544 	rbd_dev->header.snap_sizes = h.snap_sizes;
1545 	if (follow_seq)
1546 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1547 	else
1548 		rbd_dev->header.snapc->seq = snap_seq;
1549 
1550 	ret = __rbd_init_snaps_header(rbd_dev);
1551 
1552 	up_write(&rbd_dev->header.snap_rwsem);
1553 
1554 	return ret;
1555 }
1556 
1557 static int rbd_init_disk(struct rbd_device *rbd_dev)
1558 {
1559 	struct gendisk *disk;
1560 	struct request_queue *q;
1561 	int rc;
1562 	u64 total_size = 0;
1563 
1564 	/* contact OSD, request size info about the object being mapped */
1565 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1566 	if (rc)
1567 		return rc;
1568 
1569 	/* no need to lock here, as rbd_dev is not registered yet */
1570 	rc = __rbd_init_snaps_header(rbd_dev);
1571 	if (rc)
1572 		return rc;
1573 
1574 	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1575 	if (rc)
1576 		return rc;
1577 
1578 	/* create gendisk info */
1579 	rc = -ENOMEM;
1580 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1581 	if (!disk)
1582 		goto out;
1583 
1584 	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1585 	disk->major = rbd_dev->major;
1586 	disk->first_minor = 0;
1587 	disk->fops = &rbd_bd_ops;
1588 	disk->private_data = rbd_dev;
1589 
1590 	/* init rq */
1591 	rc = -ENOMEM;
1592 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1593 	if (!q)
1594 		goto out_disk;
1595 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1596 	disk->queue = q;
1597 
1598 	q->queuedata = rbd_dev;
1599 
1600 	rbd_dev->disk = disk;
1601 	rbd_dev->q = q;
1602 
1603 	/* finally, announce the disk to the world */
1604 	set_capacity(disk, total_size / 512ULL);
1605 	add_disk(disk);
1606 
1607 	pr_info("%s: added with size 0x%llx\n",
1608 		disk->disk_name, (unsigned long long)total_size);
1609 	return 0;
1610 
1611 out_disk:
1612 	put_disk(disk);
1613 out:
1614 	return rc;
1615 }
1616 
1617 /*
1618   sysfs
1619 */
1620 
1621 static ssize_t rbd_size_show(struct device *dev,
1622 			     struct device_attribute *attr, char *buf)
1623 {
1624 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1625 
1626 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1627 }
1628 
1629 static ssize_t rbd_major_show(struct device *dev,
1630 			      struct device_attribute *attr, char *buf)
1631 {
1632 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1633 
1634 	return sprintf(buf, "%d\n", rbd_dev->major);
1635 }
1636 
1637 static ssize_t rbd_client_id_show(struct device *dev,
1638 				  struct device_attribute *attr, char *buf)
1639 {
1640 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1641 
1642 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1643 }
1644 
1645 static ssize_t rbd_pool_show(struct device *dev,
1646 			     struct device_attribute *attr, char *buf)
1647 {
1648 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1649 
1650 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1651 }
1652 
1653 static ssize_t rbd_name_show(struct device *dev,
1654 			     struct device_attribute *attr, char *buf)
1655 {
1656 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1657 
1658 	return sprintf(buf, "%s\n", rbd_dev->obj);
1659 }
1660 
1661 static ssize_t rbd_snap_show(struct device *dev,
1662 			     struct device_attribute *attr,
1663 			     char *buf)
1664 {
1665 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1666 
1667 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1668 }
1669 
1670 static ssize_t rbd_image_refresh(struct device *dev,
1671 				 struct device_attribute *attr,
1672 				 const char *buf,
1673 				 size_t size)
1674 {
1675 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1676 	int rc;
1677 	int ret = size;
1678 
1679 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1680 
1681 	rc = __rbd_update_snaps(rbd_dev);
1682 	if (rc < 0)
1683 		ret = rc;
1684 
1685 	mutex_unlock(&ctl_mutex);
1686 	return ret;
1687 }
1688 
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * backed by the *_show handlers; the S_IWUSR entries are write-only
 * triggers (refresh, create_snap, rollback_snap).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1698 
/* NULL-terminated list of the attributes exposed for every rbd device */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	&dev_attr_rollback_snap.attr,
	NULL
};
1711 
/* unnamed attribute group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};
1715 
/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1720 
/*
 * Release callback of rbd_device_type; intentionally empty.
 * NOTE(review): rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_release, which presumably does the actual teardown - confirm
 * which callback the driver core prefers.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1724 
/* device type of an rbd device: name, attribute groups, release hook */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1730 
1731 
1732 /*
1733   sysfs - snapshots
1734 */
1735 
1736 static ssize_t rbd_snap_size_show(struct device *dev,
1737 				  struct device_attribute *attr,
1738 				  char *buf)
1739 {
1740 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1741 
1742 	return sprintf(buf, "%lld\n", (long long)snap->size);
1743 }
1744 
1745 static ssize_t rbd_snap_id_show(struct device *dev,
1746 				struct device_attribute *attr,
1747 				char *buf)
1748 {
1749 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1750 
1751 	return sprintf(buf, "%lld\n", (long long)snap->id);
1752 }
1753 
/* read-only sysfs attributes of a snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1756 
/* NULL-terminated list of the attributes exposed for every snapshot */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};
1762 
/* unnamed attribute group wrapping rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1766 
/*
 * Release callback of a snapshot device: frees the snapshot's name and
 * then the rbd_snap structure that embeds the device.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
1773 
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};
1778 
/* device type of a snapshot device: attribute groups and release hook */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1783 
/*
 * Unlink @snap from its list and unregister its device.
 * The entry is taken off the list first, before the device is
 * unregistered (the release callback frees the snapshot structure).
 * @rbd_dev is not used.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1790 
1791 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1792 				  struct rbd_snap *snap,
1793 				  struct device *parent)
1794 {
1795 	struct device *dev = &snap->dev;
1796 	int ret;
1797 
1798 	dev->type = &rbd_snap_device_type;
1799 	dev->parent = parent;
1800 	dev->release = rbd_snap_dev_release;
1801 	dev_set_name(dev, "snap_%s", snap->name);
1802 	ret = device_register(dev);
1803 
1804 	return ret;
1805 }
1806 
1807 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1808 			      int i, const char *name,
1809 			      struct rbd_snap **snapp)
1810 {
1811 	int ret;
1812 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1813 	if (!snap)
1814 		return -ENOMEM;
1815 	snap->name = kstrdup(name, GFP_KERNEL);
1816 	snap->size = rbd_dev->header.snap_sizes[i];
1817 	snap->id = rbd_dev->header.snapc->snaps[i];
1818 	if (device_is_registered(&rbd_dev->dev)) {
1819 		ret = rbd_register_snap_dev(rbd_dev, snap,
1820 					     &rbd_dev->dev);
1821 		if (ret < 0)
1822 			goto err;
1823 	}
1824 	*snapp = snap;
1825 	return 0;
1826 err:
1827 	kfree(snap->name);
1828 	kfree(snap);
1829 	return ret;
1830 }
1831 
/*
 * Step backwards through a list of NUL-terminated strings stored back to
 * back.
 *
 * @name points at the start of an entry (or just past the final NUL of
 * the list); @start is the beginning of the list.  Returns a pointer to
 * the entry that precedes @name, or NULL when fewer than two bytes lie
 * between @start and @name, i.e. there is no previous entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/* skip the previous entry's terminator, then scan back over it */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		if (cursor == start)
			return start;
	}

	/* stopped on the NUL of the entry before that; step past it */
	return cursor + 1;
}
1848 
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * older to newer so that we don't get a duplicate snap name while
 * doing the process (e.g., a snapshot was removed and a new one was
 * created with the same name).
 *
 * Walks rbd_dev->snaps backwards while counting the header index @i down
 * from total_snaps: list entries with no matching header id are removed,
 * matching entries are kept, and header entries with no list entry are
 * added via __rbd_add_snap_dev().
 *
 * Returns 0 on success, -EINVAL if the snapshot name list runs out before
 * the snapshot count does, or the negative errno from
 * __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* start just past the end of the packed name list */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only set (and only read below) while i != 0 */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this reads snaps[i] while the rest of
			 * the function indexes with i - 1; with
			 * i == total_snaps that is one past the last id.
			 * Looks like an off-by-one - confirm before changing.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
1922 
1923 
/* Release callback of the static root device; there is nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}
1927 
/* static root device "rbd"; used as parent of every rbd device */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
1932 
1933 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1934 {
1935 	int ret = -ENOMEM;
1936 	struct device *dev;
1937 	struct rbd_snap *snap;
1938 
1939 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1940 	dev = &rbd_dev->dev;
1941 
1942 	dev->bus = &rbd_bus_type;
1943 	dev->type = &rbd_device_type;
1944 	dev->parent = &rbd_root_dev;
1945 	dev->release = rbd_dev_release;
1946 	dev_set_name(dev, "%d", rbd_dev->id);
1947 	ret = device_register(dev);
1948 	if (ret < 0)
1949 		goto done_free;
1950 
1951 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
1952 		ret = rbd_register_snap_dev(rbd_dev, snap,
1953 					     &rbd_dev->dev);
1954 		if (ret < 0)
1955 			break;
1956 	}
1957 
1958 	mutex_unlock(&ctl_mutex);
1959 	return 0;
1960 done_free:
1961 	mutex_unlock(&ctl_mutex);
1962 	return ret;
1963 }
1964 
/* Unregister the sysfs device embedded in @rbd_dev. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
1969 
1970 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
1971 {
1972 	int ret, rc;
1973 
1974 	do {
1975 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
1976 					 rbd_dev->header.obj_version);
1977 		if (ret == -ERANGE) {
1978 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1979 			rc = __rbd_update_snaps(rbd_dev);
1980 			mutex_unlock(&ctl_mutex);
1981 			if (rc < 0)
1982 				return rc;
1983 		}
1984 	} while (ret == -ERANGE);
1985 
1986 	return ret;
1987 }
1988 
1989 static ssize_t rbd_add(struct bus_type *bus,
1990 		       const char *buf,
1991 		       size_t count)
1992 {
1993 	struct ceph_osd_client *osdc;
1994 	struct rbd_device *rbd_dev;
1995 	ssize_t rc = -ENOMEM;
1996 	int irc, new_id = 0;
1997 	struct list_head *tmp;
1998 	char *mon_dev_name;
1999 	char *options;
2000 
2001 	if (!try_module_get(THIS_MODULE))
2002 		return -ENODEV;
2003 
2004 	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2005 	if (!mon_dev_name)
2006 		goto err_out_mod;
2007 
2008 	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2009 	if (!options)
2010 		goto err_mon_dev;
2011 
2012 	/* new rbd_device object */
2013 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2014 	if (!rbd_dev)
2015 		goto err_out_opt;
2016 
2017 	/* static rbd_device initialization */
2018 	spin_lock_init(&rbd_dev->lock);
2019 	INIT_LIST_HEAD(&rbd_dev->node);
2020 	INIT_LIST_HEAD(&rbd_dev->snaps);
2021 
2022 	/* generate unique id: find highest unique id, add one */
2023 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2024 
2025 	list_for_each(tmp, &rbd_dev_list) {
2026 		struct rbd_device *rbd_dev;
2027 
2028 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2029 		if (rbd_dev->id >= new_id)
2030 			new_id = rbd_dev->id + 1;
2031 	}
2032 
2033 	rbd_dev->id = new_id;
2034 
2035 	/* add to global list */
2036 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2037 
2038 	/* parse add command */
2039 	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2040 		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
2041 		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2042 		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2043 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2044 		   mon_dev_name, options, rbd_dev->pool_name,
2045 		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
2046 		rc = -EINVAL;
2047 		goto err_out_slot;
2048 	}
2049 
2050 	if (rbd_dev->snap_name[0] == 0)
2051 		rbd_dev->snap_name[0] = '-';
2052 
2053 	rbd_dev->obj_len = strlen(rbd_dev->obj);
2054 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2055 		 rbd_dev->obj, RBD_SUFFIX);
2056 
2057 	/* initialize rest of new object */
2058 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2059 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2060 	if (rc < 0)
2061 		goto err_out_slot;
2062 
2063 	mutex_unlock(&ctl_mutex);
2064 
2065 	/* pick the pool */
2066 	osdc = &rbd_dev->client->osdc;
2067 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2068 	if (rc < 0)
2069 		goto err_out_client;
2070 	rbd_dev->poolid = rc;
2071 
2072 	/* register our block device */
2073 	irc = register_blkdev(0, rbd_dev->name);
2074 	if (irc < 0) {
2075 		rc = irc;
2076 		goto err_out_client;
2077 	}
2078 	rbd_dev->major = irc;
2079 
2080 	rc = rbd_bus_add_dev(rbd_dev);
2081 	if (rc)
2082 		goto err_out_blkdev;
2083 
2084 	/* set up and announce blkdev mapping */
2085 	rc = rbd_init_disk(rbd_dev);
2086 	if (rc)
2087 		goto err_out_bus;
2088 
2089 	rc = rbd_init_watch_dev(rbd_dev);
2090 	if (rc)
2091 		goto err_out_bus;
2092 
2093 	return count;
2094 
2095 err_out_bus:
2096 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2097 	list_del_init(&rbd_dev->node);
2098 	mutex_unlock(&ctl_mutex);
2099 
2100 	/* this will also clean up rest of rbd_dev stuff */
2101 
2102 	rbd_bus_del_dev(rbd_dev);
2103 	kfree(options);
2104 	kfree(mon_dev_name);
2105 	return rc;
2106 
2107 err_out_blkdev:
2108 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2109 err_out_client:
2110 	rbd_put_client(rbd_dev);
2111 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2112 err_out_slot:
2113 	list_del_init(&rbd_dev->node);
2114 	mutex_unlock(&ctl_mutex);
2115 
2116 	kfree(rbd_dev);
2117 err_out_opt:
2118 	kfree(options);
2119 err_mon_dev:
2120 	kfree(mon_dev_name);
2121 err_out_mod:
2122 	dout("Error adding device %s\n", buf);
2123 	module_put(THIS_MODULE);
2124 	return rc;
2125 }
2126 
2127 static struct rbd_device *__rbd_get_dev(unsigned long id)
2128 {
2129 	struct list_head *tmp;
2130 	struct rbd_device *rbd_dev;
2131 
2132 	list_for_each(tmp, &rbd_dev_list) {
2133 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2134 		if (rbd_dev->id == id)
2135 			return rbd_dev;
2136 	}
2137 	return NULL;
2138 }
2139 
2140 static void rbd_dev_release(struct device *dev)
2141 {
2142 	struct rbd_device *rbd_dev =
2143 			container_of(dev, struct rbd_device, dev);
2144 
2145 	if (rbd_dev->watch_request)
2146 		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2147 						    rbd_dev->watch_request);
2148 	if (rbd_dev->watch_event)
2149 		ceph_osdc_cancel_event(rbd_dev->watch_event);
2150 
2151 	rbd_put_client(rbd_dev);
2152 
2153 	/* clean up and free blkdev */
2154 	rbd_free_disk(rbd_dev);
2155 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2156 	kfree(rbd_dev);
2157 
2158 	/* release module ref */
2159 	module_put(THIS_MODULE);
2160 }
2161 
2162 static ssize_t rbd_remove(struct bus_type *bus,
2163 			  const char *buf,
2164 			  size_t count)
2165 {
2166 	struct rbd_device *rbd_dev = NULL;
2167 	int target_id, rc;
2168 	unsigned long ul;
2169 	int ret = count;
2170 
2171 	rc = strict_strtoul(buf, 10, &ul);
2172 	if (rc)
2173 		return rc;
2174 
2175 	/* convert to int; abort if we lost anything in the conversion */
2176 	target_id = (int) ul;
2177 	if (target_id != ul)
2178 		return -EINVAL;
2179 
2180 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2181 
2182 	rbd_dev = __rbd_get_dev(target_id);
2183 	if (!rbd_dev) {
2184 		ret = -ENOENT;
2185 		goto done;
2186 	}
2187 
2188 	list_del_init(&rbd_dev->node);
2189 
2190 	__rbd_remove_all_snaps(rbd_dev);
2191 	rbd_bus_del_dev(rbd_dev);
2192 
2193 done:
2194 	mutex_unlock(&ctl_mutex);
2195 	return ret;
2196 }
2197 
2198 static ssize_t rbd_snap_add(struct device *dev,
2199 			    struct device_attribute *attr,
2200 			    const char *buf,
2201 			    size_t count)
2202 {
2203 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2204 	int ret;
2205 	char *name = kmalloc(count + 1, GFP_KERNEL);
2206 	if (!name)
2207 		return -ENOMEM;
2208 
2209 	snprintf(name, count, "%s", buf);
2210 
2211 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2212 
2213 	ret = rbd_header_add_snap(rbd_dev,
2214 				  name, GFP_KERNEL);
2215 	if (ret < 0)
2216 		goto err_unlock;
2217 
2218 	ret = __rbd_update_snaps(rbd_dev);
2219 	if (ret < 0)
2220 		goto err_unlock;
2221 
2222 	/* shouldn't hold ctl_mutex when notifying.. notify might
2223 	   trigger a watch callback that would need to get that mutex */
2224 	mutex_unlock(&ctl_mutex);
2225 
2226 	/* make a best effort, don't error if failed */
2227 	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2228 
2229 	ret = count;
2230 	kfree(name);
2231 	return ret;
2232 
2233 err_unlock:
2234 	mutex_unlock(&ctl_mutex);
2235 	kfree(name);
2236 	return ret;
2237 }
2238 
2239 static ssize_t rbd_snap_rollback(struct device *dev,
2240 				 struct device_attribute *attr,
2241 				 const char *buf,
2242 				 size_t count)
2243 {
2244 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2245 	int ret;
2246 	u64 snapid;
2247 	u64 cur_ofs;
2248 	char *seg_name = NULL;
2249 	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2250 	ret = -ENOMEM;
2251 	if (!snap_name)
2252 		return ret;
2253 
2254 	/* parse snaps add command */
2255 	snprintf(snap_name, count, "%s", buf);
2256 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2257 	if (!seg_name)
2258 		goto done;
2259 
2260 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2261 
2262 	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2263 	if (ret < 0)
2264 		goto done_unlock;
2265 
2266 	dout("snapid=%lld\n", snapid);
2267 
2268 	cur_ofs = 0;
2269 	while (cur_ofs < rbd_dev->header.image_size) {
2270 		cur_ofs += rbd_get_segment(&rbd_dev->header,
2271 					   rbd_dev->obj,
2272 					   cur_ofs, (u64)-1,
2273 					   seg_name, NULL);
2274 		dout("seg_name=%s\n", seg_name);
2275 
2276 		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2277 		if (ret < 0)
2278 			pr_warning("could not roll back obj %s err=%d\n",
2279 				   seg_name, ret);
2280 	}
2281 
2282 	ret = __rbd_update_snaps(rbd_dev);
2283 	if (ret < 0)
2284 		goto done_unlock;
2285 
2286 	ret = count;
2287 
2288 done_unlock:
2289 	mutex_unlock(&ctl_mutex);
2290 done:
2291 	kfree(seg_name);
2292 	kfree(snap_name);
2293 
2294 	return ret;
2295 }
2296 
/* write-only bus control files: "add" maps a device, "remove" unmaps one */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2302 
2303 /*
2304  * create control files in sysfs
2305  * /sys/bus/rbd/...
2306  */
2307 static int rbd_sysfs_init(void)
2308 {
2309 	int ret;
2310 
2311 	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2312 
2313 	ret = bus_register(&rbd_bus_type);
2314 	 if (ret < 0)
2315 		return ret;
2316 
2317 	ret = device_register(&rbd_root_dev);
2318 
2319 	return ret;
2320 }
2321 
/*
 * Undo rbd_sysfs_init(): unregister the root device, then the bus
 * (the reverse of the registration order).
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2327 
2328 int __init rbd_init(void)
2329 {
2330 	int rc;
2331 
2332 	rc = rbd_sysfs_init();
2333 	if (rc)
2334 		return rc;
2335 	spin_lock_init(&node_lock);
2336 	pr_info("loaded " DRV_NAME_LONG "\n");
2337 	return 0;
2338 }
2339 
/* Module exit point: remove the sysfs control files. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2344 
/* module entry/exit hooks */
module_init(rbd_init);
module_exit(rbd_exit);

/* module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2356