xref: /openbmc/linux/drivers/block/rbd.c (revision 28f65c11)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
/* Driver name, in short and long form */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Size limits for the name buffers and the option string */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head instead of a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/* Size of the blkdev name buffer in struct rbd_device */
#define DEV_NAME_LEN		32

/* Value given to rbd_options.notify_timeout when the option is not set */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 
/*
 * block device image metadata (in-memory version)
 *
 * Populated from the on-disk header by rbd_header_from_disk(), which
 * converts the little-endian on-disk fields to CPU byte order.
 */
struct rbd_image_header {
	u64 image_size;
	char block_name[32];	/* prefix of the per-segment object names */
	__u8 obj_order;		/* offsets are split into segments by this shift */
	__u8 crypt_type;
	__u8 comp_type;
	/* taken for write when selecting a snapshot, for read when building requests */
	struct rw_semaphore snap_rwsem;
	struct ceph_snap_context *snapc;
	size_t snap_names_len;	/* size in bytes of the snap_names buffer */
	u64 snap_seq;
	u32 total_snaps;	/* number of entries in snapc->snaps and snap_sizes */

	char *snap_names;	/* consecutive NUL-terminated names; NULL if no snaps */
	u64 *snap_sizes;	/* image size per snapshot; NULL if no snaps */

	u64 obj_version;	/* version sent back in notify acks */
};
80 
/*
 * rbd-specific options, filled in by parse_rbd_opts_token()
 */
struct rbd_options {
	int	notify_timeout;	/* set by the "notify_timeout=%d" option */
};
84 
/*
 * an instance of the client.  multiple devices may share a client.
 *
 * Reference counted through kref; the last put runs rbd_client_release().
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;	/* freed in rbd_client_release() */
	struct kref		kref;
	struct list_head	node;		/* entry in rbd_client_list */
};
94 
95 struct rbd_req_coll;
96 
/*
 * a single io request
 *
 * Allocated per OSD request in rbd_do_request() and handed to the
 * completion callback through the OSD request's r_priv pointer.
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* length requested for this op */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection, may be NULL */
};
108 
/*
 * completion state of one request inside a collection
 */
struct rbd_req_status {
	int done;	/* set to 1 once the request has finished */
	int rc;		/* result passed on to the block layer */
	u64 bytes;	/* byte count passed on to the block layer */
};
114 
/*
 * a collection of requests
 *
 * Entries are reported to the block layer in index order; see
 * rbd_coll_end_req_index().
 */
struct rbd_req_coll {
	int			total;		/* number of status[] entries */
	int			num_done;	/* entries already reported */
	struct kref		kref;		/* freed by rbd_coll_release() */
	struct rbd_req_status	status[0];	/* trailing per-request state */
};
124 
/*
 * a snapshot of a device, kept on rbd_device.snaps
 */
struct rbd_snap {
	struct	device		dev;
	const char		*name;
	size_t			size;
	struct list_head	node;
	u64			id;
};
132 
/*
 * a single device
 *
 * The embedded struct device carries the reference count used by
 * rbd_get_dev()/rbd_put_dev().
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;	/* same as rbd_client->client */
	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	/* set up by rbd_req_sync_watch() */
	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* set when a snapshot is selected, cleared for the head */

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
173 
/* bus type named "rbd" */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};

/* NOTE(review): no initializer here; presumably set up at module init - confirm */
static spinlock_t node_lock;      /* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
183 
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187 				 struct device_attribute *attr,
188 				 const char *buf,
189 				 size_t size);
190 static ssize_t rbd_snap_add(struct device *dev,
191 			    struct device_attribute *attr,
192 			    const char *buf,
193 			    size_t count);
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195 				  struct rbd_snap *snap);;
196 
197 
198 static struct rbd_device *dev_to_rbd(struct device *dev)
199 {
200 	return container_of(dev, struct rbd_device, dev);
201 }
202 
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204 {
205 	return get_device(&rbd_dev->dev);
206 }
207 
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
209 {
210 	put_device(&rbd_dev->dev);
211 }
212 
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214 
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
216 {
217 	struct gendisk *disk = bdev->bd_disk;
218 	struct rbd_device *rbd_dev = disk->private_data;
219 
220 	rbd_get_dev(rbd_dev);
221 
222 	set_device_ro(bdev, rbd_dev->read_only);
223 
224 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225 		return -EROFS;
226 
227 	return 0;
228 }
229 
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
231 {
232 	struct rbd_device *rbd_dev = disk->private_data;
233 
234 	rbd_put_dev(rbd_dev);
235 
236 	return 0;
237 }
238 
/* Block device operations: only open and release are provided. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
244 
245 /*
246  * Initialize an rbd client instance.
247  * We own *opt.
248  */
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250 					    struct rbd_options *rbd_opts)
251 {
252 	struct rbd_client *rbdc;
253 	int ret = -ENOMEM;
254 
255 	dout("rbd_client_create\n");
256 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257 	if (!rbdc)
258 		goto out_opt;
259 
260 	kref_init(&rbdc->kref);
261 	INIT_LIST_HEAD(&rbdc->node);
262 
263 	rbdc->client = ceph_create_client(opt, rbdc);
264 	if (IS_ERR(rbdc->client))
265 		goto out_rbdc;
266 	opt = NULL; /* Now rbdc->client is responsible for opt */
267 
268 	ret = ceph_open_session(rbdc->client);
269 	if (ret < 0)
270 		goto out_err;
271 
272 	rbdc->rbd_opts = rbd_opts;
273 
274 	spin_lock(&node_lock);
275 	list_add_tail(&rbdc->node, &rbd_client_list);
276 	spin_unlock(&node_lock);
277 
278 	dout("rbd_client_create created %p\n", rbdc);
279 	return rbdc;
280 
281 out_err:
282 	ceph_destroy_client(rbdc->client);
283 out_rbdc:
284 	kfree(rbdc);
285 out_opt:
286 	if (opt)
287 		ceph_destroy_options(opt);
288 	return ERR_PTR(ret);
289 }
290 
291 /*
292  * Find a ceph client with specific addr and configuration.
293  */
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295 {
296 	struct rbd_client *client_node;
297 
298 	if (opt->flags & CEPH_OPT_NOSHARE)
299 		return NULL;
300 
301 	list_for_each_entry(client_node, &rbd_client_list, node)
302 		if (ceph_compare_options(opt, client_node->client) == 0)
303 			return client_node;
304 	return NULL;
305 }
306 
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument (there are
 * none at present).  parse_rbd_opts_token() relies on this ordering.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* Patterns handed to match_token(); the list ends with a NULL pattern */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
