xref: /openbmc/linux/drivers/block/rbd.c (revision 1ab142d4)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46 
47 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
48 
49 #define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN	64
51 #define RBD_MAX_SNAP_NAME_LEN	32
52 #define RBD_MAX_OPT_LEN		1024
53 
54 #define RBD_SNAP_HEAD_NAME	"-"
55 
56 #define DEV_NAME_LEN		32
57 
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 
60 /*
61  * block device image metadata (in-memory version)
62  */
/*
 * block device image metadata (in-memory version)
 *
 * Decoded from struct rbd_image_header_ondisk by rbd_header_from_disk().
 * The snapshot-related fields are protected by snap_rwsem.
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes (of the head) */
	char block_name[32];	/* object name prefix for data segments */
	__u8 obj_order;		/* log2 of the per-object segment size */
	__u8 crypt_type;	/* copied from on-disk header options */
	__u8 comp_type;		/* copied from on-disk header options */
	struct rw_semaphore snap_rwsem;	/* guards snapc/snap_* fields */
	struct ceph_snap_context *snapc;	/* snapshot ids for I/O */
	size_t snap_names_len;	/* total bytes of NUL-separated snap names */
	u64 snap_seq;		/* snapshot seq from the on-disk header */
	u32 total_snaps;	/* number of snapshots in snapc/snap_* */

	char *snap_names;	/* concatenated NUL-terminated snap names */
	u64 *snap_sizes;	/* per-snapshot image size, bytes */

	u64 obj_version;	/* header object version (watch/notify) */
};
80 
/* per-client mount options parsed by parse_rbd_opts_token() */
struct rbd_options {
	int	notify_timeout;	/* seconds; default RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84 
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
/*
 * an instance of the client.  multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* owned; freed in rbd_client_release() */
	struct rbd_options	*rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref		kref;		/* shared by all rbd_devices using us */
	struct list_head	node;		/* entry on rbd_client_list */
};
94 
95 struct rbd_req_coll;
96 
97 /*
98  * a single io request
99  */
/*
 * a single io request
 *
 * Carried through the osd request as r_priv and consumed in rbd_req_cb().
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* bytes this request covers */
	int			coll_index;	/* our slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* parent collection, or NULL */
};
108 
/* completion state of one member of an rbd_req_coll */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request completed */
	int rc;		/* its result code */
	u64 bytes;	/* bytes completed */
};
114 
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119 	int			total;
120 	int			num_done;
121 	struct kref		kref;
122 	struct rbd_req_status	status[0];
123 };
124 
/* one snapshot, exposed as a sysfs device under the rbd device */
struct rbd_snap {
	struct	device		dev;	/* sysfs device, released via dev */
	const char		*name;	/* snapshot name */
	size_t			size;	/* image size at this snapshot */
	struct list_head	node;	/* entry on rbd_device->snaps */
	u64			id;	/* snapshot id */
};
132 
133 /*
134  * a single device
135  */
/*
 * a single device
 *
 * One mapped rbd image: block-layer state, ceph client linkage, parsed
 * image metadata and the sysfs representation.
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;	/* shortcut to rbd_client->client */
	struct rbd_client	*rbd_client;	/* refcounted, possibly shared */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* decoded image metadata */
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	/* watch on the header object, for snapshot-change notifications */
	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* nonzero when mapped at a snapshot */

	struct list_head	node;		/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
173 
/* sysfs bus all rbd devices hang off of (/sys/bus/rbd) */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};
177 
/*
 * Statically initialize node_lock: the old bare "static spinlock_t"
 * definition was never passed to spin_lock_init(), which is invalid
 * with CONFIG_DEBUG_SPINLOCK/lockdep.
 */
