xref: /openbmc/linux/drivers/block/rbd.c (revision df2634f43f5106947f3735a0b61a6527a4b278cd)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 
35 #include <linux/kernel.h>
36 #include <linux/device.h>
37 #include <linux/module.h>
38 #include <linux/fs.h>
39 #include <linux/blkdev.h>
40 
41 #include "rbd_types.h"
42 
/* Driver identification strings */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

/* Upper bound on minors per block device major */
#define RBD_MINORS_PER_MAJOR	256

/* Size limits for the fixed-length name/option buffers used below */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head instead of a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/* Size of the block device name buffer (e.g. "rbd3") */
#define DEV_NAME_LEN		32
56 
57 /*
58  * block device image metadata (in-memory version)
59  */
60 struct rbd_image_header {
61 	u64 image_size;
62 	char block_name[32];
63 	__u8 obj_order;
64 	__u8 crypt_type;
65 	__u8 comp_type;
66 	struct rw_semaphore snap_rwsem;
67 	struct ceph_snap_context *snapc;
68 	size_t snap_names_len;
69 	u64 snap_seq;
70 	u32 total_snaps;
71 
72 	char *snap_names;
73 	u64 *snap_sizes;
74 };
75 
76 /*
77  * an instance of the client.  multiple devices may share a client.
78  */
79 struct rbd_client {
80 	struct ceph_client	*client;
81 	struct kref		kref;
82 	struct list_head	node;
83 };
84 
85 /*
86  * a single io request
87  */
88 struct rbd_request {
89 	struct request		*rq;		/* blk layer request */
90 	struct bio		*bio;		/* cloned bio */
91 	struct page		**pages;	/* list of used pages */
92 	u64			len;
93 };
94 
95 struct rbd_snap {
96 	struct	device		dev;
97 	const char		*name;
98 	size_t			size;
99 	struct list_head	node;
100 	u64			id;
101 };
102 
103 /*
104  * a single device
105  */
106 struct rbd_device {
107 	int			id;		/* blkdev unique id */
108 
109 	int			major;		/* blkdev assigned major */
110 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
111 	struct request_queue	*q;
112 
113 	struct ceph_client	*client;
114 	struct rbd_client	*rbd_client;
115 
116 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
117 
118 	spinlock_t		lock;		/* queue lock */
119 
120 	struct rbd_image_header	header;
121 	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
122 	int			obj_len;
123 	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
124 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
125 	int			poolid;
126 
127 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
128 	u32 cur_snap;	/* index+1 of current snapshot within snap context
129 			   0 - for the head */
130 	int read_only;
131 
132 	struct list_head	node;
133 
134 	/* list of snapshots */
135 	struct list_head	snaps;
136 
137 	/* sysfs related */
138 	struct device		dev;
139 };
140 
141 static struct bus_type rbd_bus_type = {
142 	.name		= "rbd",
143 };
144 
145 static spinlock_t node_lock;      /* protects client get/put */
146 
147 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
148 static LIST_HEAD(rbd_dev_list);    /* devices */
149 static LIST_HEAD(rbd_client_list);      /* clients */
150 
151 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
152 static void rbd_dev_release(struct device *dev);
153 static ssize_t rbd_snap_rollback(struct device *dev,
154 				 struct device_attribute *attr,
155 				 const char *buf,
156 				 size_t size);
157 static ssize_t rbd_snap_add(struct device *dev,
158 			    struct device_attribute *attr,
159 			    const char *buf,
160 			    size_t count);
161 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
162 				  struct rbd_snap *snap);;
163 
164 
165 static struct rbd_device *dev_to_rbd(struct device *dev)
166 {
167 	return container_of(dev, struct rbd_device, dev);
168 }
169 
170 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
171 {
172 	return get_device(&rbd_dev->dev);
173 }
174 
175 static void rbd_put_dev(struct rbd_device *rbd_dev)
176 {
177 	put_device(&rbd_dev->dev);
178 }
179 
180 static int rbd_open(struct block_device *bdev, fmode_t mode)
181 {
182 	struct gendisk *disk = bdev->bd_disk;
183 	struct rbd_device *rbd_dev = disk->private_data;
184 
185 	rbd_get_dev(rbd_dev);
186 
187 	set_device_ro(bdev, rbd_dev->read_only);
188 
189 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
190 		return -EROFS;
191 
192 	return 0;
193 }
194 
195 static int rbd_release(struct gendisk *disk, fmode_t mode)
196 {
197 	struct rbd_device *rbd_dev = disk->private_data;
198 
199 	rbd_put_dev(rbd_dev);
200 
201 	return 0;
202 }
203 
204 static const struct block_device_operations rbd_bd_ops = {
205 	.owner			= THIS_MODULE,
206 	.open			= rbd_open,
207 	.release		= rbd_release,
208 };
209 
210 /*
211  * Initialize an rbd client instance.
212  * We own *opt.
213  */
214 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
215 {
216 	struct rbd_client *rbdc;
217 	int ret = -ENOMEM;
218 
219 	dout("rbd_client_create\n");
220 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
221 	if (!rbdc)
222 		goto out_opt;
223 
224 	kref_init(&rbdc->kref);
225 	INIT_LIST_HEAD(&rbdc->node);
226 
227 	rbdc->client = ceph_create_client(opt, rbdc);
228 	if (IS_ERR(rbdc->client))
229 		goto out_rbdc;
230 	opt = NULL; /* Now rbdc->client is responsible for opt */
231 
232 	ret = ceph_open_session(rbdc->client);
233 	if (ret < 0)
234 		goto out_err;
235 
236 	spin_lock(&node_lock);
237 	list_add_tail(&rbdc->node, &rbd_client_list);
238 	spin_unlock(&node_lock);
239 
240 	dout("rbd_client_create created %p\n", rbdc);
241 	return rbdc;
242 
243 out_err:
244 	ceph_destroy_client(rbdc->client);
245 out_rbdc:
246 	kfree(rbdc);
247 out_opt:
248 	if (opt)
249 		ceph_destroy_options(opt);
250 	return ERR_PTR(ret);
251 }
252 
253 /*
254  * Find a ceph client with specific addr and configuration.
255  */
256 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
257 {
258 	struct rbd_client *client_node;
259 
260 	if (opt->flags & CEPH_OPT_NOSHARE)
261 		return NULL;
262 
263 	list_for_each_entry(client_node, &rbd_client_list, node)
264 		if (ceph_compare_options(opt, client_node->client) == 0)
265 			return client_node;
266 	return NULL;
267 }
268 
269 /*
270  * Get a ceph client with specific addr and configuration, if one does
271  * not exist create it.
272  */
273 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
274 			  char *options)
275 {
276 	struct rbd_client *rbdc;
277 	struct ceph_options *opt;
278 	int ret;
279 
280 	ret = ceph_parse_options(&opt, options, mon_addr,
281 				 mon_addr + strlen(mon_addr), NULL, NULL);
282 	if (ret < 0)
283 		return ret;
284 
285 	spin_lock(&node_lock);
286 	rbdc = __rbd_client_find(opt);
287 	if (rbdc) {
288 		ceph_destroy_options(opt);
289 
290 		/* using an existing client */
291 		kref_get(&rbdc->kref);
292 		rbd_dev->rbd_client = rbdc;
293 		rbd_dev->client = rbdc->client;
294 		spin_unlock(&node_lock);
295 		return 0;
296 	}
297 	spin_unlock(&node_lock);
298 
299 	rbdc = rbd_client_create(opt);
300 	if (IS_ERR(rbdc))
301 		return PTR_ERR(rbdc);
302 
303 	rbd_dev->rbd_client = rbdc;
304 	rbd_dev->client = rbdc->client;
305 	return 0;
306 }
307 
308 /*
309  * Destroy ceph client
310  */
311 static void rbd_client_release(struct kref *kref)
312 {
313 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
314 
315 	dout("rbd_release_client %p\n", rbdc);
316 	spin_lock(&node_lock);
317 	list_del(&rbdc->node);
318 	spin_unlock(&node_lock);
319 
320 	ceph_destroy_client(rbdc->client);
321 	kfree(rbdc);
322 }
323 
324 /*
325  * Drop reference to ceph client node. If it's not referenced anymore, release
326  * it.
327  */
328 static void rbd_put_client(struct rbd_device *rbd_dev)
329 {
330 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
331 	rbd_dev->rbd_client = NULL;
332 	rbd_dev->client = NULL;
333 }
334 
335 
336 /*
337  * Create a new header structure, translate header format from the on-disk
338  * header.
339  */
340 static int rbd_header_from_disk(struct rbd_image_header *header,
341 				 struct rbd_image_header_ondisk *ondisk,
342 				 int allocated_snaps,
343 				 gfp_t gfp_flags)
344 {
345 	int i;
346 	u32 snap_count = le32_to_cpu(ondisk->snap_count);
347 	int ret = -ENOMEM;
348 
349 	init_rwsem(&header->snap_rwsem);
350 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
351 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
352 				snap_count *
353 				 sizeof(struct rbd_image_snap_ondisk),
354 				gfp_flags);
355 	if (!header->snapc)
356 		return -ENOMEM;
357 	if (snap_count) {
358 		header->snap_names = kmalloc(header->snap_names_len,
359 					     GFP_KERNEL);
360 		if (!header->snap_names)
361 			goto err_snapc;
362 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
363 					     GFP_KERNEL);
364 		if (!header->snap_sizes)
365 			goto err_names;
366 	} else {
367 		header->snap_names = NULL;
368 		header->snap_sizes = NULL;
369 	}
370 	memcpy(header->block_name, ondisk->block_name,
371 	       sizeof(ondisk->block_name));
372 
373 	header->image_size = le64_to_cpu(ondisk->image_size);
374 	header->obj_order = ondisk->options.order;
375 	header->crypt_type = ondisk->options.crypt_type;
376 	header->comp_type = ondisk->options.comp_type;
377 
378 	atomic_set(&header->snapc->nref, 1);
379 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
380 	header->snapc->num_snaps = snap_count;
381 	header->total_snaps = snap_count;
382 
383 	if (snap_count &&
384 	    allocated_snaps == snap_count) {
385 		for (i = 0; i < snap_count; i++) {
386 			header->snapc->snaps[i] =
387 				le64_to_cpu(ondisk->snaps[i].id);
388 			header->snap_sizes[i] =
389 				le64_to_cpu(ondisk->snaps[i].image_size);
390 		}
391 
392 		/* copy snapshot names */
393 		memcpy(header->snap_names, &ondisk->snaps[i],
394 			header->snap_names_len);
395 	}
396 
397 	return 0;
398 
399 err_names:
400 	kfree(header->snap_names);
401 err_snapc:
402 	kfree(header->snapc);
403 	return ret;
404 }
405 
406 static int snap_index(struct rbd_image_header *header, int snap_num)
407 {
408 	return header->total_snaps - snap_num;
409 }
410 
411 static u64 cur_snap_id(struct rbd_device *rbd_dev)
412 {
413 	struct rbd_image_header *header = &rbd_dev->header;
414 
415 	if (!rbd_dev->cur_snap)
416 		return 0;
417 
418 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
419 }
420 
421 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
422 			u64 *seq, u64 *size)
423 {
424 	int i;
425 	char *p = header->snap_names;
426 
427 	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
428 		if (strcmp(snap_name, p) == 0)
429 			break;
430 	}
431 	if (i == header->total_snaps)
432 		return -ENOENT;
433 	if (seq)
434 		*seq = header->snapc->snaps[i];
435 
436 	if (size)
437 		*size = header->snap_sizes[i];
438 
439 	return i;
440 }
441 
442 static int rbd_header_set_snap(struct rbd_device *dev,
443 			       const char *snap_name,
444 			       u64 *size)
445 {
446 	struct rbd_image_header *header = &dev->header;
447 	struct ceph_snap_context *snapc = header->snapc;
448 	int ret = -ENOENT;
449 
450 	down_write(&header->snap_rwsem);
451 
452 	if (!snap_name ||
453 	    !*snap_name ||
454 	    strcmp(snap_name, "-") == 0 ||
455 	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
456 		if (header->total_snaps)
457 			snapc->seq = header->snap_seq;
458 		else
459 			snapc->seq = 0;
460 		dev->cur_snap = 0;
461 		dev->read_only = 0;
462 		if (size)
463 			*size = header->image_size;
464 	} else {
465 		ret = snap_by_name(header, snap_name, &snapc->seq, size);
466 		if (ret < 0)
467 			goto done;
468 
469 		dev->cur_snap = header->total_snaps - ret;
470 		dev->read_only = 1;
471 	}
472 
473 	ret = 0;
474 done:
475 	up_write(&header->snap_rwsem);
476 	return ret;
477 }
478 
479 static void rbd_header_free(struct rbd_image_header *header)
480 {
481 	kfree(header->snapc);
482 	kfree(header->snap_names);
483 	kfree(header->snap_sizes);
484 }
485 
486 /*
487  * get the actual striped segment name, offset and length
488  */
489 static u64 rbd_get_segment(struct rbd_image_header *header,
490 			   const char *block_name,
491 			   u64 ofs, u64 len,
492 			   char *seg_name, u64 *segofs)
493 {
494 	u64 seg = ofs >> header->obj_order;
495 
496 	if (seg_name)
497 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
498 			 "%s.%012llx", block_name, seg);
499 
500 	ofs = ofs & ((1 << header->obj_order) - 1);
501 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
502 
503 	if (segofs)
504 		*segofs = ofs;
505 
506 	return len;
507 }
508 
509 /*
510  * bio helpers
511  */
512 
513 static void bio_chain_put(struct bio *chain)
514 {
515 	struct bio *tmp;
516 
517 	while (chain) {
518 		tmp = chain;
519 		chain = chain->bi_next;
520 		bio_put(tmp);
521 	}
522 }
523 
524 /*
525  * zeros a bio chain, starting at specific offset
526  */
527 static void zero_bio_chain(struct bio *chain, int start_ofs)
528 {
529 	struct bio_vec *bv;
530 	unsigned long flags;
531 	void *buf;
532 	int i;
533 	int pos = 0;
534 
535 	while (chain) {
536 		bio_for_each_segment(bv, chain, i) {
537 			if (pos + bv->bv_len > start_ofs) {
538 				int remainder = max(start_ofs - pos, 0);
539 				buf = bvec_kmap_irq(bv, &flags);
540 				memset(buf + remainder, 0,
541 				       bv->bv_len - remainder);
542 				bvec_kunmap_irq(buf, &flags);
543 			}
544 			pos += bv->bv_len;
545 		}
546 
547 		chain = chain->bi_next;
548 	}
549 }
550 
551 /*
552  * bio_chain_clone - clone a chain of bios up to a certain length.
553  * might return a bio_pair that will need to be released.
554  */
555 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
556 				   struct bio_pair **bp,
557 				   int len, gfp_t gfpmask)
558 {
559 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
560 	int total = 0;
561 
562 	if (*bp) {
563 		bio_pair_release(*bp);
564 		*bp = NULL;
565 	}
566 
567 	while (old_chain && (total < len)) {
568 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
569 		if (!tmp)
570 			goto err_out;
571 
572 		if (total + old_chain->bi_size > len) {
573 			struct bio_pair *bp;
574 
575 			/*
576 			 * this split can only happen with a single paged bio,
577 			 * split_bio will BUG_ON if this is not the case
578 			 */
579 			dout("bio_chain_clone split! total=%d remaining=%d"
580 			     "bi_size=%d\n",
581 			     (int)total, (int)len-total,
582 			     (int)old_chain->bi_size);
583 
584 			/* split the bio. We'll release it either in the next
585 			   call, or it will have to be released outside */
586 			bp = bio_split(old_chain, (len - total) / 512ULL);
587 			if (!bp)
588 				goto err_out;
589 
590 			__bio_clone(tmp, &bp->bio1);
591 
592 			*next = &bp->bio2;
593 		} else {
594 			__bio_clone(tmp, old_chain);
595 			*next = old_chain->bi_next;
596 		}
597 
598 		tmp->bi_bdev = NULL;
599 		gfpmask &= ~__GFP_WAIT;
600 		tmp->bi_next = NULL;
601 
602 		if (!new_chain) {
603 			new_chain = tail = tmp;
604 		} else {
605 			tail->bi_next = tmp;
606 			tail = tmp;
607 		}
608 		old_chain = old_chain->bi_next;
609 
610 		total += tmp->bi_size;
611 	}
612 
613 	BUG_ON(total < len);
614 
615 	if (tail)
616 		tail->bi_next = NULL;
617 
618 	*old = old_chain;
619 
620 	return new_chain;
621 
622 err_out:
623 	dout("bio_chain_clone with err\n");
624 	bio_chain_put(new_chain);
625 	return NULL;
626 }
627 
628 /*
629  * helpers for osd request op vectors.
630  */
631 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
632 			    int num_ops,
633 			    int opcode,
634 			    u32 payload_len)
635 {
636 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
637 		       GFP_NOIO);
638 	if (!*ops)
639 		return -ENOMEM;
640 	(*ops)[0].op = opcode;
641 	/*
642 	 * op extent offset and length will be set later on
643 	 * in calc_raw_layout()
644 	 */
645 	(*ops)[0].payload_len = payload_len;
646 	return 0;
647 }
648 
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree((void *)ops);
}
653 
654 /*
655  * Send ceph osd request
656  */
657 static int rbd_do_request(struct request *rq,
658 			  struct rbd_device *dev,
659 			  struct ceph_snap_context *snapc,
660 			  u64 snapid,
661 			  const char *obj, u64 ofs, u64 len,
662 			  struct bio *bio,
663 			  struct page **pages,
664 			  int num_pages,
665 			  int flags,
666 			  struct ceph_osd_req_op *ops,
667 			  int num_reply,
668 			  void (*rbd_cb)(struct ceph_osd_request *req,
669 					 struct ceph_msg *msg))
670 {
671 	struct ceph_osd_request *req;
672 	struct ceph_file_layout *layout;
673 	int ret;
674 	u64 bno;
675 	struct timespec mtime = CURRENT_TIME;
676 	struct rbd_request *req_data;
677 	struct ceph_osd_request_head *reqhead;
678 	struct rbd_image_header *header = &dev->header;
679 
680 	ret = -ENOMEM;
681 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
682 	if (!req_data)
683 		goto done;
684 
685 	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
686 
687 	down_read(&header->snap_rwsem);
688 
689 	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
690 				      snapc,
691 				      ops,
692 				      false,
693 				      GFP_NOIO, pages, bio);
694 	if (IS_ERR(req)) {
695 		up_read(&header->snap_rwsem);
696 		ret = PTR_ERR(req);
697 		goto done_pages;
698 	}
699 
700 	req->r_callback = rbd_cb;
701 
702 	req_data->rq = rq;
703 	req_data->bio = bio;
704 	req_data->pages = pages;
705 	req_data->len = len;
706 
707 	req->r_priv = req_data;
708 
709 	reqhead = req->r_request->front.iov_base;
710 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
711 
712 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
713 	req->r_oid_len = strlen(req->r_oid);
714 
715 	layout = &req->r_file_layout;
716 	memset(layout, 0, sizeof(*layout));
717 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
718 	layout->fl_stripe_count = cpu_to_le32(1);
719 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
720 	layout->fl_pg_preferred = cpu_to_le32(-1);
721 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
722 	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
723 			     ofs, &len, &bno, req, ops);
724 
725 	ceph_osdc_build_request(req, ofs, &len,
726 				ops,
727 				snapc,
728 				&mtime,
729 				req->r_oid, req->r_oid_len);
730 	up_read(&header->snap_rwsem);
731 
732 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
733 	if (ret < 0)
734 		goto done_err;
735 
736 	if (!rbd_cb) {
737 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
738 		ceph_osdc_put_request(req);
739 	}
740 	return ret;
741 
742 done_err:
743 	bio_chain_put(req_data->bio);
744 	ceph_osdc_put_request(req);
745 done_pages:
746 	kfree(req_data);
747 done:
748 	if (rq)
749 		blk_end_request(rq, ret, len);
750 	return ret;
751 }
752 
753 /*
754  * Ceph osd op callback
755  */
756 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
757 {
758 	struct rbd_request *req_data = req->r_priv;
759 	struct ceph_osd_reply_head *replyhead;
760 	struct ceph_osd_op *op;
761 	__s32 rc;
762 	u64 bytes;
763 	int read_op;
764 
765 	/* parse reply */
766 	replyhead = msg->front.iov_base;
767 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
768 	op = (void *)(replyhead + 1);
769 	rc = le32_to_cpu(replyhead->result);
770 	bytes = le64_to_cpu(op->extent.length);
771 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
772 
773 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
774 
775 	if (rc == -ENOENT && read_op) {
776 		zero_bio_chain(req_data->bio, 0);
777 		rc = 0;
778 	} else if (rc == 0 && read_op && bytes < req_data->len) {
779 		zero_bio_chain(req_data->bio, bytes);
780 		bytes = req_data->len;
781 	}
782 
783 	blk_end_request(req_data->rq, rc, bytes);
784 
785 	if (req_data->bio)
786 		bio_chain_put(req_data->bio);
787 
788 	ceph_osdc_put_request(req);
789 	kfree(req_data);
790 }
791 
792 /*
793  * Do a synchronous ceph osd operation
794  */
795 static int rbd_req_sync_op(struct rbd_device *dev,
796 			   struct ceph_snap_context *snapc,
797 			   u64 snapid,
798 			   int opcode,
799 			   int flags,
800 			   struct ceph_osd_req_op *orig_ops,
801 			   int num_reply,
802 			   const char *obj,
803 			   u64 ofs, u64 len,
804 			   char *buf)
805 {
806 	int ret;
807 	struct page **pages;
808 	int num_pages;
809 	struct ceph_osd_req_op *ops = orig_ops;
810 	u32 payload_len;
811 
812 	num_pages = calc_pages_for(ofs , len);
813 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
814 	if (IS_ERR(pages))
815 		return PTR_ERR(pages);
816 
817 	if (!orig_ops) {
818 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
819 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
820 		if (ret < 0)
821 			goto done;
822 
823 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
824 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
825 			if (ret < 0)
826 				goto done_ops;
827 		}
828 	}
829 
830 	ret = rbd_do_request(NULL, dev, snapc, snapid,
831 			  obj, ofs, len, NULL,
832 			  pages, num_pages,
833 			  flags,
834 			  ops,
835 			  2,
836 			  NULL);
837 	if (ret < 0)
838 		goto done_ops;
839 
840 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
841 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
842 
843 done_ops:
844 	if (!orig_ops)
845 		rbd_destroy_ops(ops);
846 done:
847 	ceph_release_page_vector(pages, num_pages);
848 	return ret;
849 }
850 
851 /*
852  * Do an asynchronous ceph osd operation
853  */
854 static int rbd_do_op(struct request *rq,
855 		     struct rbd_device *rbd_dev ,
856 		     struct ceph_snap_context *snapc,
857 		     u64 snapid,
858 		     int opcode, int flags, int num_reply,
859 		     u64 ofs, u64 len,
860 		     struct bio *bio)
861 {
862 	char *seg_name;
863 	u64 seg_ofs;
864 	u64 seg_len;
865 	int ret;
866 	struct ceph_osd_req_op *ops;
867 	u32 payload_len;
868 
869 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
870 	if (!seg_name)
871 		return -ENOMEM;
872 
873 	seg_len = rbd_get_segment(&rbd_dev->header,
874 				  rbd_dev->header.block_name,
875 				  ofs, len,
876 				  seg_name, &seg_ofs);
877 
878 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
879 
880 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
881 	if (ret < 0)
882 		goto done;
883 
884 	/* we've taken care of segment sizes earlier when we
885 	   cloned the bios. We should never have a segment
886 	   truncated at this point */
887 	BUG_ON(seg_len < len);
888 
889 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
890 			     seg_name, seg_ofs, seg_len,
891 			     bio,
892 			     NULL, 0,
893 			     flags,
894 			     ops,
895 			     num_reply,
896 			     rbd_req_cb);
897 done:
898 	kfree(seg_name);
899 	return ret;
900 }
901 
902 /*
903  * Request async osd write
904  */
905 static int rbd_req_write(struct request *rq,
906 			 struct rbd_device *rbd_dev,
907 			 struct ceph_snap_context *snapc,
908 			 u64 ofs, u64 len,
909 			 struct bio *bio)
910 {
911 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
912 			 CEPH_OSD_OP_WRITE,
913 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
914 			 2,
915 			 ofs, len, bio);
916 }
917 
918 /*
919  * Request async osd read
920  */
921 static int rbd_req_read(struct request *rq,
922 			 struct rbd_device *rbd_dev,
923 			 u64 snapid,
924 			 u64 ofs, u64 len,
925 			 struct bio *bio)
926 {
927 	return rbd_do_op(rq, rbd_dev, NULL,
928 			 (snapid ? snapid : CEPH_NOSNAP),
929 			 CEPH_OSD_OP_READ,
930 			 CEPH_OSD_FLAG_READ,
931 			 2,
932 			 ofs, len, bio);
933 }
934 
935 /*
936  * Request sync osd read
937  */
938 static int rbd_req_sync_read(struct rbd_device *dev,
939 			  struct ceph_snap_context *snapc,
940 			  u64 snapid,
941 			  const char *obj,
942 			  u64 ofs, u64 len,
943 			  char *buf)
944 {
945 	return rbd_req_sync_op(dev, NULL,
946 			       (snapid ? snapid : CEPH_NOSNAP),
947 			       CEPH_OSD_OP_READ,
948 			       CEPH_OSD_FLAG_READ,
949 			       NULL,
950 			       1, obj, ofs, len, buf);
951 }
952 
953 /*
954  * Request sync osd read
955  */
956 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
957 				     u64 snapid,
958 				     const char *obj)
959 {
960 	struct ceph_osd_req_op *ops;
961 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
962 	if (ret < 0)
963 		return ret;
964 
965 	ops[0].snap.snapid = snapid;
966 
967 	ret = rbd_req_sync_op(dev, NULL,
968 			       CEPH_NOSNAP,
969 			       0,
970 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
971 			       ops,
972 			       1, obj, 0, 0, NULL);
973 
974 	rbd_destroy_ops(ops);
975 
976 	if (ret < 0)
977 		return ret;
978 
979 	return ret;
980 }
981 
982 /*
983  * Request sync osd read
984  */
985 static int rbd_req_sync_exec(struct rbd_device *dev,
986 			     const char *obj,
987 			     const char *cls,
988 			     const char *method,
989 			     const char *data,
990 			     int len)
991 {
992 	struct ceph_osd_req_op *ops;
993 	int cls_len = strlen(cls);
994 	int method_len = strlen(method);
995 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
996 				    cls_len + method_len + len);
997 	if (ret < 0)
998 		return ret;
999 
1000 	ops[0].cls.class_name = cls;
1001 	ops[0].cls.class_len = (__u8)cls_len;
1002 	ops[0].cls.method_name = method;
1003 	ops[0].cls.method_len = (__u8)method_len;
1004 	ops[0].cls.argc = 0;
1005 	ops[0].cls.indata = data;
1006 	ops[0].cls.indata_len = len;
1007 
1008 	ret = rbd_req_sync_op(dev, NULL,
1009 			       CEPH_NOSNAP,
1010 			       0,
1011 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012 			       ops,
1013 			       1, obj, 0, 0, NULL);
1014 
1015 	rbd_destroy_ops(ops);
1016 
1017 	dout("cls_exec returned %d\n", ret);
1018 	return ret;
1019 }
1020 
1021 /*
1022  * block device queue callback
1023  */
1024 static void rbd_rq_fn(struct request_queue *q)
1025 {
1026 	struct rbd_device *rbd_dev = q->queuedata;
1027 	struct request *rq;
1028 	struct bio_pair *bp = NULL;
1029 
1030 	rq = blk_fetch_request(q);
1031 
1032 	while (1) {
1033 		struct bio *bio;
1034 		struct bio *rq_bio, *next_bio = NULL;
1035 		bool do_write;
1036 		int size, op_size = 0;
1037 		u64 ofs;
1038 
1039 		/* peek at request from block layer */
1040 		if (!rq)
1041 			break;
1042 
1043 		dout("fetched request\n");
1044 
1045 		/* filter out block requests we don't understand */
1046 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1047 			__blk_end_request_all(rq, 0);
1048 			goto next;
1049 		}
1050 
1051 		/* deduce our operation (read, write) */
1052 		do_write = (rq_data_dir(rq) == WRITE);
1053 
1054 		size = blk_rq_bytes(rq);
1055 		ofs = blk_rq_pos(rq) * 512ULL;
1056 		rq_bio = rq->bio;
1057 		if (do_write && rbd_dev->read_only) {
1058 			__blk_end_request_all(rq, -EROFS);
1059 			goto next;
1060 		}
1061 
1062 		spin_unlock_irq(q->queue_lock);
1063 
1064 		dout("%s 0x%x bytes at 0x%llx\n",
1065 		     do_write ? "write" : "read",
1066 		     size, blk_rq_pos(rq) * 512ULL);
1067 
1068 		do {
1069 			/* a bio clone to be passed down to OSD req */
1070 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1071 			op_size = rbd_get_segment(&rbd_dev->header,
1072 						  rbd_dev->header.block_name,
1073 						  ofs, size,
1074 						  NULL, NULL);
1075 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1076 					      op_size, GFP_ATOMIC);
1077 			if (!bio) {
1078 				spin_lock_irq(q->queue_lock);
1079 				__blk_end_request_all(rq, -ENOMEM);
1080 				goto next;
1081 			}
1082 
1083 			/* init OSD command: write or read */
1084 			if (do_write)
1085 				rbd_req_write(rq, rbd_dev,
1086 					      rbd_dev->header.snapc,
1087 					      ofs,
1088 					      op_size, bio);
1089 			else
1090 				rbd_req_read(rq, rbd_dev,
1091 					     cur_snap_id(rbd_dev),
1092 					     ofs,
1093 					     op_size, bio);
1094 
1095 			size -= op_size;
1096 			ofs += op_size;
1097 
1098 			rq_bio = next_bio;
1099 		} while (size > 0);
1100 
1101 		if (bp)
1102 			bio_pair_release(bp);
1103 
1104 		spin_lock_irq(q->queue_lock);
1105 next:
1106 		rq = blk_fetch_request(q);
1107 	}
1108 }
1109 
1110 /*
1111  * a queue callback. Makes sure that we don't create a bio that spans across
1112  * multiple osd objects. One exception would be with a single page bios,
1113  * which we handle later at bio_chain_clone
1114  */
1115 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1116 			  struct bio_vec *bvec)
1117 {
1118 	struct rbd_device *rbd_dev = q->queuedata;
1119 	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1120 	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1121 	unsigned int bio_sectors = bmd->bi_size >> 9;
1122 	int max;
1123 
1124 	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1125 				 + bio_sectors)) << 9;
1126 	if (max < 0)
1127 		max = 0; /* bio_add cannot handle a negative return */
1128 	if (max <= bvec->bv_len && bio_sectors == 0)
1129 		return bvec->bv_len;
1130 	return max;
1131 }
1132 
1133 static void rbd_free_disk(struct rbd_device *rbd_dev)
1134 {
1135 	struct gendisk *disk = rbd_dev->disk;
1136 
1137 	if (!disk)
1138 		return;
1139 
1140 	rbd_header_free(&rbd_dev->header);
1141 
1142 	if (disk->flags & GENHD_FL_UP)
1143 		del_gendisk(disk);
1144 	if (disk->queue)
1145 		blk_cleanup_queue(disk->queue);
1146 	put_disk(disk);
1147 }
1148 
1149 /*
1150  * reload the ondisk the header
1151  */
1152 static int rbd_read_header(struct rbd_device *rbd_dev,
1153 			   struct rbd_image_header *header)
1154 {
1155 	ssize_t rc;
1156 	struct rbd_image_header_ondisk *dh;
1157 	int snap_count = 0;
1158 	u64 snap_names_len = 0;
1159 
1160 	while (1) {
1161 		int len = sizeof(*dh) +
1162 			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
1163 			  snap_names_len;
1164 
1165 		rc = -ENOMEM;
1166 		dh = kmalloc(len, GFP_KERNEL);
1167 		if (!dh)
1168 			return -ENOMEM;
1169 
1170 		rc = rbd_req_sync_read(rbd_dev,
1171 				       NULL, CEPH_NOSNAP,
1172 				       rbd_dev->obj_md_name,
1173 				       0, len,
1174 				       (char *)dh);
1175 		if (rc < 0)
1176 			goto out_dh;
1177 
1178 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1179 		if (rc < 0)
1180 			goto out_dh;
1181 
1182 		if (snap_count != header->total_snaps) {
1183 			snap_count = header->total_snaps;
1184 			snap_names_len = header->snap_names_len;
1185 			rbd_header_free(header);
1186 			kfree(dh);
1187 			continue;
1188 		}
1189 		break;
1190 	}
1191 
1192 out_dh:
1193 	kfree(dh);
1194 	return rc;
1195 }
1196 
1197 /*
1198  * create a snapshot
1199  */
1200 static int rbd_header_add_snap(struct rbd_device *dev,
1201 			       const char *snap_name,
1202 			       gfp_t gfp_flags)
1203 {
1204 	int name_len = strlen(snap_name);
1205 	u64 new_snapid;
1206 	int ret;
1207 	void *data, *data_start, *data_end;
1208 
1209 	/* we should create a snapshot only if we're pointing at the head */
1210 	if (dev->cur_snap)
1211 		return -EINVAL;
1212 
1213 	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1214 				      &new_snapid);
1215 	dout("created snapid=%lld\n", new_snapid);
1216 	if (ret < 0)
1217 		return ret;
1218 
1219 	data = kmalloc(name_len + 16, gfp_flags);
1220 	if (!data)
1221 		return -ENOMEM;
1222 
1223 	data_start = data;
1224 	data_end = data + name_len + 16;
1225 
1226 	ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1227 	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1228 
1229 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230 				data_start, data - data_start);
1231 
1232 	kfree(data_start);
1233 
1234 	if (ret < 0)
1235 		return ret;
1236 
1237 	dev->header.snapc->seq =  new_snapid;
1238 
1239 	return 0;
1240 bad:
1241 	return -ERANGE;
1242 }
1243 
1244 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1245 {
1246 	struct rbd_snap *snap;
1247 
1248 	while (!list_empty(&rbd_dev->snaps)) {
1249 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1250 		__rbd_remove_snap_dev(rbd_dev, snap);
1251 	}
1252 }
1253 
1254 /*
1255  * only read the first part of the ondisk header, without the snaps info
1256  */
1257 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1258 {
1259 	int ret;
1260 	struct rbd_image_header h;
1261 	u64 snap_seq;
1262 
1263 	ret = rbd_read_header(rbd_dev, &h);
1264 	if (ret < 0)
1265 		return ret;
1266 
1267 	down_write(&rbd_dev->header.snap_rwsem);
1268 
1269 	snap_seq = rbd_dev->header.snapc->seq;
1270 
1271 	kfree(rbd_dev->header.snapc);
1272 	kfree(rbd_dev->header.snap_names);
1273 	kfree(rbd_dev->header.snap_sizes);
1274 
1275 	rbd_dev->header.total_snaps = h.total_snaps;
1276 	rbd_dev->header.snapc = h.snapc;
1277 	rbd_dev->header.snap_names = h.snap_names;
1278 	rbd_dev->header.snap_names_len = h.snap_names_len;
1279 	rbd_dev->header.snap_sizes = h.snap_sizes;
1280 	rbd_dev->header.snapc->seq = snap_seq;
1281 
1282 	ret = __rbd_init_snaps_header(rbd_dev);
1283 
1284 	up_write(&rbd_dev->header.snap_rwsem);
1285 
1286 	return ret;
1287 }
1288 
1289 static int rbd_init_disk(struct rbd_device *rbd_dev)
1290 {
1291 	struct gendisk *disk;
1292 	struct request_queue *q;
1293 	int rc;
1294 	u64 total_size = 0;
1295 
1296 	/* contact OSD, request size info about the object being mapped */
1297 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1298 	if (rc)
1299 		return rc;
1300 
1301 	/* no need to lock here, as rbd_dev is not registered yet */
1302 	rc = __rbd_init_snaps_header(rbd_dev);
1303 	if (rc)
1304 		return rc;
1305 
1306 	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1307 	if (rc)
1308 		return rc;
1309 
1310 	/* create gendisk info */
1311 	rc = -ENOMEM;
1312 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1313 	if (!disk)
1314 		goto out;
1315 
1316 	sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1317 	disk->major = rbd_dev->major;
1318 	disk->first_minor = 0;
1319 	disk->fops = &rbd_bd_ops;
1320 	disk->private_data = rbd_dev;
1321 
1322 	/* init rq */
1323 	rc = -ENOMEM;
1324 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1325 	if (!q)
1326 		goto out_disk;
1327 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1328 	disk->queue = q;
1329 
1330 	q->queuedata = rbd_dev;
1331 
1332 	rbd_dev->disk = disk;
1333 	rbd_dev->q = q;
1334 
1335 	/* finally, announce the disk to the world */
1336 	set_capacity(disk, total_size / 512ULL);
1337 	add_disk(disk);
1338 
1339 	pr_info("%s: added with size 0x%llx\n",
1340 		disk->disk_name, (unsigned long long)total_size);
1341 	return 0;
1342 
1343 out_disk:
1344 	put_disk(disk);
1345 out:
1346 	return rc;
1347 }
1348 
1349 /*
1350   sysfs
1351 */
1352 
1353 static ssize_t rbd_size_show(struct device *dev,
1354 			     struct device_attribute *attr, char *buf)
1355 {
1356 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1357 
1358 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1359 }
1360 
1361 static ssize_t rbd_major_show(struct device *dev,
1362 			      struct device_attribute *attr, char *buf)
1363 {
1364 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1365 
1366 	return sprintf(buf, "%d\n", rbd_dev->major);
1367 }
1368 
1369 static ssize_t rbd_client_id_show(struct device *dev,
1370 				  struct device_attribute *attr, char *buf)
1371 {
1372 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1373 
1374 	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1375 }
1376 
1377 static ssize_t rbd_pool_show(struct device *dev,
1378 			     struct device_attribute *attr, char *buf)
1379 {
1380 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1381 
1382 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1383 }
1384 
1385 static ssize_t rbd_name_show(struct device *dev,
1386 			     struct device_attribute *attr, char *buf)
1387 {
1388 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1389 
1390 	return sprintf(buf, "%s\n", rbd_dev->obj);
1391 }
1392 
1393 static ssize_t rbd_snap_show(struct device *dev,
1394 			     struct device_attribute *attr,
1395 			     char *buf)
1396 {
1397 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1398 
1399 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1400 }
1401 
1402 static ssize_t rbd_image_refresh(struct device *dev,
1403 				 struct device_attribute *attr,
1404 				 const char *buf,
1405 				 size_t size)
1406 {
1407 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1408 	int rc;
1409 	int ret = size;
1410 
1411 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1412 
1413 	rc = __rbd_update_snaps(rbd_dev);
1414 	if (rc < 0)
1415 		ret = rc;
1416 
1417 	mutex_unlock(&ctl_mutex);
1418 	return ret;
1419 }
1420 
1421 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1422 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1423 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1424 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1425 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1426 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1427 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1428 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1429 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1430 
1431 static struct attribute *rbd_attrs[] = {
1432 	&dev_attr_size.attr,
1433 	&dev_attr_major.attr,
1434 	&dev_attr_client_id.attr,
1435 	&dev_attr_pool.attr,
1436 	&dev_attr_name.attr,
1437 	&dev_attr_current_snap.attr,
1438 	&dev_attr_refresh.attr,
1439 	&dev_attr_create_snap.attr,
1440 	&dev_attr_rollback_snap.attr,
1441 	NULL
1442 };
1443 
1444 static struct attribute_group rbd_attr_group = {
1445 	.attrs = rbd_attrs,
1446 };
1447 
1448 static const struct attribute_group *rbd_attr_groups[] = {
1449 	&rbd_attr_group,
1450 	NULL
1451 };
1452 
1453 static void rbd_sysfs_dev_release(struct device *dev)
1454 {
1455 }
1456 
1457 static struct device_type rbd_device_type = {
1458 	.name		= "rbd",
1459 	.groups		= rbd_attr_groups,
1460 	.release	= rbd_sysfs_dev_release,
1461 };
1462 
1463 
1464 /*
1465   sysfs - snapshots
1466 */
1467 
1468 static ssize_t rbd_snap_size_show(struct device *dev,
1469 				  struct device_attribute *attr,
1470 				  char *buf)
1471 {
1472 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1473 
1474 	return sprintf(buf, "%lld\n", (long long)snap->size);
1475 }
1476 
1477 static ssize_t rbd_snap_id_show(struct device *dev,
1478 				struct device_attribute *attr,
1479 				char *buf)
1480 {
1481 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1482 
1483 	return sprintf(buf, "%lld\n", (long long)snap->id);
1484 }
1485 
1486 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1487 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1488 
1489 static struct attribute *rbd_snap_attrs[] = {
1490 	&dev_attr_snap_size.attr,
1491 	&dev_attr_snap_id.attr,
1492 	NULL,
1493 };
1494 
1495 static struct attribute_group rbd_snap_attr_group = {
1496 	.attrs = rbd_snap_attrs,
1497 };
1498 
1499 static void rbd_snap_dev_release(struct device *dev)
1500 {
1501 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1502 	kfree(snap->name);
1503 	kfree(snap);
1504 }
1505 
1506 static const struct attribute_group *rbd_snap_attr_groups[] = {
1507 	&rbd_snap_attr_group,
1508 	NULL
1509 };
1510 
1511 static struct device_type rbd_snap_device_type = {
1512 	.groups		= rbd_snap_attr_groups,
1513 	.release	= rbd_snap_dev_release,
1514 };
1515 
1516 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1517 				  struct rbd_snap *snap)
1518 {
1519 	list_del(&snap->node);
1520 	device_unregister(&snap->dev);
1521 }
1522 
1523 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1524 				  struct rbd_snap *snap,
1525 				  struct device *parent)
1526 {
1527 	struct device *dev = &snap->dev;
1528 	int ret;
1529 
1530 	dev->type = &rbd_snap_device_type;
1531 	dev->parent = parent;
1532 	dev->release = rbd_snap_dev_release;
1533 	dev_set_name(dev, "snap_%s", snap->name);
1534 	ret = device_register(dev);
1535 
1536 	return ret;
1537 }
1538 
1539 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1540 			      int i, const char *name,
1541 			      struct rbd_snap **snapp)
1542 {
1543 	int ret;
1544 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1545 	if (!snap)
1546 		return -ENOMEM;
1547 	snap->name = kstrdup(name, GFP_KERNEL);
1548 	snap->size = rbd_dev->header.snap_sizes[i];
1549 	snap->id = rbd_dev->header.snapc->snaps[i];
1550 	if (device_is_registered(&rbd_dev->dev)) {
1551 		ret = rbd_register_snap_dev(rbd_dev, snap,
1552 					     &rbd_dev->dev);
1553 		if (ret < 0)
1554 			goto err;
1555 	}
1556 	*snapp = snap;
1557 	return 0;
1558 err:
1559 	kfree(snap->name);
1560 	kfree(snap);
1561 	return ret;
1562 }
1563 
/*
 * Step backwards through a list of NUL-terminated strings stored back
 * to back.  @name points at the start of an entry (or just past the
 * last terminator) and @start at the first entry.
 *
 * Returns the start of the entry preceding @name, or NULL when fewer
 * than two bytes lie between @start and @name, i.e. there is no
 * previous entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name - start < 2)
		return NULL;

	/* skip the previous entry's terminator, then scan back to its head */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* cur sits on the terminator of the entry before the one we want */
	return cur + 1;
}
1580 
1581 /*
1582  * compare the old list of snapshots that we have to what's in the header
1583  * and update it accordingly. Note that the header holds the snapshots
1584  * in a reverse order (from newest to oldest) and we need to go from
1585  * older to new so that we don't get a duplicate snap name when
1586  * doing the process (e.g., removed snapshot and recreated a new
1587  * one with the same name.
1588  */
1589 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1590 {
1591 	const char *name, *first_name;
1592 	int i = rbd_dev->header.total_snaps;
1593 	struct rbd_snap *snap, *old_snap = NULL;
1594 	int ret;
1595 	struct list_head *p, *n;
1596 
1597 	first_name = rbd_dev->header.snap_names;
1598 	name = first_name + rbd_dev->header.snap_names_len;
1599 
1600 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1601 		u64 cur_id;
1602 
1603 		old_snap = list_entry(p, struct rbd_snap, node);
1604 
1605 		if (i)
1606 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
1607 
1608 		if (!i || old_snap->id < cur_id) {
1609 			/* old_snap->id was skipped, thus was removed */
1610 			__rbd_remove_snap_dev(rbd_dev, old_snap);
1611 			continue;
1612 		}
1613 		if (old_snap->id == cur_id) {
1614 			/* we have this snapshot already */
1615 			i--;
1616 			name = rbd_prev_snap_name(name, first_name);
1617 			continue;
1618 		}
1619 		for (; i > 0;
1620 		     i--, name = rbd_prev_snap_name(name, first_name)) {
1621 			if (!name) {
1622 				WARN_ON(1);
1623 				return -EINVAL;
1624 			}
1625 			cur_id = rbd_dev->header.snapc->snaps[i];
1626 			/* snapshot removal? handle it above */
1627 			if (cur_id >= old_snap->id)
1628 				break;
1629 			/* a new snapshot */
1630 			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1631 			if (ret < 0)
1632 				return ret;
1633 
1634 			/* note that we add it backward so using n and not p */
1635 			list_add(&snap->node, n);
1636 			p = &snap->node;
1637 		}
1638 	}
1639 	/* we're done going over the old snap list, just add what's left */
1640 	for (; i > 0; i--) {
1641 		name = rbd_prev_snap_name(name, first_name);
1642 		if (!name) {
1643 			WARN_ON(1);
1644 			return -EINVAL;
1645 		}
1646 		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1647 		if (ret < 0)
1648 			return ret;
1649 		list_add(&snap->node, &rbd_dev->snaps);
1650 	}
1651 
1652 	return 0;
1653 }
1654 
1655 
1656 static void rbd_root_dev_release(struct device *dev)
1657 {
1658 }
1659 
1660 static struct device rbd_root_dev = {
1661 	.init_name =    "rbd",
1662 	.release =      rbd_root_dev_release,
1663 };
1664 
1665 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1666 {
1667 	int ret = -ENOMEM;
1668 	struct device *dev;
1669 	struct rbd_snap *snap;
1670 
1671 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1672 	dev = &rbd_dev->dev;
1673 
1674 	dev->bus = &rbd_bus_type;
1675 	dev->type = &rbd_device_type;
1676 	dev->parent = &rbd_root_dev;
1677 	dev->release = rbd_dev_release;
1678 	dev_set_name(dev, "%d", rbd_dev->id);
1679 	ret = device_register(dev);
1680 	if (ret < 0)
1681 		goto done_free;
1682 
1683 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
1684 		ret = rbd_register_snap_dev(rbd_dev, snap,
1685 					     &rbd_dev->dev);
1686 		if (ret < 0)
1687 			break;
1688 	}
1689 
1690 	mutex_unlock(&ctl_mutex);
1691 	return 0;
1692 done_free:
1693 	mutex_unlock(&ctl_mutex);
1694 	return ret;
1695 }
1696 
1697 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1698 {
1699 	device_unregister(&rbd_dev->dev);
1700 }
1701 
1702 static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1703 {
1704 	struct ceph_osd_client *osdc;
1705 	struct rbd_device *rbd_dev;
1706 	ssize_t rc = -ENOMEM;
1707 	int irc, new_id = 0;
1708 	struct list_head *tmp;
1709 	char *mon_dev_name;
1710 	char *options;
1711 
1712 	if (!try_module_get(THIS_MODULE))
1713 		return -ENODEV;
1714 
1715 	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1716 	if (!mon_dev_name)
1717 		goto err_out_mod;
1718 
1719 	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1720 	if (!options)
1721 		goto err_mon_dev;
1722 
1723 	/* new rbd_device object */
1724 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1725 	if (!rbd_dev)
1726 		goto err_out_opt;
1727 
1728 	/* static rbd_device initialization */
1729 	spin_lock_init(&rbd_dev->lock);
1730 	INIT_LIST_HEAD(&rbd_dev->node);
1731 	INIT_LIST_HEAD(&rbd_dev->snaps);
1732 
1733 	/* generate unique id: find highest unique id, add one */
1734 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1735 
1736 	list_for_each(tmp, &rbd_dev_list) {
1737 		struct rbd_device *rbd_dev;
1738 
1739 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1740 		if (rbd_dev->id >= new_id)
1741 			new_id = rbd_dev->id + 1;
1742 	}
1743 
1744 	rbd_dev->id = new_id;
1745 
1746 	/* add to global list */
1747 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
1748 
1749 	/* parse add command */
1750 	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1751 		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
1752 		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1753 		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1754 		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1755 		   mon_dev_name, options, rbd_dev->pool_name,
1756 		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
1757 		rc = -EINVAL;
1758 		goto err_out_slot;
1759 	}
1760 
1761 	if (rbd_dev->snap_name[0] == 0)
1762 		rbd_dev->snap_name[0] = '-';
1763 
1764 	rbd_dev->obj_len = strlen(rbd_dev->obj);
1765 	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1766 		 rbd_dev->obj, RBD_SUFFIX);
1767 
1768 	/* initialize rest of new object */
1769 	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1770 	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1771 	if (rc < 0)
1772 		goto err_out_slot;
1773 
1774 	mutex_unlock(&ctl_mutex);
1775 
1776 	/* pick the pool */
1777 	osdc = &rbd_dev->client->osdc;
1778 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1779 	if (rc < 0)
1780 		goto err_out_client;
1781 	rbd_dev->poolid = rc;
1782 
1783 	/* register our block device */
1784 	irc = register_blkdev(0, rbd_dev->name);
1785 	if (irc < 0) {
1786 		rc = irc;
1787 		goto err_out_client;
1788 	}
1789 	rbd_dev->major = irc;
1790 
1791 	rc = rbd_bus_add_dev(rbd_dev);
1792 	if (rc)
1793 		goto err_out_blkdev;
1794 
1795 	/* set up and announce blkdev mapping */
1796 	rc = rbd_init_disk(rbd_dev);
1797 	if (rc)
1798 		goto err_out_bus;
1799 
1800 	return count;
1801 
1802 err_out_bus:
1803 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1804 	list_del_init(&rbd_dev->node);
1805 	mutex_unlock(&ctl_mutex);
1806 
1807 	/* this will also clean up rest of rbd_dev stuff */
1808 
1809 	rbd_bus_del_dev(rbd_dev);
1810 	kfree(options);
1811 	kfree(mon_dev_name);
1812 	return rc;
1813 
1814 err_out_blkdev:
1815 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
1816 err_out_client:
1817 	rbd_put_client(rbd_dev);
1818 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1819 err_out_slot:
1820 	list_del_init(&rbd_dev->node);
1821 	mutex_unlock(&ctl_mutex);
1822 
1823 	kfree(rbd_dev);
1824 err_out_opt:
1825 	kfree(options);
1826 err_mon_dev:
1827 	kfree(mon_dev_name);
1828 err_out_mod:
1829 	dout("Error adding device %s\n", buf);
1830 	module_put(THIS_MODULE);
1831 	return rc;
1832 }
1833 
1834 static struct rbd_device *__rbd_get_dev(unsigned long id)
1835 {
1836 	struct list_head *tmp;
1837 	struct rbd_device *rbd_dev;
1838 
1839 	list_for_each(tmp, &rbd_dev_list) {
1840 		rbd_dev = list_entry(tmp, struct rbd_device, node);
1841 		if (rbd_dev->id == id)
1842 			return rbd_dev;
1843 	}
1844 	return NULL;
1845 }
1846 
1847 static void rbd_dev_release(struct device *dev)
1848 {
1849 	struct rbd_device *rbd_dev =
1850 			container_of(dev, struct rbd_device, dev);
1851 
1852 	rbd_put_client(rbd_dev);
1853 
1854 	/* clean up and free blkdev */
1855 	rbd_free_disk(rbd_dev);
1856 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
1857 	kfree(rbd_dev);
1858 
1859 	/* release module ref */
1860 	module_put(THIS_MODULE);
1861 }
1862 
1863 static ssize_t rbd_remove(struct bus_type *bus,
1864 			  const char *buf,
1865 			  size_t count)
1866 {
1867 	struct rbd_device *rbd_dev = NULL;
1868 	int target_id, rc;
1869 	unsigned long ul;
1870 	int ret = count;
1871 
1872 	rc = strict_strtoul(buf, 10, &ul);
1873 	if (rc)
1874 		return rc;
1875 
1876 	/* convert to int; abort if we lost anything in the conversion */
1877 	target_id = (int) ul;
1878 	if (target_id != ul)
1879 		return -EINVAL;
1880 
1881 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1882 
1883 	rbd_dev = __rbd_get_dev(target_id);
1884 	if (!rbd_dev) {
1885 		ret = -ENOENT;
1886 		goto done;
1887 	}
1888 
1889 	list_del_init(&rbd_dev->node);
1890 
1891 	__rbd_remove_all_snaps(rbd_dev);
1892 	rbd_bus_del_dev(rbd_dev);
1893 
1894 done:
1895 	mutex_unlock(&ctl_mutex);
1896 	return ret;
1897 }
1898 
1899 static ssize_t rbd_snap_add(struct device *dev,
1900 			    struct device_attribute *attr,
1901 			    const char *buf,
1902 			    size_t count)
1903 {
1904 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1905 	int ret;
1906 	char *name = kmalloc(count + 1, GFP_KERNEL);
1907 	if (!name)
1908 		return -ENOMEM;
1909 
1910 	snprintf(name, count, "%s", buf);
1911 
1912 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1913 
1914 	ret = rbd_header_add_snap(rbd_dev,
1915 				  name, GFP_KERNEL);
1916 	if (ret < 0)
1917 		goto done_unlock;
1918 
1919 	ret = __rbd_update_snaps(rbd_dev);
1920 	if (ret < 0)
1921 		goto done_unlock;
1922 
1923 	ret = count;
1924 done_unlock:
1925 	mutex_unlock(&ctl_mutex);
1926 	kfree(name);
1927 	return ret;
1928 }
1929 
1930 static ssize_t rbd_snap_rollback(struct device *dev,
1931 				 struct device_attribute *attr,
1932 				 const char *buf,
1933 				 size_t count)
1934 {
1935 	struct rbd_device *rbd_dev = dev_to_rbd(dev);
1936 	int ret;
1937 	u64 snapid;
1938 	u64 cur_ofs;
1939 	char *seg_name = NULL;
1940 	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
1941 	ret = -ENOMEM;
1942 	if (!snap_name)
1943 		return ret;
1944 
1945 	/* parse snaps add command */
1946 	snprintf(snap_name, count, "%s", buf);
1947 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1948 	if (!seg_name)
1949 		goto done;
1950 
1951 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1952 
1953 	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1954 	if (ret < 0)
1955 		goto done_unlock;
1956 
1957 	dout("snapid=%lld\n", snapid);
1958 
1959 	cur_ofs = 0;
1960 	while (cur_ofs < rbd_dev->header.image_size) {
1961 		cur_ofs += rbd_get_segment(&rbd_dev->header,
1962 					   rbd_dev->obj,
1963 					   cur_ofs, (u64)-1,
1964 					   seg_name, NULL);
1965 		dout("seg_name=%s\n", seg_name);
1966 
1967 		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1968 		if (ret < 0)
1969 			pr_warning("could not roll back obj %s err=%d\n",
1970 				   seg_name, ret);
1971 	}
1972 
1973 	ret = __rbd_update_snaps(rbd_dev);
1974 	if (ret < 0)
1975 		goto done_unlock;
1976 
1977 	ret = count;
1978 
1979 done_unlock:
1980 	mutex_unlock(&ctl_mutex);
1981 done:
1982 	kfree(seg_name);
1983 	kfree(snap_name);
1984 
1985 	return ret;
1986 }
1987 
1988 static struct bus_attribute rbd_bus_attrs[] = {
1989 	__ATTR(add, S_IWUSR, NULL, rbd_add),
1990 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
1991 	__ATTR_NULL
1992 };
1993 
1994 /*
1995  * create control files in sysfs
1996  * /sys/bus/rbd/...
1997  */
1998 static int rbd_sysfs_init(void)
1999 {
2000 	int ret;
2001 
2002 	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2003 
2004 	ret = bus_register(&rbd_bus_type);
2005 	 if (ret < 0)
2006 		return ret;
2007 
2008 	ret = device_register(&rbd_root_dev);
2009 
2010 	return ret;
2011 }
2012 
2013 static void rbd_sysfs_cleanup(void)
2014 {
2015 	device_unregister(&rbd_root_dev);
2016 	bus_unregister(&rbd_bus_type);
2017 }
2018 
2019 int __init rbd_init(void)
2020 {
2021 	int rc;
2022 
2023 	rc = rbd_sysfs_init();
2024 	if (rc)
2025 		return rc;
2026 	spin_lock_init(&node_lock);
2027 	pr_info("loaded " DRV_NAME_LONG "\n");
2028 	return 0;
2029 }
2030 
2031 void __exit rbd_exit(void)
2032 {
2033 	rbd_sysfs_cleanup();
2034 }
2035 
2036 module_init(rbd_init);
2037 module_exit(rbd_exit);
2038 
2039 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2040 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2041 MODULE_DESCRIPTION("rados block device");
2042 
2043 /* following authorship retained from original osdblk.c */
2044 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2045 
2046 MODULE_LICENSE("GPL");
2047