324 
325 static int parse_rbd_opts_token(char *c, void *private)
326 {
327 	struct rbd_options *rbdopt = private;
328 	substring_t argstr[MAX_OPT_ARGS];
329 	int token, intval, ret;
330 
331 	token = match_token((char *)c, rbdopt_tokens, argstr);
332 	if (token < 0)
333 		return -EINVAL;
334 
335 	if (token < Opt_last_int) {
336 		ret = match_int(&argstr[0], &intval);
337 		if (ret < 0) {
338 			pr_err("bad mount option arg (not int) "
339 			       "at '%s'\n", c);
340 			return ret;
341 		}
342 		dout("got int token %d val %d\n", token, intval);
343 	} else if (token > Opt_last_int && token < Opt_last_string) {
344 		dout("got string token %d val %s\n", token,
345 		     argstr[0].from);
346 	} else {
347 		dout("got token %d\n", token);
348 	}
349 
350 	switch (token) {
351 	case Opt_notify_timeout:
352 		rbdopt->notify_timeout = intval;
353 		break;
354 	default:
355 		BUG_ON(token);
356 	}
357 	return 0;
358 }
359 
360 /*
361  * Get a ceph client with specific addr and configuration, if one does
362  * not exist create it.
363  */
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365 			  char *options)
366 {
367 	struct rbd_client *rbdc;
368 	struct ceph_options *opt;
369 	int ret;
370 	struct rbd_options *rbd_opts;
371 
372 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373 	if (!rbd_opts)
374 		return -ENOMEM;
375 
376 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
377 
378 	ret = ceph_parse_options(&opt, options, mon_addr,
379 				 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380 	if (ret < 0)
381 		goto done_err;
382 
383 	spin_lock(&node_lock);
384 	rbdc = __rbd_client_find(opt);
385 	if (rbdc) {
386 		ceph_destroy_options(opt);
387 
388 		/* using an existing client */
389 		kref_get(&rbdc->kref);
390 		rbd_dev->rbd_client = rbdc;
391 		rbd_dev->client = rbdc->client;
392 		spin_unlock(&node_lock);
393 		return 0;
394 	}
395 	spin_unlock(&node_lock);
396 
397 	rbdc = rbd_client_create(opt, rbd_opts);
398 	if (IS_ERR(rbdc)) {
399 		ret = PTR_ERR(rbdc);
400 		goto done_err;
401 	}
402 
403 	rbd_dev->rbd_client = rbdc;
404 	rbd_dev->client = rbdc->client;
405 	return 0;
406 done_err:
407 	kfree(rbd_opts);
408 	return ret;
409 }
410 
411 /*
412  * Destroy ceph client
413  */
414 static void rbd_client_release(struct kref *kref)
415 {
416 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417 
418 	dout("rbd_release_client %p\n", rbdc);
419 	spin_lock(&node_lock);
420 	list_del(&rbdc->node);
421 	spin_unlock(&node_lock);
422 
423 	ceph_destroy_client(rbdc->client);
424 	kfree(rbdc->rbd_opts);
425 	kfree(rbdc);
426 }
427 
428 /*
429  * Drop reference to ceph client node. If it's not referenced anymore, release
430  * it.
431  */
432 static void rbd_put_client(struct rbd_device *rbd_dev)
433 {
434 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 	rbd_dev->rbd_client = NULL;
436 	rbd_dev->client = NULL;
437 }
438 
439 /*
440  * Destroy requests collection
441  */
442 static void rbd_coll_release(struct kref *kref)
443 {
444 	struct rbd_req_coll *coll =
445 		container_of(kref, struct rbd_req_coll, kref);
446 
447 	dout("rbd_coll_release %p\n", coll);
448 	kfree(coll);
449 }
450 
451 /*
452  * Create a new header structure, translate header format from the on-disk
453  * header.
454  */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 				 struct rbd_image_header_ondisk *ondisk,
457 				 int allocated_snaps,
458 				 gfp_t gfp_flags)
459 {
460 	int i;
461 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 	int ret = -ENOMEM;
463 
464 	init_rwsem(&header->snap_rwsem);
465 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467 				snap_count *
468 				 sizeof(struct rbd_image_snap_ondisk),
469 				gfp_flags);
470 	if (!header->snapc)
471 		return -ENOMEM;
472 	if (snap_count) {
473 		header->snap_names = kmalloc(header->snap_names_len,
474 					     GFP_KERNEL);
475 		if (!header->snap_names)
476 			goto err_snapc;
477 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 					     GFP_KERNEL);
479 		if (!header->snap_sizes)
480 			goto err_names;
481 	} else {
482 		header->snap_names = NULL;
483 		header->snap_sizes = NULL;
484 	}
485 	memcpy(header->block_name, ondisk->block_name,
486 	       sizeof(ondisk->block_name));
487 
488 	header->image_size = le64_to_cpu(ondisk->image_size);
489 	header->obj_order = ondisk->options.order;
490 	header->crypt_type = ondisk->options.crypt_type;
491 	header->comp_type = ondisk->options.comp_type;
492 
493 	atomic_set(&header->snapc->nref, 1);
494 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 	header->snapc->num_snaps = snap_count;
496 	header->total_snaps = snap_count;
497 
498 	if (snap_count &&
499 	    allocated_snaps == snap_count) {
500 		for (i = 0; i < snap_count; i++) {
501 			header->snapc->snaps[i] =
502 				le64_to_cpu(ondisk->snaps[i].id);
503 			header->snap_sizes[i] =
504 				le64_to_cpu(ondisk->snaps[i].image_size);
505 		}
506 
507 		/* copy snapshot names */
508 		memcpy(header->snap_names, &ondisk->snaps[i],
509 			header->snap_names_len);
510 	}
511 
512 	return 0;
513 
514 err_names:
515 	kfree(header->snap_names);
516 err_snapc:
517 	kfree(header->snapc);
518 	return ret;
519 }
520 
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523 	return header->total_snaps - snap_num;
524 }
525 
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528 	struct rbd_image_header *header = &rbd_dev->header;
529 
530 	if (!rbd_dev->cur_snap)
531 		return 0;
532 
533 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
535 
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537 			u64 *seq, u64 *size)
538 {
539 	int i;
540 	char *p = header->snap_names;
541 
542 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 		if (strcmp(snap_name, p) == 0)
544 			break;
545 	}
546 	if (i == header->total_snaps)
547 		return -ENOENT;
548 	if (seq)
549 		*seq = header->snapc->snaps[i];
550 
551 	if (size)
552 		*size = header->snap_sizes[i];
553 
554 	return i;
555 }
556 
557 static int rbd_header_set_snap(struct rbd_device *dev,
558 			       const char *snap_name,
559 			       u64 *size)
560 {
561 	struct rbd_image_header *header = &dev->header;
562 	struct ceph_snap_context *snapc = header->snapc;
563 	int ret = -ENOENT;
564 
565 	down_write(&header->snap_rwsem);
566 
567 	if (!snap_name ||
568 	    !*snap_name ||
569 	    strcmp(snap_name, "-") == 0 ||
570 	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571 		if (header->total_snaps)
572 			snapc->seq = header->snap_seq;
573 		else
574 			snapc->seq = 0;
575 		dev->cur_snap = 0;
576 		dev->read_only = 0;
577 		if (size)
578 			*size = header->image_size;
579 	} else {
580 		ret = snap_by_name(header, snap_name, &snapc->seq, size);
581 		if (ret < 0)
582 			goto done;
583 
584 		dev->cur_snap = header->total_snaps - ret;
585 		dev->read_only = 1;
586 	}
587 
588 	ret = 0;
589 done:
590 	up_write(&header->snap_rwsem);
591 	return ret;
592 }
593 
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596 	kfree(header->snapc);
597 	kfree(header->snap_names);
598 	kfree(header->snap_sizes);
599 }
600 
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605 			   const char *block_name,
606 			   u64 ofs, u64 len,
607 			   char *seg_name, u64 *segofs)
608 {
609 	u64 seg = ofs >> header->obj_order;
610 
611 	if (seg_name)
612 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 			 "%s.%012llx", block_name, seg);
614 
615 	ofs = ofs & ((1 << header->obj_order) - 1);
616 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
617 
618 	if (segofs)
619 		*segofs = ofs;
620 
621 	return len;
622 }
623 
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625 				u64 ofs, u64 len)
626 {
627 	u64 start_seg = ofs >> header->obj_order;
628 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 	return end_seg - start_seg + 1;
630 }
631 
632 /*
633  * bio helpers
634  */
635 
636 static void bio_chain_put(struct bio *chain)
637 {
638 	struct bio *tmp;
639 
640 	while (chain) {
641 		tmp = chain;
642 		chain = chain->bi_next;
643 		bio_put(tmp);
644 	}
645 }
646 
647 /*
648  * zeros a bio chain, starting at specific offset
649  */
650 static void zero_bio_chain(struct bio *chain, int start_ofs)
651 {
652 	struct bio_vec *bv;
653 	unsigned long flags;
654 	void *buf;
655 	int i;
656 	int pos = 0;
657 
658 	while (chain) {
659 		bio_for_each_segment(bv, chain, i) {
660 			if (pos + bv->bv_len > start_ofs) {
661 				int remainder = max(start_ofs - pos, 0);
662 				buf = bvec_kmap_irq(bv, &flags);
663 				memset(buf + remainder, 0,
664 				       bv->bv_len - remainder);
665 				bvec_kunmap_irq(buf, &flags);
666 			}
667 			pos += bv->bv_len;
668 		}
669 
670 		chain = chain->bi_next;
671 	}
672 }
673 
674 /*
675  * bio_chain_clone - clone a chain of bios up to a certain length.
676  * might return a bio_pair that will need to be released.
677  */
678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679 				   struct bio_pair **bp,
680 				   int len, gfp_t gfpmask)
681 {
682 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683 	int total = 0;
684 
685 	if (*bp) {
686 		bio_pair_release(*bp);
687 		*bp = NULL;
688 	}
689 
690 	while (old_chain && (total < len)) {
691 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692 		if (!tmp)
693 			goto err_out;
694 
695 		if (total + old_chain->bi_size > len) {
696 			struct bio_pair *bp;
697 
698 			/*
699 			 * this split can only happen with a single paged bio,
700 			 * split_bio will BUG_ON if this is not the case
701 			 */
702 			dout("bio_chain_clone split! total=%d remaining=%d"
703 			     "bi_size=%d\n",
704 			     (int)total, (int)len-total,
705 			     (int)old_chain->bi_size);
706 
707 			/* split the bio. We'll release it either in the next
708 			   call, or it will have to be released outside */
709 			bp = bio_split(old_chain, (len - total) / 512ULL);
710 			if (!bp)
711 				goto err_out;
712 
713 			__bio_clone(tmp, &bp->bio1);
714 
715 			*next = &bp->bio2;
716 		} else {
717 			__bio_clone(tmp, old_chain);
718 			*next = old_chain->bi_next;
719 		}
720 
721 		tmp->bi_bdev = NULL;
722 		gfpmask &= ~__GFP_WAIT;
723 		tmp->bi_next = NULL;
724 
725 		if (!new_chain) {
726 			new_chain = tail = tmp;
727 		} else {
728 			tail->bi_next = tmp;
729 			tail = tmp;
730 		}
731 		old_chain = old_chain->bi_next;
732 
733 		total += tmp->bi_size;
734 	}
735 
736 	BUG_ON(total < len);
737 
738 	if (tail)
739 		tail->bi_next = NULL;
740 
741 	*old = old_chain;
742 
743 	return new_chain;
744 
745 err_out:
746 	dout("bio_chain_clone with err\n");
747 	bio_chain_put(new_chain);
748 	return NULL;
749 }
750 
751 /*
752  * helpers for osd request op vectors.
753  */
754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755 			    int num_ops,
756 			    int opcode,
757 			    u32 payload_len)
758 {
759 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760 		       GFP_NOIO);
761 	if (!*ops)
762 		return -ENOMEM;
763 	(*ops)[0].op = opcode;
764 	/*
765 	 * op extent offset and length will be set later on
766 	 * in calc_raw_layout()
767 	 */
768 	(*ops)[0].payload_len = payload_len;
769 	return 0;
770 }
771 
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	struct ceph_osd_req_op *vec = ops;

	kfree(vec);
}
776 
777 static void rbd_coll_end_req_index(struct request *rq,
778 				   struct rbd_req_coll *coll,
779 				   int index,
780 				   int ret, u64 len)
781 {
782 	struct request_queue *q;
783 	int min, max, i;
784 
785 	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 	     coll, index, ret, len);
787 
788 	if (!rq)
789 		return;
790 
791 	if (!coll) {
792 		blk_end_request(rq, ret, len);
793 		return;
794 	}
795 
796 	q = rq->q;
797 
798 	spin_lock_irq(q->queue_lock);
799 	coll->status[index].done = 1;
800 	coll->status[index].rc = ret;
801 	coll->status[index].bytes = len;
802 	max = min = coll->num_done;
803 	while (max < coll->total && coll->status[max].done)
804 		max++;
805 
806 	for (i = min; i<max; i++) {
807 		__blk_end_request(rq, coll->status[i].rc,
808 				  coll->status[i].bytes);
809 		coll->num_done++;
810 		kref_put(&coll->kref, rbd_coll_release);
811 	}
812 	spin_unlock_irq(q->queue_lock);
813 }
814 
815 static void rbd_coll_end_req(struct rbd_request *req,
816 			     int ret, u64 len)
817 {
818 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819 }
820 
821 /*
822  * Send ceph osd request
823  */
824 static int rbd_do_request(struct request *rq,
825 			  struct rbd_device *dev,
826 			  struct ceph_snap_context *snapc,
827 			  u64 snapid,
828 			  const char *obj, u64 ofs, u64 len,
829 			  struct bio *bio,
830 			  struct page **pages,
831 			  int num_pages,
832 			  int flags,
833 			  struct ceph_osd_req_op *ops,
834 			  int num_reply,
835 			  struct rbd_req_coll *coll,
836 			  int coll_index,
837 			  void (*rbd_cb)(struct ceph_osd_request *req,
838 					 struct ceph_msg *msg),
839 			  struct ceph_osd_request **linger_req,
840 			  u64 *ver)
841 {
842 	struct ceph_osd_request *req;
843 	struct ceph_file_layout *layout;
844 	int ret;
845 	u64 bno;
846 	struct timespec mtime = CURRENT_TIME;
847 	struct rbd_request *req_data;
848 	struct ceph_osd_request_head *reqhead;
849 	struct rbd_image_header *header = &dev->header;
850 
851 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
852 	if (!req_data) {
853 		if (coll)
854 			rbd_coll_end_req_index(rq, coll, coll_index,
855 					       -ENOMEM, len);
856 		return -ENOMEM;
857 	}
858 
859 	if (coll) {
860 		req_data->coll = coll;
861 		req_data->coll_index = coll_index;
862 	}
863 
864 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
865 
866 	down_read(&header->snap_rwsem);
867 
868 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869 				      snapc,
870 				      ops,
871 				      false,
872 				      GFP_NOIO, pages, bio);
873 	if (!req) {
874 		up_read(&header->snap_rwsem);
875 		ret = -ENOMEM;
876 		goto done_pages;
877 	}
878 
879 	req->r_callback = rbd_cb;
880 
881 	req_data->rq = rq;
882 	req_data->bio = bio;
883 	req_data->pages = pages;
884 	req_data->len = len;
885 
886 	req->r_priv = req_data;
887 
888 	reqhead = req->r_request->front.iov_base;
889 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
890 
891 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
892 	req->r_oid_len = strlen(req->r_oid);
893 
894 	layout = &req->r_file_layout;
895 	memset(layout, 0, sizeof(*layout));
896 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897 	layout->fl_stripe_count = cpu_to_le32(1);
898 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899 	layout->fl_pg_preferred = cpu_to_le32(-1);
900 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902 			     ofs, &len, &bno, req, ops);
903 
904 	ceph_osdc_build_request(req, ofs, &len,
905 				ops,
906 				snapc,
907 				&mtime,
908 				req->r_oid, req->r_oid_len);
909 	up_read(&header->snap_rwsem);
910 
911 	if (linger_req) {
912 		ceph_osdc_set_request_linger(&dev->client->osdc, req);
913 		*linger_req = req;
914 	}
915 
916 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917 	if (ret < 0)
918 		goto done_err;
919 
920 	if (!rbd_cb) {
921 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
922 		if (ver)
923 			*ver = le64_to_cpu(req->r_reassert_version.version);
924 		dout("reassert_ver=%lld\n",
925 		     le64_to_cpu(req->r_reassert_version.version));
926 		ceph_osdc_put_request(req);
927 	}
928 	return ret;
929 
930 done_err:
931 	bio_chain_put(req_data->bio);
932 	ceph_osdc_put_request(req);
933 done_pages:
934 	rbd_coll_end_req(req_data, ret, len);
935 	kfree(req_data);
936 	return ret;
937 }
938 
939 /*
940  * Ceph osd op callback
941  */
942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
943 {
944 	struct rbd_request *req_data = req->r_priv;
945 	struct ceph_osd_reply_head *replyhead;
946 	struct ceph_osd_op *op;
947 	__s32 rc;
948 	u64 bytes;
949 	int read_op;
950 
951 	/* parse reply */
952 	replyhead = msg->front.iov_base;
953 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954 	op = (void *)(replyhead + 1);
955 	rc = le32_to_cpu(replyhead->result);
956 	bytes = le64_to_cpu(op->extent.length);
957 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
958 
959 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
960 
961 	if (rc == -ENOENT && read_op) {
962 		zero_bio_chain(req_data->bio, 0);
963 		rc = 0;
964 	} else if (rc == 0 && read_op && bytes < req_data->len) {
965 		zero_bio_chain(req_data->bio, bytes);
966 		bytes = req_data->len;
967 	}
968 
969 	rbd_coll_end_req(req_data, rc, bytes);
970 
971 	if (req_data->bio)
972 		bio_chain_put(req_data->bio);
973 
974 	ceph_osdc_put_request(req);
975 	kfree(req_data);
976 }
977 
/*
 * Minimal completion callback: only drops the OSD request.
 *
 * NOTE(review): rbd_do_request() stores a kzalloc'd rbd_request in
 * req->r_priv and this callback does not free it - confirm whether
 * that allocation is released anywhere else.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct ceph_osd_request *osd_req = req;

	ceph_osdc_put_request(osd_req);
}
982 
983 /*
984  * Do a synchronous ceph osd operation
985  */
986 static int rbd_req_sync_op(struct rbd_device *dev,
987 			   struct ceph_snap_context *snapc,
988 			   u64 snapid,
989 			   int opcode,
990 			   int flags,
991 			   struct ceph_osd_req_op *orig_ops,
992 			   int num_reply,
993 			   const char *obj,
994 			   u64 ofs, u64 len,
995 			   char *buf,
996 			   struct ceph_osd_request **linger_req,
997 			   u64 *ver)
998 {
999 	int ret;
1000 	struct page **pages;
1001 	int num_pages;
1002 	struct ceph_osd_req_op *ops = orig_ops;
1003 	u32 payload_len;
1004 
1005 	num_pages = calc_pages_for(ofs , len);
1006 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1007 	if (IS_ERR(pages))
1008 		return PTR_ERR(pages);
1009 
1010 	if (!orig_ops) {
1011 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013 		if (ret < 0)
1014 			goto done;
1015 
1016 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018 			if (ret < 0)
1019 				goto done_ops;
1020 		}
1021 	}
1022 
1023 	ret = rbd_do_request(NULL, dev, snapc, snapid,
1024 			  obj, ofs, len, NULL,
1025 			  pages, num_pages,
1026 			  flags,
1027 			  ops,
1028 			  2,
1029 			  NULL, 0,
1030 			  NULL,
1031 			  linger_req, ver);
1032 	if (ret < 0)
1033 		goto done_ops;
1034 
1035 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1037 
1038 done_ops:
1039 	if (!orig_ops)
1040 		rbd_destroy_ops(ops);
1041 done:
1042 	ceph_release_page_vector(pages, num_pages);
1043 	return ret;
1044 }
1045 
1046 /*
1047  * Do an asynchronous ceph osd operation
1048  */
1049 static int rbd_do_op(struct request *rq,
1050 		     struct rbd_device *rbd_dev ,
1051 		     struct ceph_snap_context *snapc,
1052 		     u64 snapid,
1053 		     int opcode, int flags, int num_reply,
1054 		     u64 ofs, u64 len,
1055 		     struct bio *bio,
1056 		     struct rbd_req_coll *coll,
1057 		     int coll_index)
1058 {
1059 	char *seg_name;
1060 	u64 seg_ofs;
1061 	u64 seg_len;
1062 	int ret;
1063 	struct ceph_osd_req_op *ops;
1064 	u32 payload_len;
1065 
1066 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067 	if (!seg_name)
1068 		return -ENOMEM;
1069 
1070 	seg_len = rbd_get_segment(&rbd_dev->header,
1071 				  rbd_dev->header.block_name,
1072 				  ofs, len,
1073 				  seg_name, &seg_ofs);
1074 
1075 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1076 
1077 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078 	if (ret < 0)
1079 		goto done;
1080 
1081 	/* we've taken care of segment sizes earlier when we
1082 	   cloned the bios. We should never have a segment
1083 	   truncated at this point */
1084 	BUG_ON(seg_len < len);
1085 
1086 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087 			     seg_name, seg_ofs, seg_len,
1088 			     bio,
1089 			     NULL, 0,
1090 			     flags,
1091 			     ops,
1092 			     num_reply,
1093 			     coll, coll_index,
1094 			     rbd_req_cb, 0, NULL);
1095 
1096 	rbd_destroy_ops(ops);
1097 done:
1098 	kfree(seg_name);
1099 	return ret;
1100 }
1101 
1102 /*
1103  * Request async osd write
1104  */
1105 static int rbd_req_write(struct request *rq,
1106 			 struct rbd_device *rbd_dev,
1107 			 struct ceph_snap_context *snapc,
1108 			 u64 ofs, u64 len,
1109 			 struct bio *bio,
1110 			 struct rbd_req_coll *coll,
1111 			 int coll_index)
1112 {
1113 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114 			 CEPH_OSD_OP_WRITE,
1115 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1116 			 2,
1117 			 ofs, len, bio, coll, coll_index);
1118 }
1119 
1120 /*
1121  * Request async osd read
1122  */
1123 static int rbd_req_read(struct request *rq,
1124 			 struct rbd_device *rbd_dev,
1125 			 u64 snapid,
1126 			 u64 ofs, u64 len,
1127 			 struct bio *bio,
1128 			 struct rbd_req_coll *coll,
1129 			 int coll_index)
1130 {
1131 	return rbd_do_op(rq, rbd_dev, NULL,
1132 			 (snapid ? snapid : CEPH_NOSNAP),
1133 			 CEPH_OSD_OP_READ,
1134 			 CEPH_OSD_FLAG_READ,
1135 			 2,
1136 			 ofs, len, bio, coll, coll_index);
1137 }
1138 
1139 /*
1140  * Request sync osd read
1141  */
1142 static int rbd_req_sync_read(struct rbd_device *dev,
1143 			  struct ceph_snap_context *snapc,
1144 			  u64 snapid,
1145 			  const char *obj,
1146 			  u64 ofs, u64 len,
1147 			  char *buf,
1148 			  u64 *ver)
1149 {
1150 	return rbd_req_sync_op(dev, NULL,
1151 			       (snapid ? snapid : CEPH_NOSNAP),
1152 			       CEPH_OSD_OP_READ,
1153 			       CEPH_OSD_FLAG_READ,
1154 			       NULL,
1155 			       1, obj, ofs, len, buf, NULL, ver);
1156 }
1157 
1158 /*
1159  * Request sync osd watch
1160  */
1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162 				   u64 ver,
1163 				   u64 notify_id,
1164 				   const char *obj)
1165 {
1166 	struct ceph_osd_req_op *ops;
1167 	struct page **pages = NULL;
1168 	int ret;
1169 
1170 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1171 	if (ret < 0)
1172 		return ret;
1173 
1174 	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175 	ops[0].watch.cookie = notify_id;
1176 	ops[0].watch.flag = 0;
1177 
1178 	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179 			  obj, 0, 0, NULL,
1180 			  pages, 0,
1181 			  CEPH_OSD_FLAG_READ,
1182 			  ops,
1183 			  1,
1184 			  NULL, 0,
1185 			  rbd_simple_req_cb, 0, NULL);
1186 
1187 	rbd_destroy_ops(ops);
1188 	return ret;
1189 }
1190 
1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1192 {
1193 	struct rbd_device *dev = (struct rbd_device *)data;
1194 	int rc;
1195 
1196 	if (!dev)
1197 		return;
1198 
1199 	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1200 		notify_id, (int)opcode);
1201 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1202 	rc = __rbd_update_snaps(dev);
1203 	mutex_unlock(&ctl_mutex);
1204 	if (rc)
1205 		pr_warning(DRV_NAME "%d got notification but failed to update"
1206 			   " snaps: %d\n", dev->major, rc);
1207 
1208 	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1209 }
1210 
1211 /*
1212  * Request sync osd watch
1213  */
1214 static int rbd_req_sync_watch(struct rbd_device *dev,
1215 			      const char *obj,
1216 			      u64 ver)
1217 {
1218 	struct ceph_osd_req_op *ops;
1219 	struct ceph_osd_client *osdc = &dev->client->osdc;
1220 
1221 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1222 	if (ret < 0)
1223 		return ret;
1224 
1225 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1226 				     (void *)dev, &dev->watch_event);
1227 	if (ret < 0)
1228 		goto fail;
1229 
1230 	ops[0].watch.ver = cpu_to_le64(ver);
1231 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1232 	ops[0].watch.flag = 1;
1233 
1234 	ret = rbd_req_sync_op(dev, NULL,
1235 			      CEPH_NOSNAP,
1236 			      0,
1237 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1238 			      ops,
1239 			      1, obj, 0, 0, NULL,
1240 			      &dev->watch_request, NULL);
1241 
1242 	if (ret < 0)
1243 		goto fail_event;
1244 
1245 	rbd_destroy_ops(ops);
1246 	return 0;
1247 
1248 fail_event:
1249 	ceph_osdc_cancel_event(dev->watch_event);
1250 	dev->watch_event = NULL;
1251 fail:
1252 	rbd_destroy_ops(ops);
1253 	return ret;
1254 }
1255 
1256 struct rbd_notify_info {
1257 	struct rbd_device *dev;
1258 };
1259 
1260 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1261 {
1262 	struct rbd_device *dev = (struct rbd_device *)data;
1263 	if (!dev)
1264 		return;
1265 
1266 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1267 		notify_id, (int)opcode);
1268 }
1269 
1270 /*
1271  * Request sync osd notify
1272  */
1273 static int rbd_req_sync_notify(struct rbd_device *dev,
1274 		          const char *obj)
1275 {
1276 	struct ceph_osd_req_op *ops;
1277 	struct ceph_osd_client *osdc = &dev->client->osdc;
1278 	struct ceph_osd_event *event;
1279 	struct rbd_notify_info info;
1280 	int payload_len = sizeof(u32) + sizeof(u32);
1281 	int ret;
1282 
1283 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1284 	if (ret < 0)
1285 		return ret;
1286 
1287 	info.dev = dev;
1288 
1289 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1290 				     (void *)&info, &event);
1291 	if (ret < 0)
1292 		goto fail;
1293 
1294 	ops[0].watch.ver = 1;
1295 	ops[0].watch.flag = 1;
1296 	ops[0].watch.cookie = event->cookie;
1297 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1298 	ops[0].watch.timeout = 12;
1299 
1300 	ret = rbd_req_sync_op(dev, NULL,
1301 			       CEPH_NOSNAP,
1302 			       0,
1303 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1304 			       ops,
1305 			       1, obj, 0, 0, NULL, NULL, NULL);
1306 	if (ret < 0)
1307 		goto fail_event;
1308 
1309 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1310 	dout("ceph_osdc_wait_event returned %d\n", ret);
1311 	rbd_destroy_ops(ops);
1312 	return 0;
1313 
1314 fail_event:
1315 	ceph_osdc_cancel_event(event);
1316 fail:
1317 	rbd_destroy_ops(ops);
1318 	return ret;
1319 }
1320 
1321 /*
1322  * Request sync osd rollback
1323  */
1324 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1325 				     u64 snapid,
1326 				     const char *obj)
1327 {
1328 	struct ceph_osd_req_op *ops;
1329 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1330 	if (ret < 0)
1331 		return ret;
1332 
1333 	ops[0].snap.snapid = snapid;
1334 
1335 	ret = rbd_req_sync_op(dev, NULL,
1336 			       CEPH_NOSNAP,
1337 			       0,
1338 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1339 			       ops,
1340 			       1, obj, 0, 0, NULL, NULL, NULL);
1341 
1342 	rbd_destroy_ops(ops);
1343 
1344 	return ret;
1345 }
1346 
1347 /*
1348  * Request sync osd read
1349  */
1350 static int rbd_req_sync_exec(struct rbd_device *dev,
1351 			     const char *obj,
1352 			     const char *cls,
1353 			     const char *method,
1354 			     const char *data,
1355 			     int len,
1356 			     u64 *ver)
1357 {
1358 	struct ceph_osd_req_op *ops;
1359 	int cls_len = strlen(cls);
1360 	int method_len = strlen(method);
1361 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1362 				    cls_len + method_len + len);
1363 	if (ret < 0)
1364 		return ret;
1365 
1366 	ops[0].cls.class_name = cls;
1367 	ops[0].cls.class_len = (__u8)cls_len;
1368 	ops[0].cls.method_name = method;
1369 	ops[0].cls.method_len = (__u8)method_len;
1370 	ops[0].cls.argc = 0;
1371 	ops[0].cls.indata = data;
1372 	ops[0].cls.indata_len = len;
1373 
1374 	ret = rbd_req_sync_op(dev, NULL,
1375 			       CEPH_NOSNAP,
1376 			       0,
1377 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378 			       ops,
1379 			       1, obj, 0, 0, NULL, NULL, ver);
1380 
1381 	rbd_destroy_ops(ops);
1382 
1383 	dout("cls_exec returned %d\n", ret);
1384 	return ret;
1385 }
1386 
1387 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1388 {
1389 	struct rbd_req_coll *coll =
1390 			kzalloc(sizeof(struct rbd_req_coll) +
1391 			        sizeof(struct rbd_req_status) * num_reqs,
1392 				GFP_ATOMIC);
1393 
1394 	if (!coll)
1395 		return NULL;
1396 	coll->total = num_reqs;
1397 	kref_init(&coll->kref);
1398 	return coll;
1399 }
1400 
/*
 * block device queue callback
 *
 * Drains every request that blk_fetch_request() hands out.  Each
 * filesystem request is cut into per-object segments
 * (rbd_get_num_segments() / rbd_get_segment()); for every segment a
 * cloned bio chain is passed to rbd_req_write() or rbd_req_read().
 * A struct rbd_req_coll with one slot per segment is attached to the
 * sub-requests.
 *
 * Presumably entered with q->queue_lock held: the lock is dropped while
 * the OSD requests are built and re-taken before the next fetch.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			/* non-FS requests are completed with status 0 */
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		/* request length in bytes and starting byte offset */
		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		/* writes to a read-only mapping fail with -EROFS */
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* queue lock is released from here until just before "next" */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			/* re-take the lock before completing the request */
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			/* length of the piece that fits in the current object */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one collection reference per segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* record -ENOMEM for this segment and move on */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial reference taken in rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		/*
		 * NOTE(review): bp is declared outside the loop and is not
		 * reset to NULL after this release -- confirm that
		 * bio_chain_clone() copes with that on the next request.
		 */
		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1505 