static DEFINE_SPINLOCK(node_lock);      /* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
183 
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_add(struct device *dev,
187 			    struct device_attribute *attr,
188 			    const char *buf,
189 			    size_t count);
190 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191 				  struct rbd_snap *snap);
192 
193 
194 static struct rbd_device *dev_to_rbd(struct device *dev)
195 {
196 	return container_of(dev, struct rbd_device, dev);
197 }
198 
/* Take a reference on the rbd device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
203 
/* Drop a reference taken with rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
208 
209 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210 
211 static int rbd_open(struct block_device *bdev, fmode_t mode)
212 {
213 	struct gendisk *disk = bdev->bd_disk;
214 	struct rbd_device *rbd_dev = disk->private_data;
215 
216 	rbd_get_dev(rbd_dev);
217 
218 	set_device_ro(bdev, rbd_dev->read_only);
219 
220 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
221 		return -EROFS;
222 
223 	return 0;
224 }
225 
226 static int rbd_release(struct gendisk *disk, fmode_t mode)
227 {
228 	struct rbd_device *rbd_dev = disk->private_data;
229 
230 	rbd_put_dev(rbd_dev);
231 
232 	return 0;
233 }
234 
/* block-layer entry points for an rbd device */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
240 
241 /*
242  * Initialize an rbd client instance.
243  * We own *opt.
244  */
245 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
246 					    struct rbd_options *rbd_opts)
247 {
248 	struct rbd_client *rbdc;
249 	int ret = -ENOMEM;
250 
251 	dout("rbd_client_create\n");
252 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
253 	if (!rbdc)
254 		goto out_opt;
255 
256 	kref_init(&rbdc->kref);
257 	INIT_LIST_HEAD(&rbdc->node);
258 
259 	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260 	if (IS_ERR(rbdc->client))
261 		goto out_rbdc;
262 	opt = NULL; /* Now rbdc->client is responsible for opt */
263 
264 	ret = ceph_open_session(rbdc->client);
265 	if (ret < 0)
266 		goto out_err;
267 
268 	rbdc->rbd_opts = rbd_opts;
269 
270 	spin_lock(&node_lock);
271 	list_add_tail(&rbdc->node, &rbd_client_list);
272 	spin_unlock(&node_lock);
273 
274 	dout("rbd_client_create created %p\n", rbdc);
275 	return rbdc;
276 
277 out_err:
278 	ceph_destroy_client(rbdc->client);
279 out_rbdc:
280 	kfree(rbdc);
281 out_opt:
282 	if (opt)
283 		ceph_destroy_options(opt);
284 	return ERR_PTR(ret);
285 }
286 
287 /*
288  * Find a ceph client with specific addr and configuration.
289  */
290 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
291 {
292 	struct rbd_client *client_node;
293 
294 	if (opt->flags & CEPH_OPT_NOSHARE)
295 		return NULL;
296 
297 	list_for_each_entry(client_node, &rbd_client_list, node)
298 		if (ceph_compare_options(opt, client_node->client) == 0)
299 			return client_node;
300 	return NULL;
301 }
302 
303 /*
304  * mount options
305  */
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* option patterns for match_token() */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
320 
321 static int parse_rbd_opts_token(char *c, void *private)
322 {
323 	struct rbd_options *rbdopt = private;
324 	substring_t argstr[MAX_OPT_ARGS];
325 	int token, intval, ret;
326 
327 	token = match_token((char *)c, rbdopt_tokens, argstr);
328 	if (token < 0)
329 		return -EINVAL;
330 
331 	if (token < Opt_last_int) {
332 		ret = match_int(&argstr[0], &intval);
333 		if (ret < 0) {
334 			pr_err("bad mount option arg (not int) "
335 			       "at '%s'\n", c);
336 			return ret;
337 		}
338 		dout("got int token %d val %d\n", token, intval);
339 	} else if (token > Opt_last_int && token < Opt_last_string) {
340 		dout("got string token %d val %s\n", token,
341 		     argstr[0].from);
342 	} else {
343 		dout("got token %d\n", token);
344 	}
345 
346 	switch (token) {
347 	case Opt_notify_timeout:
348 		rbdopt->notify_timeout = intval;
349 		break;
350 	default:
351 		BUG_ON(token);
352 	}
353 	return 0;
354 }
355 
356 /*
357  * Get a ceph client with specific addr and configuration, if one does
358  * not exist create it.
359  */
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client/client are set and 0 is returned.
 * Ownership: opt is consumed (destroyed here when reusing an existing
 * client, otherwise handed to rbd_client_create()); rbd_opts is freed
 * here unless a new client took ownership of it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ret = ceph_parse_options(&opt, options, mon_addr,
				 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
	if (ret < 0)
		goto done_err;

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* reuse: the existing client keeps its own opt/rbd_opts */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		rbd_dev->client = rbdc->client;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	/* no shareable client found: create one (consumes opt) */
	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	rbd_dev->client = rbdc->client;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
407 
408 /*
409  * Destroy ceph client
410  *
411  * Caller must hold node_lock.
412  */
/*
 * Destroy ceph client
 *
 * Caller must hold node_lock.
 *
 * kref release callback: unlinks the client from rbd_client_list and
 * frees everything it owns (ceph client, options struct, itself).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
424 
425 /*
426  * Drop reference to ceph client node. If it's not referenced anymore, release
427  * it.
428  */
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 *
 * node_lock is held around kref_put() because the release callback
 * (rbd_client_release) unlinks the client from rbd_client_list.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	spin_lock(&node_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&node_lock);
	rbd_dev->rbd_client = NULL;
	rbd_dev->client = NULL;
}
437 
438 /*
439  * Destroy requests collection
440  */
441 static void rbd_coll_release(struct kref *kref)
442 {
443 	struct rbd_req_coll *coll =
444 		container_of(kref, struct rbd_req_coll, kref);
445 
446 	dout("rbd_coll_release %p\n", coll);
447 	kfree(coll);
448 }
449 
450 /*
451  * Create a new header structure, translate header format from the on-disk
452  * header.
453  */
454 static int rbd_header_from_disk(struct rbd_image_header *header,
455 				 struct rbd_image_header_ondisk *ondisk,
456 				 int allocated_snaps,
457 				 gfp_t gfp_flags)
458 {
459 	int i;
460 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
461 	int ret = -ENOMEM;
462 
463 	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
464 		return -ENXIO;
465 	}
466 
467 	init_rwsem(&header->snap_rwsem);
468 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470 				snap_count *
471 				 sizeof(struct rbd_image_snap_ondisk),
472 				gfp_flags);
473 	if (!header->snapc)
474 		return -ENOMEM;
475 	if (snap_count) {
476 		header->snap_names = kmalloc(header->snap_names_len,
477 					     GFP_KERNEL);
478 		if (!header->snap_names)
479 			goto err_snapc;
480 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
481 					     GFP_KERNEL);
482 		if (!header->snap_sizes)
483 			goto err_names;
484 	} else {
485 		header->snap_names = NULL;
486 		header->snap_sizes = NULL;
487 	}
488 	memcpy(header->block_name, ondisk->block_name,
489 	       sizeof(ondisk->block_name));
490 
491 	header->image_size = le64_to_cpu(ondisk->image_size);
492 	header->obj_order = ondisk->options.order;
493 	header->crypt_type = ondisk->options.crypt_type;
494 	header->comp_type = ondisk->options.comp_type;
495 
496 	atomic_set(&header->snapc->nref, 1);
497 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
498 	header->snapc->num_snaps = snap_count;
499 	header->total_snaps = snap_count;
500 
501 	if (snap_count &&
502 	    allocated_snaps == snap_count) {
503 		for (i = 0; i < snap_count; i++) {
504 			header->snapc->snaps[i] =
505 				le64_to_cpu(ondisk->snaps[i].id);
506 			header->snap_sizes[i] =
507 				le64_to_cpu(ondisk->snaps[i].image_size);
508 		}
509 
510 		/* copy snapshot names */
511 		memcpy(header->snap_names, &ondisk->snaps[i],
512 			header->snap_names_len);
513 	}
514 
515 	return 0;
516 
517 err_names:
518 	kfree(header->snap_names);
519 err_snapc:
520 	kfree(header->snapc);
521 	return ret;
522 }
523 
524 static int snap_index(struct rbd_image_header *header, int snap_num)
525 {
526 	return header->total_snaps - snap_num;
527 }
528 
/*
 * Snapshot id the device is currently mapped at, or 0 when mapped at
 * the head (cur_snap == 0).
 */
static u64 cur_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)
		return 0;

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
}
538 
539 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
540 			u64 *seq, u64 *size)
541 {
542 	int i;
543 	char *p = header->snap_names;
544 
545 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
546 		if (strcmp(snap_name, p) == 0)
547 			break;
548 	}
549 	if (i == header->total_snaps)
550 		return -ENOENT;
551 	if (seq)
552 		*seq = header->snapc->snaps[i];
553 
554 	if (size)
555 		*size = header->snap_sizes[i];
556 
557 	return i;
558 }
559 
/*
 * Point the device at a snapshot (or the head) by name.
 *
 * An empty/NULL name, "-" or RBD_SNAP_HEAD_NAME selects the writable
 * head; any other name selects that snapshot and makes the device
 * read-only.  Updates snapc->seq, dev->cur_snap and dev->read_only
 * under the header's write lock; optionally reports the size of the
 * selected view via *size.  Returns 0 or -ENOENT for an unknown name.
 */
static int rbd_header_set_snap(struct rbd_device *dev,
			       const char *snap_name,
			       u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	down_write(&header->snap_rwsem);

	if (!snap_name ||
	    !*snap_name ||
	    strcmp(snap_name, "-") == 0 ||
	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
		/* mapping the head: writable, seq from the header */
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap is 1-based from the end; see snap_index() */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}
596 
597 static void rbd_header_free(struct rbd_image_header *header)
598 {
599 	kfree(header->snapc);
600 	kfree(header->snap_names);
601 	kfree(header->snap_sizes);
602 }
603 
604 /*
605  * get the actual striped segment name, offset and length
606  */
607 static u64 rbd_get_segment(struct rbd_image_header *header,
608 			   const char *block_name,
609 			   u64 ofs, u64 len,
610 			   char *seg_name, u64 *segofs)
611 {
612 	u64 seg = ofs >> header->obj_order;
613 
614 	if (seg_name)
615 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
616 			 "%s.%012llx", block_name, seg);
617 
618 	ofs = ofs & ((1 << header->obj_order) - 1);
619 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
620 
621 	if (segofs)
622 		*segofs = ofs;
623 
624 	return len;
625 }
626 
627 static int rbd_get_num_segments(struct rbd_image_header *header,
628 				u64 ofs, u64 len)
629 {
630 	u64 start_seg = ofs >> header->obj_order;
631 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
632 	return end_seg - start_seg + 1;
633 }
634 
635 /*
636  * returns the size of an object in the image
637  */
638 static u64 rbd_obj_bytes(struct rbd_image_header *header)
639 {
640 	return 1 << header->obj_order;
641 }
642 
643 /*
644  * bio helpers
645  */
646 
647 static void bio_chain_put(struct bio *chain)
648 {
649 	struct bio *tmp;
650 
651 	while (chain) {
652 		tmp = chain;
653 		chain = chain->bi_next;
654 		bio_put(tmp);
655 	}
656 }
657 
658 /*
659  * zeros a bio chain, starting at specific offset
660  */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain, tracking the running
 * byte position; any data at or beyond start_ofs is zero-filled via a
 * temporary atomic kmap of the segment's page.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset of the current segment in the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero only the part past start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
684 
685 /*
686  * bio_chain_clone - clone a chain of bios up to a certain length.
687  * might return a bio_pair that will need to be released.
688  */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until at least len bytes are covered.  If a bio
 * straddles the boundary it is split with bio_split(); the first half is
 * cloned into the new chain and *next is pointed at the second half.
 * On return *old points at the first unconsumed source bio and the new
 * chain is returned, or NULL on allocation/split failure (any partial
 * clone chain is freed).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any bio_pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* after the first allocation, never sleep for more */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append tmp to the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* the caller guarantees the source chain covers at least len */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
761 
762 /*
763  * helpers for osd request op vectors.
764  */
765 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
766 			    int num_ops,
767 			    int opcode,
768 			    u32 payload_len)
769 {
770 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
771 		       GFP_NOIO);
772 	if (!*ops)
773 		return -ENOMEM;
774 	(*ops)[0].op = opcode;
775 	/*
776 	 * op extent offset and length will be set later on
777 	 * in calc_raw_layout()
778 	 */
779 	(*ops)[0].payload_len = payload_len;
780 	return 0;
781 }
782 
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
787 
/*
 * Record completion of sub-request 'index' of a collection and report
 * as many in-order completions as possible to the block layer.
 *
 * Without a collection the whole request is ended directly.  With one,
 * the status slot is filled under the queue lock and __blk_end_request()
 * is called for every consecutive completed sub-request starting at
 * num_done, dropping one collection reference per reported entry.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes status[] updates and blk completion */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* extend max over the run of already-completed sub-requests */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