1506 /*
1507  * a queue callback. Makes sure that we don't create a bio that spans across
1508  * multiple osd objects. One exception would be with a single page bios,
1509  * which we handle later at bio_chain_clone
1510  */
1511 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1512 			  struct bio_vec *bvec)
1513 {
1514 	struct rbd_device *rbd_dev = q->queuedata;
1515 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1516 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1517 	unsigned int bio_sectors = bmd->bi_size >> 9;
1518 	int max;
1519 
1520 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1521 				 + bio_sectors)) << 9;
1522 	if (max < 0)
1523 		max = 0; /* bio_add cannot handle a negative return */
1524 	if (max <= bvec->bv_len && bio_sectors == 0)
1525 		return bvec->bv_len;
1526 	return max;
1527 }
1528 
1529 static void rbd_free_disk(struct rbd_device *rbd_dev)
1530 {
1531 	struct gendisk *disk = rbd_dev->disk;
1532 
1533 	if (!disk)
1534 		return;
1535 
1536 	rbd_header_free(&rbd_dev->header);
1537 
1538 	if (disk->flags & GENHD_FL_UP)
1539 		del_gendisk(disk);
1540 	if (disk->queue)
1541 		blk_cleanup_queue(disk->queue);
1542 	put_disk(disk);
1543 }
1544 
1545 /*
1546  * reload the ondisk the header
1547  */
1548 static int rbd_read_header(struct rbd_device *rbd_dev,
1549 			   struct rbd_image_header *header)
1550 {
1551 	ssize_t rc;
1552 	struct rbd_image_header_ondisk *dh;
1553 	int snap_count = 0;
1554 	u64 snap_names_len = 0;
1555 	u64 ver;
1556 
1557 	while (1) {
1558 		int len = sizeof(*dh) +
1559 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1560 			  snap_names_len;
1561 
1562 		rc = -ENOMEM;
1563 		dh = kmalloc(len, GFP_KERNEL);
1564 		if (!dh)
1565 			return -ENOMEM;
1566 
1567 		rc = rbd_req_sync_read(rbd_dev,
1568 				       NULL, CEPH_NOSNAP,
1569 				       rbd_dev->obj_md_name,
1570 				       0, len,
1571 				       (char *)dh, &ver);
1572 		if (rc < 0)
1573 			goto out_dh;
1574 
1575 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1576 		if (rc < 0)
1577 			goto out_dh;
1578 
1579 		if (snap_count != header->total_snaps) {
1580 			snap_count = header->total_snaps;
1581 			snap_names_len = header->snap_names_len;
1582 			rbd_header_free(header);
1583 			kfree(dh);
1584 			continue;
1585 		}
1586 		break;
1587 	}
1588 	header->obj_version = ver;
1589 
1590 out_dh:
1591 	kfree(dh);
1592 	return rc;
1593 }
1594 
1595 /*
1596  * create a snapshot
1597  */
1598 static int rbd_header_add_snap(struct rbd_device *dev,
1599 			       const char *snap_name,
1600 			       gfp_t gfp_flags)
1601 {
1602 	int name_len = strlen(snap_name);
1603 	u64 new_snapid;
1604 	int ret;
1605 	void *data, *p, *e;
1606 	u64 ver;
1607 
1608 	/* we should create a snapshot only if we're pointing at the head */
1609 	if (dev->cur_snap)
1610 		return -EINVAL;
1611 
1612 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1613 				      &new_snapid);
1614 	dout("created snapid=%lld\n", new_snapid);
1615 	if (ret < 0)
1616 		return ret;
1617 
1618 	data = kmalloc(name_len + 16, gfp_flags);
1619 	if (!data)
1620 		return -ENOMEM;
1621 
1622 	p = data;
1623 	e = data + name_len + 16;
1624 
1625 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1626 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1627 
1628 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1629 				data, p - data, &ver);
1630 
1631 	kfree(data);
1632 
1633 	if (ret < 0)
1634 		return ret;
1635 
1636 	dev->header.snapc->seq =  new_snapid;
1637 
1638 	return 0;
1639 bad:
1640 	return -ERANGE;
1641 }
1642 
1643 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1644 {
1645 	struct rbd_snap *snap;
1646 
1647 	while (!list_empty(&rbd_dev->snaps)) {
1648 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1649 		__rbd_remove_snap_dev(rbd_dev, snap);
1650 	}
1651 }
1652 
1653 /*
1654  * only read the first part of the ondisk header, without the snaps info
1655  */
1656 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1657 {
1658 	int ret;
1659 	struct rbd_image_header h;
1660 	u64 snap_seq;
1661 	int follow_seq = 0;
1662 
1663 	ret = rbd_read_header(rbd_dev, &h);
1664 	if (ret < 0)
1665 		return ret;
1666 
1667 	/* resized? */
1668 	set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1669 
1670 	down_write(&rbd_dev->header.snap_rwsem);
1671 
1672 	snap_seq = rbd_dev->header.snapc->seq;
1673 	if (rbd_dev->header.total_snaps &&
1674 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1675 		/* pointing at the head, will need to follow that
1676 		   if head moves */
1677 		follow_seq = 1;
1678 
1679 	kfree(rbd_dev->header.snapc);
1680 	kfree(rbd_dev->header.snap_names);
1681 	kfree(rbd_dev->header.snap_sizes);
1682 
1683 	rbd_dev->header.total_snaps = h.total_snaps;
1684 	rbd_dev->header.snapc = h.snapc;
1685 	rbd_dev->header.snap_names = h.snap_names;
1686 	rbd_dev->header.snap_names_len = h.snap_names_len;
1687 	rbd_dev->header.snap_sizes = h.snap_sizes;
1688 	if (follow_seq)
1689 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1690 	else
1691 		rbd_dev->header.snapc->seq = snap_seq;
1692 
1693 	ret = __rbd_init_snaps_header(rbd_dev);
1694 
1695 	up_write(&rbd_dev->header.snap_rwsem);
1696 
1697 	return ret;
1698 }
1699 
1700 static int rbd_init_disk(struct rbd_device *rbd_dev)
1701 {
1702 	struct gendisk *disk;
1703 	struct request_queue *q;
1704 	int rc;
1705 	u64 total_size = 0;
1706 
1707 	/* contact OSD, request size info about the object being mapped */
1708 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1709 	if (rc)
1710 		return rc;
1711 
1712 	/* no need to lock here, as rbd_dev is not registered yet */
1713 	rc = __rbd_init_snaps_header(rbd_dev);
1714 	if (rc)
1715 		return rc;
1716 
1717 	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1718 	if (rc)
1719 		return rc;
1720 
1721 	/* create gendisk info */
1722 	rc = -ENOMEM;
1723 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1724 	if (!disk)
1725 		goto out;
1726 
1727 	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1728 		 rbd_dev->id);
1729 	disk->major = rbd_dev->major;
1730 	disk->first_minor = 0;
1731 	disk->fops = &rbd_bd_ops;
1732 	disk->private_data = rbd_dev;
1733 
1734 	/* init rq */
1735 	rc = -ENOMEM;
1736 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1737 	if (!q)
1738 		goto out_disk;
1739 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1740 	disk->queue = q;
1741 
1742 	q->queuedata = rbd_dev;
1743 
1744 	rbd_dev->disk = disk;
1745 	rbd_dev->q = q;
1746 
1747 	/* finally, announce the disk to the world */
1748 	set_capacity(disk, total_size / 512ULL);
1749 	add_disk(disk);
1750 
1751 	pr_info("%s: added with size 0x%llx\n",
1752 		disk->disk_name, (unsigned long long)total_size);
1753 	return 0;
1754 
1755 out_disk:
1756 	put_disk(disk);
1757 out:
1758 	return rc;
1759 }
1760 
1761 /*
1762   sysfs
1763 */
1764 
1765 static ssize_t rbd_size_show(struct device *dev,
1766 			     struct device_attribute *attr, char *buf)
1767 {
1768 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1769 
1770 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1771 }
1772 
1773 static ssize_t rbd_major_show(struct device *dev,
1774 			      struct device_attribute *attr, char *buf)
1775 {
1776 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1777 
1778 	return sprintf(buf, "%d\n", rbd_dev->major);
1779 }
1780 
1781 static ssize_t rbd_client_id_show(struct device *dev,
1782 				  struct device_attribute *attr, char *buf)
1783 {
1784 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1785 
1786 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1787 }
1788 
1789 static ssize_t rbd_pool_show(struct device *dev,
1790 			     struct device_attribute *attr, char *buf)
1791 {
1792 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1793 
1794 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1795 }
1796 
1797 static ssize_t rbd_name_show(struct device *dev,
1798 			     struct device_attribute *attr, char *buf)
1799 {
1800 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1801 
1802 	return sprintf(buf, "%s\n", rbd_dev->obj);
1803 }
1804 
1805 static ssize_t rbd_snap_show(struct device *dev,
1806 			     struct device_attribute *attr,
1807 			     char *buf)
1808 {
1809 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1810 
1811 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1812 }
1813 
1814 static ssize_t rbd_image_refresh(struct device *dev,
1815 				 struct device_attribute *attr,
1816 				 const char *buf,
1817 				 size_t size)
1818 {
1819 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1820 	int rc;
1821 	int ret = size;
1822 
1823 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1824 
1825 	rc = __rbd_update_snaps(rbd_dev);
1826 	if (rc < 0)
1827 		ret = rc;
1828 
1829 	mutex_unlock(&ctl_mutex);
1830 	return ret;
1831 }
1832 
/*
 * Per-device sysfs attributes.  The S_IRUGO entries only have a show
 * handler; the S_IWUSR entries (refresh, create_snap, rollback_snap)
 * only have a store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);

/* NULL-terminated list of the attributes declared above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	&dev_attr_rollback_snap.attr,
	NULL
};

/* unnamed attribute group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1864 
/*
 * Release callback of rbd_device_type.  The body is empty: nothing is
 * freed here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* device type assigned to rbd devices in rbd_bus_add_dev() */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1874 