825 
/* Complete an rbd_request using the collection info it carries. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
831 
832 /*
833  * Send ceph osd request
834  */
835 static int rbd_do_request(struct request *rq,
836 			  struct rbd_device *dev,
837 			  struct ceph_snap_context *snapc,
838 			  u64 snapid,
839 			  const char *obj, u64 ofs, u64 len,
840 			  struct bio *bio,
841 			  struct page **pages,
842 			  int num_pages,
843 			  int flags,
844 			  struct ceph_osd_req_op *ops,
845 			  int num_reply,
846 			  struct rbd_req_coll *coll,
847 			  int coll_index,
848 			  void (*rbd_cb)(struct ceph_osd_request *req,
849 					 struct ceph_msg *msg),
850 			  struct ceph_osd_request **linger_req,
851 			  u64 *ver)
852 {
853 	struct ceph_osd_request *req;
854 	struct ceph_file_layout *layout;
855 	int ret;
856 	u64 bno;
857 	struct timespec mtime = CURRENT_TIME;
858 	struct rbd_request *req_data;
859 	struct ceph_osd_request_head *reqhead;
860 	struct rbd_image_header *header = &dev->header;
861 
862 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
863 	if (!req_data) {
864 		if (coll)
865 			rbd_coll_end_req_index(rq, coll, coll_index,
866 					       -ENOMEM, len);
867 		return -ENOMEM;
868 	}
869 
870 	if (coll) {
871 		req_data->coll = coll;
872 		req_data->coll_index = coll_index;
873 	}
874 
875 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
876 
877 	down_read(&header->snap_rwsem);
878 
879 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
880 				      snapc,
881 				      ops,
882 				      false,
883 				      GFP_NOIO, pages, bio);
884 	if (!req) {
885 		up_read(&header->snap_rwsem);
886 		ret = -ENOMEM;
887 		goto done_pages;
888 	}
889 
890 	req->r_callback = rbd_cb;
891 
892 	req_data->rq = rq;
893 	req_data->bio = bio;
894 	req_data->pages = pages;
895 	req_data->len = len;
896 
897 	req->r_priv = req_data;
898 
899 	reqhead = req->r_request->front.iov_base;
900 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
901 
902 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
903 	req->r_oid_len = strlen(req->r_oid);
904 
905 	layout = &req->r_file_layout;
906 	memset(layout, 0, sizeof(*layout));
907 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
908 	layout->fl_stripe_count = cpu_to_le32(1);
909 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
910 	layout->fl_pg_preferred = cpu_to_le32(-1);
911 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
912 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
913 			     ofs, &len, &bno, req, ops);
914 
915 	ceph_osdc_build_request(req, ofs, &len,
916 				ops,
917 				snapc,
918 				&mtime,
919 				req->r_oid, req->r_oid_len);
920 	up_read(&header->snap_rwsem);
921 
922 	if (linger_req) {
923 		ceph_osdc_set_request_linger(&dev->client->osdc, req);
924 		*linger_req = req;
925 	}
926 
927 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
928 	if (ret < 0)
929 		goto done_err;
930 
931 	if (!rbd_cb) {
932 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
933 		if (ver)
934 			*ver = le64_to_cpu(req->r_reassert_version.version);
935 		dout("reassert_ver=%lld\n",
936 		     le64_to_cpu(req->r_reassert_version.version));
937 		ceph_osdc_put_request(req);
938 	}
939 	return ret;
940 
941 done_err:
942 	bio_chain_put(req_data->bio);
943 	ceph_osdc_put_request(req);
944 done_pages:
945 	rbd_coll_end_req(req_data, ret, len);
946 	kfree(req_data);
947 	return ret;
948 }
949 
950 /*
951  * Ceph osd op callback
952  */
/*
 * Ceph osd op callback
 *
 * Completion handler for async data requests: decodes the reply, maps
 * read-specific outcomes (a missing object or short read becomes zeroed
 * data and success), reports completion, and frees everything
 * rbd_do_request() attached to the request.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* object doesn't exist: read as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the tail */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
988 
/* Minimal async completion: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
993 
994 /*
995  * Do a synchronous ceph osd operation
996  */
997 static int rbd_req_sync_op(struct rbd_device *dev,
998 			   struct ceph_snap_context *snapc,
999 			   u64 snapid,
1000 			   int opcode,
1001 			   int flags,
1002 			   struct ceph_osd_req_op *orig_ops,
1003 			   int num_reply,
1004 			   const char *obj,
1005 			   u64 ofs, u64 len,
1006 			   char *buf,
1007 			   struct ceph_osd_request **linger_req,
1008 			   u64 *ver)
1009 {
1010 	int ret;
1011 	struct page **pages;
1012 	int num_pages;
1013 	struct ceph_osd_req_op *ops = orig_ops;
1014 	u32 payload_len;
1015 
1016 	num_pages = calc_pages_for(ofs , len);
1017 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1018 	if (IS_ERR(pages))
1019 		return PTR_ERR(pages);
1020 
1021 	if (!orig_ops) {
1022 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1023 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1024 		if (ret < 0)
1025 			goto done;
1026 
1027 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1028 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1029 			if (ret < 0)
1030 				goto done_ops;
1031 		}
1032 	}
1033 
1034 	ret = rbd_do_request(NULL, dev, snapc, snapid,
1035 			  obj, ofs, len, NULL,
1036 			  pages, num_pages,
1037 			  flags,
1038 			  ops,
1039 			  2,
1040 			  NULL, 0,
1041 			  NULL,
1042 			  linger_req, ver);
1043 	if (ret < 0)
1044 		goto done_ops;
1045 
1046 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1047 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1048 
1049 done_ops:
1050 	if (!orig_ops)
1051 		rbd_destroy_ops(ops);
1052 done:
1053 	ceph_release_page_vector(pages, num_pages);
1054 	return ret;
1055 }
1056 
1057 /*
1058  * Do an asynchronous ceph osd operation
1059  */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps an image extent onto its segment object (name + in-segment
 * offset/length), builds a single-op vector, and submits it with
 * rbd_req_cb as the completion callback.  The bio chain passed in must
 * already be clipped to one segment (asserted below).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1112 
1113 /*
1114  * Request async osd write
1115  */
/*
 * Request async osd write
 *
 * Thin wrapper over rbd_do_op() with the write opcode and flags;
 * writes always target the head (CEPH_NOSNAP).
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1130 
1131 /*
1132  * Request async osd read
1133  */
/*
 * Request async osd read
 *
 * Thin wrapper over rbd_do_op(); a snapid of 0 means read the head
 * (CEPH_NOSNAP).  No snap context is needed for reads.
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1149 
1150 /*
1151  * Request sync osd read
1152  */
/*
 * Request sync osd read
 *
 * Synchronous read of [ofs, ofs+len) from 'obj' into 'buf'; snapid 0
 * means the head.  The snapc parameter is currently unused (NULL is
 * passed down).
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1168 
1169 /*
1170  * Request sync osd watch
1171  */
/*
 * Request sync osd watch
 *
 * Send a NOTIFY_ACK for a received notification on 'obj'.  Submitted
 * with rbd_simple_req_cb, so despite the name this does not wait for
 * the ack to complete.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/* NOTE(review): cookie is stored without cpu_to_le64 while ver
	   above is converted — confirm the expected wire byte order */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			  obj, 0, 0, NULL,
			  pages, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1201 
/*
 * Watch-event callback: the header object changed (e.g. a snapshot was
 * taken), so refresh our snapshot state under ctl_mutex and ack the
 * notification.  'data' is the rbd_device registered in
 * rbd_req_sync_watch().
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even if the refresh failed, so the OSD stops resending */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1221 
/*
 * Establish a watch on the header object @obj so we get notified of
 * header changes (snapshots, resize).  On success dev->watch_event and
 * dev->watch_request are set up; on failure both are left NULL.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* the event delivers notifications to rbd_watch_cb */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 = register the watch */

	/* the request lingers; a handle is kept in dev->watch_request */
	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1266 
/*
 * Tear down the watch on @obj established by rbd_req_sync_watch().
 * The caller must ensure dev->watch_event is non-NULL (see
 * rbd_dev_release()); it is cancelled and cleared here regardless of
 * whether the unwatch request itself succeeded.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = unregister the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1295 
/* context passed (by address) to rbd_notify_cb via ceph_osdc_create_event */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1299 
1300 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1301 {
1302 	struct rbd_device *dev = (struct rbd_device *)data;
1303 	if (!dev)
1304 		return;
1305 
1306 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1307 		notify_id, (int)opcode);
1308 }
1309 
/*
 * Send a notify on @obj (e.g. after creating a snapshot) and wait for
 * all watchers to acknowledge it.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
		          const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: version (u32) + timeout (u32) */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	/* one_shot event (3rd arg = 1): fires once for our own notify ack */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* seconds, presumably -- TODO confirm */

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* NOTE(review): a wait failure/timeout is logged but not returned,
	 * and the event is not cancelled on this path -- confirm whether
	 * ceph_osdc_wait_event releases a one-shot event itself */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1360 
1361 /*
1362  * Request sync osd read
1363  */
1364 static int rbd_req_sync_exec(struct rbd_device *dev,
1365 			     const char *obj,
1366 			     const char *cls,
1367 			     const char *method,
1368 			     const char *data,
1369 			     int len,
1370 			     u64 *ver)
1371 {
1372 	struct ceph_osd_req_op *ops;
1373 	int cls_len = strlen(cls);
1374 	int method_len = strlen(method);
1375 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1376 				    cls_len + method_len + len);
1377 	if (ret < 0)
1378 		return ret;
1379 
1380 	ops[0].cls.class_name = cls;
1381 	ops[0].cls.class_len = (__u8)cls_len;
1382 	ops[0].cls.method_name = method;
1383 	ops[0].cls.method_len = (__u8)method_len;
1384 	ops[0].cls.argc = 0;
1385 	ops[0].cls.indata = data;
1386 	ops[0].cls.indata_len = len;
1387 
1388 	ret = rbd_req_sync_op(dev, NULL,
1389 			       CEPH_NOSNAP,
1390 			       0,
1391 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1392 			       ops,
1393 			       1, obj, 0, 0, NULL, NULL, ver);
1394 
1395 	rbd_destroy_ops(ops);
1396 
1397 	dout("cls_exec returned %d\n", ret);
1398 	return ret;
1399 }
1400 
1401 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1402 {
1403 	struct rbd_req_coll *coll =
1404 			kzalloc(sizeof(struct rbd_req_coll) +
1405 			        sizeof(struct rbd_req_status) * num_reqs,
1406 				GFP_ATOMIC);
1407 
1408 	if (!coll)
1409 		return NULL;
1410 	coll->total = num_reqs;
1411 	kref_init(&coll->kref);
1412 	return coll;
1413 }
1414 
/*
 * block device queue callback
 *
 * Drain the request queue: each fs request is split on rbd object
 * boundaries and every piece is submitted to the OSDs as a separate
 * async read/write.  A shared rbd_req_coll collects the per-segment
 * completions so the block request finishes only when all pieces have.
 * Entered with q->queue_lock held; the lock is dropped while the OSD
 * requests are built and submitted, then retaken.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* drop the queue lock while we talk to the OSDs */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		/* one collection slot per object segment the request spans */
		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			/* op_size = bytes of this request within one object */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* each in-flight segment holds a coll reference */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* mark this segment failed but keep going */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the submission reference taken by rbd_alloc_coll */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1519 
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	/* object size in 512-byte sectors; obj_order is log2(object bytes) */
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;
	int max;

	/* bytes remaining in the object after the bio's current end */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << 9;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* empty bio that wouldn't fit: allow one bvec anyway (split later) */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1542 