1875 
1876 /*
1877   sysfs - snapshots
1878 */
1879 
1880 static ssize_t rbd_snap_size_show(struct device *dev,
1881 				  struct device_attribute *attr,
1882 				  char *buf)
1883 {
1884 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1885 
1886 	return sprintf(buf, "%lld\n", (long long)snap->size);
1887 }
1888 
1889 static ssize_t rbd_snap_id_show(struct device *dev,
1890 				struct device_attribute *attr,
1891 				char *buf)
1892 {
1893 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1894 
1895 	return sprintf(buf, "%lld\n", (long long)snap->id);
1896 }
1897 
/* Read-only sysfs attributes exposed by each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

/* unnamed attribute group wrapping rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1910 
1911 static void rbd_snap_dev_release(struct device *dev)
1912 {
1913 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1914 	kfree(snap->name);
1915 	kfree(snap);
1916 }
1917 
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* device type assigned to snapshot devices in rbd_register_snap_dev() */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1927 
1928 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1929 				  struct rbd_snap *snap)
1930 {
1931 	list_del(&snap->node);
1932 	device_unregister(&snap->dev);
1933 }
1934 
1935 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1936 				  struct rbd_snap *snap,
1937 				  struct device *parent)
1938 {
1939 	struct device *dev = &snap->dev;
1940 	int ret;
1941 
1942 	dev->type = &rbd_snap_device_type;
1943 	dev->parent = parent;
1944 	dev->release = rbd_snap_dev_release;
1945 	dev_set_name(dev, "snap_%s", snap->name);
1946 	ret = device_register(dev);
1947 
1948 	return ret;
1949 }
1950 
1951 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1952 			      int i, const char *name,
1953 			      struct rbd_snap **snapp)
1954 {
1955 	int ret;
1956 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1957 	if (!snap)
1958 		return -ENOMEM;
1959 	snap->name = kstrdup(name, GFP_KERNEL);
1960 	snap->size = rbd_dev->header.snap_sizes[i];
1961 	snap->id = rbd_dev->header.snapc->snaps[i];
1962 	if (device_is_registered(&rbd_dev->dev)) {
1963 		ret = rbd_register_snap_dev(rbd_dev, snap,
1964 					     &rbd_dev->dev);
1965 		if (ret < 0)
1966 			goto err;
1967 	}
1968 	*snapp = snap;
1969 	return 0;
1970 err:
1971 	kfree(snap->name);
1972 	kfree(snap);
1973 	return ret;
1974 }
1975 
/*
 * Step backwards through a block of NUL-terminated names.
 *
 * @name points just past the terminator of some entry (or past the end
 * of the block); the function returns the start of the entry that ends
 * right before @name.  NULL is returned when @name is fewer than two
 * bytes past @start, i.e. there is no earlier entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/* skip the terminator before @name, then walk back to the next one */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		if (cursor == start)
			return start;
	}

	return cursor + 1;
}
1992 
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing the process (e.g., a snapshot was removed and a new one was
 * created with the same name).
 *
 * Returns 0 on success, -EINVAL if the name block runs out before the
 * snapshot count does, or the error from __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* start past the end of the name block and walk it backwards */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	/* walk the existing list from its tail; i counts header slots left */
	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this reads snaps[i] while the code
			 * above and the add below use slot i - 1; with
			 * i == total_snaps that index looks one past the
			 * last entry -- confirm.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		/* list_add() inserts at the head of rbd_dev->snaps */
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2066 
2067 
/*
 * Release callback of the static root device.  The body is empty:
 * nothing is freed here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* static "rbd" device used as parent of every rbd device (rbd_bus_add_dev) */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