1543 static void rbd_free_disk(struct rbd_device *rbd_dev)
1544 {
1545 	struct gendisk *disk = rbd_dev->disk;
1546 
1547 	if (!disk)
1548 		return;
1549 
1550 	rbd_header_free(&rbd_dev->header);
1551 
1552 	if (disk->flags & GENHD_FL_UP)
1553 		del_gendisk(disk);
1554 	if (disk->queue)
1555 		blk_cleanup_queue(disk->queue);
1556 	put_disk(disk);
1557 }
1558 
/*
 * Re-read the on-disk image header into @header.
 *
 * The buffer size depends on the snapshot count, which is only known
 * after a read; so read, parse, and if the snapshot count changed in
 * the meantime, free everything and retry with the new size.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	int snap_count = 0;
	u64 snap_names_len = 0;
	u64 ver;

	while (1) {
		/* fixed header + snap records + name blob, per last attempt */
		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
			  snap_names_len;

		rc = -ENOMEM;
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO) {
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			}
			goto out_dh;
		}

		/* snapshot count changed since we sized the buffer: retry */
		if (snap_count != header->total_snaps) {
			snap_count = header->total_snaps;
			snap_names_len = header->snap_names_len;
			rbd_header_free(header);
			kfree(dh);
			continue;
		}
		break;
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1613 
1614 /*
1615  * create a snapshot
1616  */
1617 static int rbd_header_add_snap(struct rbd_device *dev,
1618 			       const char *snap_name,
1619 			       gfp_t gfp_flags)
1620 {
1621 	int name_len = strlen(snap_name);
1622 	u64 new_snapid;
1623 	int ret;
1624 	void *data, *p, *e;
1625 	u64 ver;
1626 
1627 	/* we should create a snapshot only if we're pointing at the head */
1628 	if (dev->cur_snap)
1629 		return -EINVAL;
1630 
1631 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1632 				      &new_snapid);
1633 	dout("created snapid=%lld\n", new_snapid);
1634 	if (ret < 0)
1635 		return ret;
1636 
1637 	data = kmalloc(name_len + 16, gfp_flags);
1638 	if (!data)
1639 		return -ENOMEM;
1640 
1641 	p = data;
1642 	e = data + name_len + 16;
1643 
1644 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1645 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1646 
1647 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1648 				data, p - data, &ver);
1649 
1650 	kfree(data);
1651 
1652 	if (ret < 0)
1653 		return ret;
1654 
1655 	dev->header.snapc->seq =  new_snapid;
1656 
1657 	return 0;
1658 bad:
1659 	return -ERANGE;
1660 }
1661 
1662 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1663 {
1664 	struct rbd_snap *snap;
1665 
1666 	while (!list_empty(&rbd_dev->snaps)) {
1667 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1668 		__rbd_remove_snap_dev(rbd_dev, snap);
1669 	}
1670 }
1671 
/*
 * Refresh the in-memory header from the on-disk one and rebuild the
 * snapshot device list.  Caller holds ctl_mutex.
 *
 * Ownership of h's snapc/snap_names/snap_sizes allocations is handed
 * over to rbd_dev->header after the old ones are freed, so h must not
 * be cleaned up afterwards.
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	set_capacity(rbd_dev->disk, h.image_size / 512ULL);

	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* swap in the freshly read snapshot state */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);

	return ret;
}
1718 
/*
 * Read the image header, set up the gendisk and request queue, and
 * announce the block device.
 *
 * NOTE(review): failures after rbd_read_header() succeed do not free
 * the header here -- presumably cleaned up by the caller's error path
 * via rbd_free_disk()/rbd_dev_release(); confirm.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* select the mapped snapshot (or head) and learn its size */
	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	/* keep bios within a single object (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1786 
1787 /*
1788   sysfs
1789 */
1790 
1791 static ssize_t rbd_size_show(struct device *dev,
1792 			     struct device_attribute *attr, char *buf)
1793 {
1794 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1795 
1796 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1797 }
1798 
1799 static ssize_t rbd_major_show(struct device *dev,
1800 			      struct device_attribute *attr, char *buf)
1801 {
1802 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1803 
1804 	return sprintf(buf, "%d\n", rbd_dev->major);
1805 }
1806 
1807 static ssize_t rbd_client_id_show(struct device *dev,
1808 				  struct device_attribute *attr, char *buf)
1809 {
1810 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1811 
1812 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1813 }
1814 
1815 static ssize_t rbd_pool_show(struct device *dev,
1816 			     struct device_attribute *attr, char *buf)
1817 {
1818 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1819 
1820 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1821 }
1822 
1823 static ssize_t rbd_name_show(struct device *dev,
1824 			     struct device_attribute *attr, char *buf)
1825 {
1826 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1827 
1828 	return sprintf(buf, "%s\n", rbd_dev->obj);
1829 }
1830 
1831 static ssize_t rbd_snap_show(struct device *dev,
1832 			     struct device_attribute *attr,
1833 			     char *buf)
1834 {
1835 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1836 
1837 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1838 }
1839 
1840 static ssize_t rbd_image_refresh(struct device *dev,
1841 				 struct device_attribute *attr,
1842 				 const char *buf,
1843 				 size_t size)
1844 {
1845 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1846 	int rc;
1847 	int ret = size;
1848 
1849 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1850 
1851 	rc = __rbd_update_snaps(rbd_dev);
1852 	if (rc < 0)
1853 		ret = rc;
1854 
1855 	mutex_unlock(&ctl_mutex);
1856 	return ret;
1857 }
1858 
/* per-device sysfs attributes (under /sys/bus/rbd/devices/<id>/) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* the rbd_device is embedded in its struct device and freed elsewhere
 * (see rbd_dev_release), so there is nothing to do here */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1898 