2076 
2077 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2078 {
2079 	int ret = -ENOMEM;
2080 	struct device *dev;
2081 	struct rbd_snap *snap;
2082 
2083 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2084 	dev = &rbd_dev->dev;
2085 
2086 	dev->bus = &rbd_bus_type;
2087 	dev->type = &rbd_device_type;
2088 	dev->parent = &rbd_root_dev;
2089 	dev->release = rbd_dev_release;
2090 	dev_set_name(dev, "%d", rbd_dev->id);
2091 	ret = device_register(dev);
2092 	if (ret < 0)
2093 		goto done_free;
2094 
2095 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2096 		ret = rbd_register_snap_dev(rbd_dev, snap,
2097 					     &rbd_dev->dev);
2098 		if (ret < 0)
2099 			break;
2100 	}
2101 
2102 	mutex_unlock(&ctl_mutex);
2103 	return 0;
2104 done_free:
2105 	mutex_unlock(&ctl_mutex);
2106 	return ret;
2107 }
2108 
2109 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2110 {
2111 	device_unregister(&rbd_dev->dev);
2112 }
2113 
2114 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2115 {
2116 	int ret, rc;
2117 
2118 	do {
2119 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2120 					 rbd_dev->header.obj_version);
2121 		if (ret == -ERANGE) {
2122 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2123 			rc = __rbd_update_snaps(rbd_dev);
2124 			mutex_unlock(&ctl_mutex);
2125 			if (rc < 0)
2126 				return rc;
2127 		}
2128 	} while (ret == -ERANGE);
2129 
2130 	return ret;
2131 }
2132 
2133 static ssize_t rbd_add(struct bus_type *bus,
2134 		       const char *buf,
2135 		       size_t count)
2136 {
2137 	struct ceph_osd_client *osdc;
2138 	struct rbd_device *rbd_dev;
2139 	ssize_t rc = -ENOMEM;
2140 	int irc, new_id = 0;
2141 	struct list_head *tmp;
2142 	char *mon_dev_name;
2143 	char *options;
2144 
2145 	if (!try_module_get(THIS_MODULE))
2146 		return -ENODEV;
2147 
2148 	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2149 	if (!mon_dev_name)
2150 		goto err_out_mod;
2151 
2152 	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2153 	if (!options)
2154 		goto err_mon_dev;
2155 
2156 	/* new rbd_device object */
2157 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2158 	if (!rbd_dev)
2159 		goto err_out_opt;
2160 
2161 	/* static rbd_device initialization */
2162 	spin_lock_init(&rbd_dev->lock);
2163 	INIT_LIST_HEAD(&rbd_dev->node);
2164 	INIT_LIST_HEAD(&rbd_dev->snaps);
2165 
2166 	/* generate unique id: find highest unique id, add one */
2167 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2168 
2169 	list_for_each(tmp, &rbd_dev_list) {
2170 		struct rbd_device *rbd_dev;
2171 
2172 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2173 		if (rbd_dev->id >= new_id)
2174 			new_id = rbd_dev->id + 1;
2175 	}
2176 
2177 	rbd_dev->id = new_id;
2178 
2179 	/* add to global list */
2180 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2181 
2182 	/* parse add command */
2183 	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2184 		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
2185 		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2186 		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2187 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2188 		   mon_dev_name, options, rbd_dev->pool_name,
2189 		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
2190 		rc = -EINVAL;
2191 		goto err_out_slot;
2192 	}
2193 
2194 	if (rbd_dev->snap_name[0] == 0)
2195 		rbd_dev->snap_name[0] = '-';
2196 
2197 	rbd_dev->obj_len = strlen(rbd_dev->obj);
2198 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2199 		 rbd_dev->obj, RBD_SUFFIX);
2200 
2201 	/* initialize rest of new object */
2202 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2203 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2204 	if (rc < 0)
2205 		goto err_out_slot;
2206 
2207 	mutex_unlock(&ctl_mutex);
2208 
2209 	/* pick the pool */
2210 	osdc = &rbd_dev->client->osdc;
2211 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2212 	if (rc < 0)
2213 		goto err_out_client;
2214 	rbd_dev->poolid = rc;
2215 
2216 	/* register our block device */
2217 	irc = register_blkdev(0, rbd_dev->name);
2218 	if (irc < 0) {
2219 		rc = irc;
2220 		goto err_out_client;
2221 	}
2222 	rbd_dev->major = irc;
2223 
2224 	rc = rbd_bus_add_dev(rbd_dev);
2225 	if (rc)
2226 		goto err_out_blkdev;
2227 
2228 	/* set up and announce blkdev mapping */
2229 	rc = rbd_init_disk(rbd_dev);
2230 	if (rc)
2231 		goto err_out_bus;
2232 
2233 	rc = rbd_init_watch_dev(rbd_dev);
2234 	if (rc)
2235 		goto err_out_bus;
2236 
2237 	return count;
2238 
2239 err_out_bus:
2240 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2241 	list_del_init(&rbd_dev->node);
2242 	mutex_unlock(&ctl_mutex);
2243 
2244 	/* this will also clean up rest of rbd_dev stuff */
2245 
2246 	rbd_bus_del_dev(rbd_dev);
2247 	kfree(options);
2248 	kfree(mon_dev_name);
2249 	return rc;
2250 
2251 err_out_blkdev:
2252 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2253 err_out_client:
2254 	rbd_put_client(rbd_dev);
2255 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2256 err_out_slot:
2257 	list_del_init(&rbd_dev->node);
2258 	mutex_unlock(&ctl_mutex);
2259 
2260 	kfree(rbd_dev);
2261 err_out_opt:
2262 	kfree(options);
2263 err_mon_dev:
2264 	kfree(mon_dev_name);
2265 err_out_mod:
2266 	dout("Error adding device %s\n", buf);
2267 	module_put(THIS_MODULE);
2268 	return rc;
2269 }
2270 
2271 static struct rbd_device *__rbd_get_dev(unsigned long id)
2272 {
2273 	struct list_head *tmp;
2274 	struct rbd_device *rbd_dev;
2275 
2276 	list_for_each(tmp, &rbd_dev_list) {
2277 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2278 		if (rbd_dev->id == id)
2279 			return rbd_dev;
2280 	}
2281 	return NULL;
2282 }
2283 
2284 static void rbd_dev_release(struct device *dev)
2285 {
2286 	struct rbd_device *rbd_dev =
2287 			container_of(dev, struct rbd_device, dev);
2288 
2289 	if (rbd_dev->watch_request)
2290 		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2291 						    rbd_dev->watch_request);
2292 	if (rbd_dev->watch_event)
2293 		ceph_osdc_cancel_event(rbd_dev->watch_event);
2294 
2295 	rbd_put_client(rbd_dev);
2296 
2297 	/* clean up and free blkdev */
2298 	rbd_free_disk(rbd_dev);
2299 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2300 	kfree(rbd_dev);
2301 
2302 	/* release module ref */
2303 	module_put(THIS_MODULE);
2304 }
2305 
2306 static ssize_t rbd_remove(struct bus_type *bus,
2307 			  const char *buf,
2308 			  size_t count)
2309 {
2310 	struct rbd_device *rbd_dev = NULL;
2311 	int target_id, rc;
2312 	unsigned long ul;
2313 	int ret = count;
2314 
2315 	rc = strict_strtoul(buf, 10, &ul);
2316 	if (rc)
2317 		return rc;
2318 
2319 	/* convert to int; abort if we lost anything in the conversion */
2320 	target_id = (int) ul;
2321 	if (target_id != ul)
2322 		return -EINVAL;
2323 
2324 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2325 
2326 	rbd_dev = __rbd_get_dev(target_id);
2327 	if (!rbd_dev) {
2328 		ret = -ENOENT;
2329 		goto done;
2330 	}
2331 
2332 	list_del_init(&rbd_dev->node);
2333 
2334 	__rbd_remove_all_snaps(rbd_dev);
2335 	rbd_bus_del_dev(rbd_dev);
2336 
2337 done:
2338 	mutex_unlock(&ctl_mutex);
2339 	return ret;
2340 }
2341 
2342 static ssize_t rbd_snap_add(struct device *dev,
2343 			    struct device_attribute *attr,
2344 			    const char *buf,
2345 			    size_t count)
2346 {
2347 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2348 	int ret;
2349 	char *name = kmalloc(count + 1, GFP_KERNEL);
2350 	if (!name)
2351 		return -ENOMEM;
2352 
2353 	snprintf(name, count, "%s", buf);
2354 
2355 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2356 
2357 	ret = rbd_header_add_snap(rbd_dev,
2358 				  name, GFP_KERNEL);
2359 	if (ret < 0)
2360 		goto err_unlock;
2361 
2362 	ret = __rbd_update_snaps(rbd_dev);
2363 	if (ret < 0)
2364 		goto err_unlock;
2365 
2366 	/* shouldn't hold ctl_mutex when notifying.. notify might
2367 	   trigger a watch callback that would need to get that mutex */
2368 	mutex_unlock(&ctl_mutex);
2369 
2370 	/* make a best effort, don't error if failed */
2371 	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2372 
2373 	ret = count;
2374 	kfree(name);
2375 	return ret;
2376 
2377 err_unlock:
2378 	mutex_unlock(&ctl_mutex);
2379 	kfree(name);
2380 	return ret;
2381 }
2382 
2383 static ssize_t rbd_snap_rollback(struct device *dev,
2384 				 struct device_attribute *attr,
2385 				 const char *buf,
2386 				 size_t count)
2387 {
2388 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2389 	int ret;
2390 	u64 snapid;
2391 	u64 cur_ofs;
2392 	char *seg_name = NULL;
2393 	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2394 	ret = -ENOMEM;
2395 	if (!snap_name)
2396 		return ret;
2397 
2398 	/* parse snaps add command */
2399 	snprintf(snap_name, count, "%s", buf);
2400 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2401 	if (!seg_name)
2402 		goto done;
2403 
2404 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2405 
2406 	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2407 	if (ret < 0)
2408 		goto done_unlock;
2409 
2410 	dout("snapid=%lld\n", snapid);
2411 
2412 	cur_ofs = 0;
2413 	while (cur_ofs < rbd_dev->header.image_size) {
2414 		cur_ofs += rbd_get_segment(&rbd_dev->header,
2415 					   rbd_dev->obj,
2416 					   cur_ofs, (u64)-1,
2417 					   seg_name, NULL);
2418 		dout("seg_name=%s\n", seg_name);
2419 
2420 		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2421 		if (ret < 0)
2422 			pr_warning("could not roll back obj %s err=%d\n",
2423 				   seg_name, ret);
2424 	}
2425 
2426 	ret = __rbd_update_snaps(rbd_dev);
2427 	if (ret < 0)
2428 		goto done_unlock;
2429 
2430 	ret = count;
2431 
2432 done_unlock:
2433 	mutex_unlock(&ctl_mutex);
2434 done:
2435 	kfree(seg_name);
2436 	kfree(snap_name);
2437 
2438 	return ret;
2439 }
2440 
/*
 * Bus-level sysfs attributes: "add" and "remove".  Both are writable by
 * the owner only (S_IWUSR) and have no show handler; writes are handled
 * by rbd_add() and rbd_remove() respectively.  The table is terminated
 * by __ATTR_NULL.
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2446 
2447 /*
2448  * create control files in sysfs
2449  * /sys/bus/rbd/...
2450  */
2451 static int rbd_sysfs_init(void)
2452 {
2453 	int ret;
2454 
2455 	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2456 
2457 	ret = bus_register(&rbd_bus_type);
2458 	 if (ret < 0)
2459 		return ret;
2460 
2461 	ret = device_register(&rbd_root_dev);
2462 
2463 	return ret;
2464 }
2465 
/*
 * Tear down what rbd_sysfs_init() registered, in reverse order: the root
 * device first, then the bus.
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2471 
2472 int __init rbd_init(void)
2473 {
2474 	int rc;
2475 
2476 	rc = rbd_sysfs_init();
2477 	if (rc)
2478 		return rc;
2479 	spin_lock_init(&node_lock);
2480 	pr_info("loaded " DRV_NAME_LONG "\n");
2481 	return 0;
2482 }
2483 
/*
 * Module exit point: removes the sysfs interface registered by rbd_init()
 * by calling rbd_sysfs_cleanup().
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2488 
/* register rbd_init()/rbd_exit() as the module's load and unload hooks */
module_init(rbd_init);
module_exit(rbd_exit);

/* module metadata: authors, description and license */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2500