1899 
1900 /*
1901   sysfs - snapshots
1902 */
1903 
1904 static ssize_t rbd_snap_size_show(struct device *dev,
1905 				  struct device_attribute *attr,
1906 				  char *buf)
1907 {
1908 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1909 
1910 	return sprintf(buf, "%lld\n", (long long)snap->size);
1911 }
1912 
1913 static ssize_t rbd_snap_id_show(struct device *dev,
1914 				struct device_attribute *attr,
1915 				char *buf)
1916 {
1917 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1918 
1919 	return sprintf(buf, "%lld\n", (long long)snap->id);
1920 }
1921 
/* per-snapshot sysfs attributes (under the device's snap_<name>/ dir) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* final put on the snap device frees the rbd_snap and its name */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1951 
/*
 * Take a snapshot off the device's list and unregister its sysfs
 * device; the final reference drop frees the rbd_snap via
 * rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1958 
1959 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1960 				  struct rbd_snap *snap,
1961 				  struct device *parent)
1962 {
1963 	struct device *dev = &snap->dev;
1964 	int ret;
1965 
1966 	dev->type = &rbd_snap_device_type;
1967 	dev->parent = parent;
1968 	dev->release = rbd_snap_dev_release;
1969 	dev_set_name(dev, "snap_%s", snap->name);
1970 	ret = device_register(dev);
1971 
1972 	return ret;
1973 }
1974 
1975 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1976 			      int i, const char *name,
1977 			      struct rbd_snap **snapp)
1978 {
1979 	int ret;
1980 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1981 	if (!snap)
1982 		return -ENOMEM;
1983 	snap->name = kstrdup(name, GFP_KERNEL);
1984 	snap->size = rbd_dev->header.snap_sizes[i];
1985 	snap->id = rbd_dev->header.snapc->snaps[i];
1986 	if (device_is_registered(&rbd_dev->dev)) {
1987 		ret = rbd_register_snap_dev(rbd_dev, snap,
1988 					     &rbd_dev->dev);
1989 		if (ret < 0)
1990 			goto err;
1991 	}
1992 	*snapp = snap;
1993 	return 0;
1994 err:
1995 	kfree(snap->name);
1996 	kfree(snap);
1997 	return ret;
1998 }
1999 
/*
 * Step backwards through a NUL-delimited string list.
 *
 * @name points at the start of an entry (or one past the list's final
 * NUL); @start is the beginning of the list.  Returns the start of the
 * preceding entry, or NULL when there is none.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	/* need at least one terminator plus one character before @name */
	if (name < start + 2)
		return NULL;

	/* skip the previous entry's terminator, then scan back to the
	 * terminator before it (or to the start of the list) */
	for (name -= 2; *name; name--)
		if (name == start)
			return start;

	return name + 1;
}
2016 
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	/* i counts down through header snaps, newest-first array order */
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	/* start one past the name blob; rbd_prev_snap_name walks backwards */
	name = first_name + rbd_dev->header.snap_names_len;

	/* walk our list oldest-first while walking the header backwards */
	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		/* cur_id is only read when i != 0 (short-circuit below) */
		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		/* header has snaps older than old_snap: insert them before it */
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2090 

/* rbd_root_dev is static, so its release callback has nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device for all rbd devices: /sys/devices/rbd */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
2100 
/*
 * Register the rbd device on the rbd bus, then register a sysfs device
 * for each of its snapshots.  Takes ctl_mutex for the duration.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}

	/* NOTE(review): a failure registering a snap device is captured in
	 * ret but then discarded -- the function still returns 0; confirm
	 * whether that best-effort behavior is intended */
	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	/* nothing is actually freed here; the caller handles cleanup */
	mutex_unlock(&ctl_mutex);
	return ret;
}
2132 
/* Unregister the rbd device; the final put invokes rbd_dev_release(). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2137 
/*
 * Establish the header-object watch.  -ERANGE from the OSD indicates
 * our header version is stale; refresh the header and retry until the
 * watch is registered against a current version.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2156 
2157 static ssize_t rbd_add(struct bus_type *bus,
2158 		       const char *buf,
2159 		       size_t count)
2160 {
2161 	struct ceph_osd_client *osdc;
2162 	struct rbd_device *rbd_dev;
2163 	ssize_t rc = -ENOMEM;
2164 	int irc, new_id = 0;
2165 	struct list_head *tmp;
2166 	char *mon_dev_name;
2167 	char *options;
2168 
2169 	if (!try_module_get(THIS_MODULE))
2170 		return -ENODEV;
2171 
2172 	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2173 	if (!mon_dev_name)
2174 		goto err_out_mod;
2175 
2176 	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2177 	if (!options)
2178 		goto err_mon_dev;
2179 
2180 	/* new rbd_device object */
2181 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2182 	if (!rbd_dev)
2183 		goto err_out_opt;
2184 
2185 	/* static rbd_device initialization */
2186 	spin_lock_init(&rbd_dev->lock);
2187 	INIT_LIST_HEAD(&rbd_dev->node);
2188 	INIT_LIST_HEAD(&rbd_dev->snaps);
2189 
2190 	init_rwsem(&rbd_dev->header.snap_rwsem);
2191 
2192 	/* generate unique id: find highest unique id, add one */
2193 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2194 
2195 	list_for_each(tmp, &rbd_dev_list) {
2196 		struct rbd_device *rbd_dev;
2197 
2198 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2199 		if (rbd_dev->id >= new_id)
2200 			new_id = rbd_dev->id + 1;
2201 	}
2202 
2203 	rbd_dev->id = new_id;
2204 
2205 	/* add to global list */
2206 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2207 
2208 	/* parse add command */
2209 	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2210 		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
2211 		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2212 		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2213 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2214 		   mon_dev_name, options, rbd_dev->pool_name,
2215 		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
2216 		rc = -EINVAL;
2217 		goto err_out_slot;
2218 	}
2219 
2220 	if (rbd_dev->snap_name[0] == 0)
2221 		rbd_dev->snap_name[0] = '-';
2222 
2223 	rbd_dev->obj_len = strlen(rbd_dev->obj);
2224 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2225 		 rbd_dev->obj, RBD_SUFFIX);
2226 
2227 	/* initialize rest of new object */
2228 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2229 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2230 	if (rc < 0)
2231 		goto err_out_slot;
2232 
2233 	mutex_unlock(&ctl_mutex);
2234 
2235 	/* pick the pool */
2236 	osdc = &rbd_dev->client->osdc;
2237 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2238 	if (rc < 0)
2239 		goto err_out_client;
2240 	rbd_dev->poolid = rc;
2241 
2242 	/* register our block device */
2243 	irc = register_blkdev(0, rbd_dev->name);
2244 	if (irc < 0) {
2245 		rc = irc;
2246 		goto err_out_client;
2247 	}
2248 	rbd_dev->major = irc;
2249 
2250 	rc = rbd_bus_add_dev(rbd_dev);
2251 	if (rc)
2252 		goto err_out_blkdev;
2253 
2254 	/* set up and announce blkdev mapping */
2255 	rc = rbd_init_disk(rbd_dev);
2256 	if (rc)
2257 		goto err_out_bus;
2258 
2259 	rc = rbd_init_watch_dev(rbd_dev);
2260 	if (rc)
2261 		goto err_out_bus;
2262 
2263 	return count;
2264 
2265 err_out_bus:
2266 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267 	list_del_init(&rbd_dev->node);
2268 	mutex_unlock(&ctl_mutex);
2269 
2270 	/* this will also clean up rest of rbd_dev stuff */
2271 
2272 	rbd_bus_del_dev(rbd_dev);
2273 	kfree(options);
2274 	kfree(mon_dev_name);
2275 	return rc;
2276 
2277 err_out_blkdev:
2278 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2279 err_out_client:
2280 	rbd_put_client(rbd_dev);
2281 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2282 err_out_slot:
2283 	list_del_init(&rbd_dev->node);
2284 	mutex_unlock(&ctl_mutex);
2285 
2286 	kfree(rbd_dev);
2287 err_out_opt:
2288 	kfree(options);
2289 err_mon_dev:
2290 	kfree(mon_dev_name);
2291 err_out_mod:
2292 	dout("Error adding device %s\n", buf);
2293 	module_put(THIS_MODULE);
2294 	return rc;
2295 }
2296 
2297 static struct rbd_device *__rbd_get_dev(unsigned long id)
2298 {
2299 	struct list_head *tmp;
2300 	struct rbd_device *rbd_dev;
2301 
2302 	list_for_each(tmp, &rbd_dev_list) {
2303 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2304 		if (rbd_dev->id == id)
2305 			return rbd_dev;
2306 	}
2307 	return NULL;
2308 }
2309 
/*
 * Final put on the rbd device: stop watching the header, drop the ceph
 * client, tear down the disk, release the major, and free the device.
 * Also drops the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	if (rbd_dev->watch_request)
		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2331 
2332 static ssize_t rbd_remove(struct bus_type *bus,
2333 			  const char *buf,
2334 			  size_t count)
2335 {
2336 	struct rbd_device *rbd_dev = NULL;
2337 	int target_id, rc;
2338 	unsigned long ul;
2339 	int ret = count;
2340 
2341 	rc = strict_strtoul(buf, 10, &ul);
2342 	if (rc)
2343 		return rc;
2344 
2345 	/* convert to int; abort if we lost anything in the conversion */
2346 	target_id = (int) ul;
2347 	if (target_id != ul)
2348 		return -EINVAL;
2349 
2350 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2351 
2352 	rbd_dev = __rbd_get_dev(target_id);
2353 	if (!rbd_dev) {
2354 		ret = -ENOENT;
2355 		goto done;
2356 	}
2357 
2358 	list_del_init(&rbd_dev->node);
2359 
2360 	__rbd_remove_all_snaps(rbd_dev);
2361 	rbd_bus_del_dev(rbd_dev);
2362 
2363 done:
2364 	mutex_unlock(&ctl_mutex);
2365 	return ret;
2366 }
2367 
/*
 * sysfs "create_snap" handler: create a snapshot named by the written
 * string, refresh our snapshot state, then notify other watchers.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* NOTE(review): with size `count` this copies at most count-1
	 * bytes, dropping the final input character -- presumably to strip
	 * the trailing sysfs newline, but it also truncates input without
	 * one; confirm intent */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2408 
/*
 * Control files exposed under /sys/bus/rbd/: "add" maps a new rados
 * block device, "remove" tears one down.  Both are write-only,
 * owner-writable (S_IWUSR); the array is NULL-terminated for the
 * driver core.
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2414 
2415 /*
2416  * create control files in sysfs
2417  * /sys/bus/rbd/...
2418  */
2419 static int rbd_sysfs_init(void)
2420 {
2421 	int ret;
2422 
2423 	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2424 
2425 	ret = bus_register(&rbd_bus_type);
2426 	 if (ret < 0)
2427 		return ret;
2428 
2429 	ret = device_register(&rbd_root_dev);
2430 
2431 	return ret;
2432 }
2433 
/*
 * Remove the sysfs control interface; undoes rbd_sysfs_init() in the
 * reverse order of registration.
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2439 
2440 int __init rbd_init(void)
2441 {
2442 	int rc;
2443 
2444 	rc = rbd_sysfs_init();
2445 	if (rc)
2446 		return rc;
2447 	spin_lock_init(&node_lock);
2448 	pr_info("loaded " DRV_NAME_LONG "\n");
2449 	return 0;
2450 }
2451 
/*
 * Module exit: tear down the sysfs control interface.  Individual
 * devices are released via rbd_dev_release() as their device
 * references drop.
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2456 
/* Standard module plumbing and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");
2468