xref: /openbmc/linux/drivers/block/rbd.c (revision 9f380456)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
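/*
 * Editor's sketch of the sysfs interface (the exact syntax is defined
 * by the ABI document above; the monitor address, credentials, pool
 * and image names here are placeholders):
 *
 *	# map pool "rbd", image "foo" as /dev/rbd<id>
 *	echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *		> /sys/bus/rbd/add
 *
 *	# unmap the device with id 0
 *	echo 0 > /sys/bus/rbd/remove
 */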
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define	SECTOR_SHIFT	9
51 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
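/*
 * For example, a 4 KiB request covers 4096 >> SECTOR_SHIFT == 8
 * sectors, and sector 100 starts at byte offset
 * 100 << SECTOR_SHIFT == 51200.
 */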
52 
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55 
56 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
57 
58 #define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
59 #define RBD_MAX_POOL_NAME_LEN	64
60 #define RBD_MAX_SNAP_NAME_LEN	32
61 #define RBD_MAX_OPT_LEN		1024
62 
63 #define RBD_SNAP_HEAD_NAME	"-"
64 
65 /*
66  * An RBD device name will be "rbd#", where the "rbd" comes from
67  * RBD_DRV_NAME above, and # is a unique integer identifier.
68  * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
69  * enough to hold all possible device names.
70  */
71 #define DEV_NAME_LEN		32
72 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
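/*
 * Worked example: with 4-byte ints MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 == 11, exactly the width of "-2147483648",
 * so "rbd" plus any int id fits comfortably in DEV_NAME_LEN.
 */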
73 
74 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 
76 /*
77  * block device image metadata (in-memory version)
78  */
79 struct rbd_image_header {
80 	u64 image_size;
81 	char block_name[32];
82 	__u8 obj_order;
83 	__u8 crypt_type;
84 	__u8 comp_type;
85 	struct ceph_snap_context *snapc;
86 	size_t snap_names_len;
87 	u64 snap_seq;
88 	u32 total_snaps;
89 
90 	char *snap_names;
91 	u64 *snap_sizes;
92 
93 	u64 obj_version;
94 };
95 
96 struct rbd_options {
97 	int	notify_timeout;
98 };
99 
100 /*
101  * an instance of the client.  multiple devices may share an rbd client.
102  */
103 struct rbd_client {
104 	struct ceph_client	*client;
105 	struct rbd_options	*rbd_opts;
106 	struct kref		kref;
107 	struct list_head	node;
108 };
109 
110 /*
111  * a request completion status
112  */
113 struct rbd_req_status {
114 	int done;
115 	int rc;
116 	u64 bytes;
117 };
118 
119 /*
120  * a collection of requests
121  */
122 struct rbd_req_coll {
123 	int			total;
124 	int			num_done;
125 	struct kref		kref;
126 	struct rbd_req_status	status[0];
127 };
128 
129 /*
130  * a single io request
131  */
132 struct rbd_request {
133 	struct request		*rq;		/* blk layer request */
134 	struct bio		*bio;		/* cloned bio */
135 	struct page		**pages;	/* list of used pages */
136 	u64			len;
137 	int			coll_index;
138 	struct rbd_req_coll	*coll;
139 };
140 
141 struct rbd_snap {
142 	struct	device		dev;
143 	const char		*name;
144 	size_t			size;
145 	struct list_head	node;
146 	u64			id;
147 };
148 
149 /*
150  * a single device
151  */
152 struct rbd_device {
153 	int			id;		/* blkdev unique id */
154 
155 	int			major;		/* blkdev assigned major */
156 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
157 	struct request_queue	*q;
158 
159 	struct rbd_client	*rbd_client;
160 
161 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
162 
163 	spinlock_t		lock;		/* queue lock */
164 
165 	struct rbd_image_header	header;
166 	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
167 	int			obj_len;
168 	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
169 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
170 	int			poolid;
171 
172 	struct ceph_osd_event   *watch_event;
173 	struct ceph_osd_request *watch_request;
174 
175 	/* protects updating the header */
176 	struct rw_semaphore     header_rwsem;
177 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
178 	u32 cur_snap;	/* index+1 of the current snapshot within the
179 			   snap context; 0 means the head */
180 	int read_only;
181 
182 	struct list_head	node;
183 
184 	/* list of snapshots */
185 	struct list_head	snaps;
186 
187 	/* sysfs related */
188 	struct device		dev;
189 };
190 
191 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
192 
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195 
196 static LIST_HEAD(rbd_client_list);		/* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198 
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 			    struct device_attribute *attr,
203 			    const char *buf,
204 			    size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206 				  struct rbd_snap *snap);
207 
208 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 		       size_t count);
210 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 			  size_t count);
212 
213 static struct bus_attribute rbd_bus_attrs[] = {
214 	__ATTR(add, S_IWUSR, NULL, rbd_add),
215 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 	__ATTR_NULL
217 };
218 
219 static struct bus_type rbd_bus_type = {
220 	.name		= "rbd",
221 	.bus_attrs	= rbd_bus_attrs,
222 };
223 
224 static void rbd_root_dev_release(struct device *dev)
225 {
226 }
227 
228 static struct device rbd_root_dev = {
229 	.init_name =    "rbd",
230 	.release =      rbd_root_dev_release,
231 };
232 
233 
234 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 {
236 	return get_device(&rbd_dev->dev);
237 }
238 
239 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 {
241 	put_device(&rbd_dev->dev);
242 }
243 
244 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
245 
246 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 {
248 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 
250 	rbd_get_dev(rbd_dev);
251 
252 	set_device_ro(bdev, rbd_dev->read_only);
253 
254 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255 		return -EROFS;
256 
257 	return 0;
258 }
259 
260 static int rbd_release(struct gendisk *disk, fmode_t mode)
261 {
262 	struct rbd_device *rbd_dev = disk->private_data;
263 
264 	rbd_put_dev(rbd_dev);
265 
266 	return 0;
267 }
268 
269 static const struct block_device_operations rbd_bd_ops = {
270 	.owner			= THIS_MODULE,
271 	.open			= rbd_open,
272 	.release		= rbd_release,
273 };
274 
275 /*
276  * Initialize an rbd client instance.
277  * We own *opt.
278  */
279 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
280 					    struct rbd_options *rbd_opts)
281 {
282 	struct rbd_client *rbdc;
283 	int ret = -ENOMEM;
284 
285 	dout("rbd_client_create\n");
286 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 	if (!rbdc)
288 		goto out_opt;
289 
290 	kref_init(&rbdc->kref);
291 	INIT_LIST_HEAD(&rbdc->node);
292 
293 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294 
295 	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
296 	if (IS_ERR(rbdc->client))
297 		goto out_mutex;
298 	opt = NULL; /* Now rbdc->client is responsible for opt */
299 
300 	ret = ceph_open_session(rbdc->client);
301 	if (ret < 0)
302 		goto out_err;
303 
304 	rbdc->rbd_opts = rbd_opts;
305 
306 	spin_lock(&rbd_client_list_lock);
307 	list_add_tail(&rbdc->node, &rbd_client_list);
308 	spin_unlock(&rbd_client_list_lock);
309 
310 	mutex_unlock(&ctl_mutex);
311 
312 	dout("rbd_client_create created %p\n", rbdc);
313 	return rbdc;
314 
315 out_err:
316 	ceph_destroy_client(rbdc->client);
317 out_mutex:
318 	mutex_unlock(&ctl_mutex);
319 	kfree(rbdc);
320 out_opt:
321 	if (opt)
322 		ceph_destroy_options(opt);
323 	return ERR_PTR(ret);
324 }
325 
326 /*
327  * Find a ceph client with specific addr and configuration.
328  */
329 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
330 {
331 	struct rbd_client *client_node;
332 
333 	if (opt->flags & CEPH_OPT_NOSHARE)
334 		return NULL;
335 
336 	list_for_each_entry(client_node, &rbd_client_list, node)
337 		if (ceph_compare_options(opt, client_node->client) == 0)
338 			return client_node;
339 	return NULL;
340 }
341 
342 /*
343  * mount options
344  */
345 enum {
346 	Opt_notify_timeout,
347 	Opt_last_int,
348 	/* int args above */
349 	Opt_last_string,
350 	/* string args above */
351 };
352 
353 static match_table_t rbdopt_tokens = {
354 	{Opt_notify_timeout, "notify_timeout=%d"},
355 	/* int args above */
356 	/* string args above */
357 	{-1, NULL}
358 };
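/*
 * For instance, mapping with the option string "notify_timeout=30"
 * makes parse_rbd_opts_token() below store 30 in
 * rbd_opts->notify_timeout; an unrecognized token fails with -EINVAL.
 */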
359 
360 static int parse_rbd_opts_token(char *c, void *private)
361 {
362 	struct rbd_options *rbdopt = private;
363 	substring_t argstr[MAX_OPT_ARGS];
364 	int token, intval, ret;
365 
366 	token = match_token(c, rbdopt_tokens, argstr);
367 	if (token < 0)
368 		return -EINVAL;
369 
370 	if (token < Opt_last_int) {
371 		ret = match_int(&argstr[0], &intval);
372 		if (ret < 0) {
373 			pr_err("bad mount option arg (not int) "
374 			       "at '%s'\n", c);
375 			return ret;
376 		}
377 		dout("got int token %d val %d\n", token, intval);
378 	} else if (token > Opt_last_int && token < Opt_last_string) {
379 		dout("got string token %d val %s\n", token,
380 		     argstr[0].from);
381 	} else {
382 		dout("got token %d\n", token);
383 	}
384 
385 	switch (token) {
386 	case Opt_notify_timeout:
387 		rbdopt->notify_timeout = intval;
388 		break;
389 	default:
390 		BUG_ON(token);
391 	}
392 	return 0;
393 }
394 
395 /*
396  * Get a ceph client with a specific addr and configuration; if one
397  * does not exist, create it.
398  */
399 static struct rbd_client *rbd_get_client(const char *mon_addr,
400 					 size_t mon_addr_len,
401 					 char *options)
402 {
403 	struct rbd_client *rbdc;
404 	struct ceph_options *opt;
405 	struct rbd_options *rbd_opts;
406 
407 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
408 	if (!rbd_opts)
409 		return ERR_PTR(-ENOMEM);
410 
411 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
412 
413 	opt = ceph_parse_options(options, mon_addr,
414 				mon_addr + mon_addr_len,
415 				parse_rbd_opts_token, rbd_opts);
416 	if (IS_ERR(opt)) {
417 		kfree(rbd_opts);
418 		return ERR_CAST(opt);
419 	}
420 
421 	spin_lock(&rbd_client_list_lock);
422 	rbdc = __rbd_client_find(opt);
423 	if (rbdc) {
424 		/* using an existing client */
425 		kref_get(&rbdc->kref);
426 		spin_unlock(&rbd_client_list_lock);
427 
428 		ceph_destroy_options(opt);
429 		kfree(rbd_opts);
430 
431 		return rbdc;
432 	}
433 	spin_unlock(&rbd_client_list_lock);
434 
435 	rbdc = rbd_client_create(opt, rbd_opts);
436 
437 	if (IS_ERR(rbdc))
438 		kfree(rbd_opts);
439 
440 	return rbdc;
441 }
442 
443 /*
444  * Destroy ceph client
445  *
446  * Caller must hold rbd_client_list_lock.
447  */
448 static void rbd_client_release(struct kref *kref)
449 {
450 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451 
452 	dout("rbd_release_client %p\n", rbdc);
453 	list_del(&rbdc->node);
454 
455 	ceph_destroy_client(rbdc->client);
456 	kfree(rbdc->rbd_opts);
457 	kfree(rbdc);
458 }
459 
460 /*
461  * Drop reference to ceph client node. If it's not referenced anymore, release
462  * it.
463  */
464 static void rbd_put_client(struct rbd_device *rbd_dev)
465 {
466 	spin_lock(&rbd_client_list_lock);
467 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 	spin_unlock(&rbd_client_list_lock);
469 	rbd_dev->rbd_client = NULL;
470 }
471 
472 /*
473  * Destroy requests collection
474  */
475 static void rbd_coll_release(struct kref *kref)
476 {
477 	struct rbd_req_coll *coll =
478 		container_of(kref, struct rbd_req_coll, kref);
479 
480 	dout("rbd_coll_release %p\n", coll);
481 	kfree(coll);
482 }
483 
484 /*
485  * Create a new header structure, translating the on-disk header
486  * format into the in-memory representation.
487  */
488 static int rbd_header_from_disk(struct rbd_image_header *header,
489 				 struct rbd_image_header_ondisk *ondisk,
490 				 int allocated_snaps,
491 				 gfp_t gfp_flags)
492 {
493 	int i;
494 	u32 snap_count;
495 
496 	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497 		return -ENXIO;
498 
499 	snap_count = le32_to_cpu(ondisk->snap_count);
500 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
501 				snap_count * sizeof (*ondisk),
502 				gfp_flags);
503 	if (!header->snapc)
504 		return -ENOMEM;
505 
506 	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
507 	if (snap_count) {
508 		header->snap_names = kmalloc(header->snap_names_len,
509 					     GFP_KERNEL);
510 		if (!header->snap_names)
511 			goto err_snapc;
512 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513 					     GFP_KERNEL);
514 		if (!header->snap_sizes)
515 			goto err_names;
516 	} else {
517 		header->snap_names = NULL;
518 		header->snap_sizes = NULL;
519 	}
520 	memcpy(header->block_name, ondisk->block_name,
521 	       sizeof(ondisk->block_name));
522 
523 	header->image_size = le64_to_cpu(ondisk->image_size);
524 	header->obj_order = ondisk->options.order;
525 	header->crypt_type = ondisk->options.crypt_type;
526 	header->comp_type = ondisk->options.comp_type;
527 
528 	atomic_set(&header->snapc->nref, 1);
529 	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
530 	header->snapc->num_snaps = snap_count;
531 	header->total_snaps = snap_count;
532 
533 	if (snap_count && allocated_snaps == snap_count) {
534 		for (i = 0; i < snap_count; i++) {
535 			header->snapc->snaps[i] =
536 				le64_to_cpu(ondisk->snaps[i].id);
537 			header->snap_sizes[i] =
538 				le64_to_cpu(ondisk->snaps[i].image_size);
539 		}
540 
541 		/* copy snapshot names */
542 		memcpy(header->snap_names, &ondisk->snaps[i],
543 			header->snap_names_len);
544 	}
545 
546 	return 0;
547 
548 err_names:
549 	kfree(header->snap_names);
550 err_snapc:
551 	kfree(header->snapc);
552 	return -ENOMEM;
553 }
554 
555 static int snap_index(struct rbd_image_header *header, int snap_num)
556 {
557 	return header->total_snaps - snap_num;
558 }
559 
560 static u64 cur_snap_id(struct rbd_device *rbd_dev)
561 {
562 	struct rbd_image_header *header = &rbd_dev->header;
563 
564 	if (!rbd_dev->cur_snap)
565 		return 0;
566 
567 	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
568 }
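/*
 * Example: with total_snaps == 3 and snapc->snaps == {30, 20, 10}
 * (newest first), mapping the middle snapshot sets cur_snap to
 * 3 - 1 == 2 (see rbd_header_set_snap() below), and cur_snap_id()
 * then returns snaps[snap_index(header, 2)] == snaps[1] == 20.
 */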
569 
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571 			u64 *seq, u64 *size)
572 {
573 	int i;
574 	char *p = header->snap_names;
575 
576 	for (i = 0; i < header->total_snaps; i++) {
577 		if (!strcmp(snap_name, p)) {
578 
579 			/* Found it.  Pass back its id and/or size */
580 
581 			if (seq)
582 				*seq = header->snapc->snaps[i];
583 			if (size)
584 				*size = header->snap_sizes[i];
585 			return i;
586 		}
587 		p += strlen(p) + 1;	/* Skip ahead to the next name */
588 	}
589 	return -ENOENT;
590 }
591 
592 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
593 {
594 	struct rbd_image_header *header = &dev->header;
595 	struct ceph_snap_context *snapc = header->snapc;
596 	int ret = -ENOENT;
597 
598 	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
599 
600 	down_write(&dev->header_rwsem);
601 
602 	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
603 		    sizeof (RBD_SNAP_HEAD_NAME))) {
604 		if (header->total_snaps)
605 			snapc->seq = header->snap_seq;
606 		else
607 			snapc->seq = 0;
608 		dev->cur_snap = 0;
609 		dev->read_only = 0;
610 		if (size)
611 			*size = header->image_size;
612 	} else {
613 		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
614 		if (ret < 0)
615 			goto done;
616 
617 		dev->cur_snap = header->total_snaps - ret;
618 		dev->read_only = 1;
619 	}
620 
621 	ret = 0;
622 done:
623 	up_write(&dev->header_rwsem);
624 	return ret;
625 }
626 
627 static void rbd_header_free(struct rbd_image_header *header)
628 {
629 	kfree(header->snapc);
630 	kfree(header->snap_names);
631 	kfree(header->snap_sizes);
632 }
633 
634 /*
635  * get the actual striped segment name, offset and length
636  */
637 static u64 rbd_get_segment(struct rbd_image_header *header,
638 			   const char *block_name,
639 			   u64 ofs, u64 len,
640 			   char *seg_name, u64 *segofs)
641 {
642 	u64 seg = ofs >> header->obj_order;
643 
644 	if (seg_name)
645 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
646 			 "%s.%012llx", block_name, seg);
647 
648 	ofs = ofs & ((1 << header->obj_order) - 1);
649 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
650 
651 	if (segofs)
652 		*segofs = ofs;
653 
654 	return len;
655 }
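/*
 * Worked example (illustrative values): with obj_order 22 (4 MiB
 * objects), block_name "rb.0.0", ofs 0x500000 and len 0x300000:
 *
 *	seg      = 0x500000 >> 22                     = 1
 *	seg_name = "rb.0.0.000000000001"
 *	*segofs  = 0x500000 & ((1 << 22) - 1)         = 0x100000
 *	return     min(0x300000, 0x400000 - 0x100000) = 0x300000
 */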
656 
657 static int rbd_get_num_segments(struct rbd_image_header *header,
658 				u64 ofs, u64 len)
659 {
660 	u64 start_seg = ofs >> header->obj_order;
661 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
662 	return end_seg - start_seg + 1;
663 }
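/*
 * E.g. with obj_order 22, ofs 0x3ff000 and len 0x2000 touch bytes
 * 0x3ff000..0x400fff, which straddle segments 0 and 1, so the
 * function returns (0x400fff >> 22) - (0x3ff000 >> 22) + 1 == 2.
 */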
664 
665 /*
666  * returns the size of an object in the image
667  */
668 static u64 rbd_obj_bytes(struct rbd_image_header *header)
669 {
670 	return 1 << header->obj_order;
671 }
672 
673 /*
674  * bio helpers
675  */
676 
677 static void bio_chain_put(struct bio *chain)
678 {
679 	struct bio *tmp;
680 
681 	while (chain) {
682 		tmp = chain;
683 		chain = chain->bi_next;
684 		bio_put(tmp);
685 	}
686 }
687 
688 /*
689  * zeros a bio chain, starting at a specific offset
690  */
691 static void zero_bio_chain(struct bio *chain, int start_ofs)
692 {
693 	struct bio_vec *bv;
694 	unsigned long flags;
695 	void *buf;
696 	int i;
697 	int pos = 0;
698 
699 	while (chain) {
700 		bio_for_each_segment(bv, chain, i) {
701 			if (pos + bv->bv_len > start_ofs) {
702 				int remainder = max(start_ofs - pos, 0);
703 				buf = bvec_kmap_irq(bv, &flags);
704 				memset(buf + remainder, 0,
705 				       bv->bv_len - remainder);
706 				bvec_kunmap_irq(buf, &flags);
707 			}
708 			pos += bv->bv_len;
709 		}
710 
711 		chain = chain->bi_next;
712 	}
713 }
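/*
 * E.g. zero_bio_chain(bio, 512) on a short read keeps the 512 bytes
 * that actually arrived and clears everything after them, letting the
 * caller report the full request length (see rbd_req_cb() below).
 */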
714 
715 /*
716  * bio_chain_clone - clone a chain of bios up to a certain length.
717  * might return a bio_pair that will need to be released.
718  */
719 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
720 				   struct bio_pair **bp,
721 				   int len, gfp_t gfpmask)
722 {
723 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
724 	int total = 0;
725 
726 	if (*bp) {
727 		bio_pair_release(*bp);
728 		*bp = NULL;
729 	}
730 
731 	while (old_chain && (total < len)) {
732 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
733 		if (!tmp)
734 			goto err_out;
735 
736 		if (total + old_chain->bi_size > len) {
737 			struct bio_pair *bp;
738 
739 			/*
740 			 * this split can only happen with a single-page bio;
741 			 * bio_split() will BUG_ON if this is not the case
742 			 */
743 			dout("bio_chain_clone split! total=%d remaining=%d"
744 			     " bi_size=%d\n",
745 			     (int)total, (int)len-total,
746 			     (int)old_chain->bi_size);
747 
748 			/* split the bio. We'll release it either in the next
749 			   call, or it will have to be released outside */
750 			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
751 			if (!bp)
752 				goto err_out;
753 
754 			__bio_clone(tmp, &bp->bio1);
755 
756 			*next = &bp->bio2;
757 		} else {
758 			__bio_clone(tmp, old_chain);
759 			*next = old_chain->bi_next;
760 		}
761 
762 		tmp->bi_bdev = NULL;
763 		gfpmask &= ~__GFP_WAIT;
764 		tmp->bi_next = NULL;
765 
766 		if (!new_chain) {
767 			new_chain = tail = tmp;
768 		} else {
769 			tail->bi_next = tmp;
770 			tail = tmp;
771 		}
772 		old_chain = old_chain->bi_next;
773 
774 		total += tmp->bi_size;
775 	}
776 
777 	BUG_ON(total < len);
778 
779 	if (tail)
780 		tail->bi_next = NULL;
781 
782 	*old = old_chain;
783 
784 	return new_chain;
785 
786 err_out:
787 	dout("bio_chain_clone with err\n");
788 	bio_chain_put(new_chain);
789 	return NULL;
790 }
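/*
 * Typical calling pattern (a sketch of what rbd_rq_fn() does below),
 * carving one OSD object's worth off the request's bio chain per
 * iteration:
 *
 *	struct bio *rq_bio = rq->bio, *next_bio = NULL, *bio;
 *	struct bio_pair *bp = NULL;
 *
 *	while (size > 0) {
 *		bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
 *				      op_size, GFP_ATOMIC);
 *		if (!bio)
 *			break;		// errors handled per segment
 *		... submit bio for this segment ...
 *		size -= op_size;
 *		rq_bio = next_bio;
 *	}
 *	if (bp)
 *		bio_pair_release(bp);
 */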
791 
792 /*
793  * helpers for osd request op vectors.
794  */
795 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
796 			    int num_ops,
797 			    int opcode,
798 			    u32 payload_len)
799 {
800 	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
801 		       GFP_NOIO);
802 	if (!*ops)
803 		return -ENOMEM;
804 	(*ops)[0].op = opcode;
805 	/*
806 	 * op extent offset and length will be set later on
807 	 * in calc_raw_layout()
808 	 */
809 	(*ops)[0].payload_len = payload_len;
810 	return 0;
811 }
812 
813 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
814 {
815 	kfree(ops);
816 }
817 
818 static void rbd_coll_end_req_index(struct request *rq,
819 				   struct rbd_req_coll *coll,
820 				   int index,
821 				   int ret, u64 len)
822 {
823 	struct request_queue *q;
824 	int min, max, i;
825 
826 	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
827 	     coll, index, ret, len);
828 
829 	if (!rq)
830 		return;
831 
832 	if (!coll) {
833 		blk_end_request(rq, ret, len);
834 		return;
835 	}
836 
837 	q = rq->q;
838 
839 	spin_lock_irq(q->queue_lock);
840 	coll->status[index].done = 1;
841 	coll->status[index].rc = ret;
842 	coll->status[index].bytes = len;
843 	max = min = coll->num_done;
844 	while (max < coll->total && coll->status[max].done)
845 		max++;
846 
847 	for (i = min; i < max; i++) {
848 		__blk_end_request(rq, coll->status[i].rc,
849 				  coll->status[i].bytes);
850 		coll->num_done++;
851 		kref_put(&coll->kref, rbd_coll_release);
852 	}
853 	spin_unlock_irq(q->queue_lock);
854 }
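/*
 * Example: a request split into three segments whose OSD replies
 * arrive out of order (all status[] entries start !done, num_done 0):
 *
 *	reply for index 2: done[2] is set, but done[0] isn't, so the
 *	                   scan ends nothing yet
 *	reply for index 0: done[0] is set; the scan ends index 0
 *	reply for index 1: done[1] is set; the scan ends indexes 1..2
 *
 * __blk_end_request() therefore always sees completions in segment
 * order, as the block layer expects.
 */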
855 
856 static void rbd_coll_end_req(struct rbd_request *req,
857 			     int ret, u64 len)
858 {
859 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
860 }
861 
862 /*
863  * Send ceph osd request
864  */
865 static int rbd_do_request(struct request *rq,
866 			  struct rbd_device *dev,
867 			  struct ceph_snap_context *snapc,
868 			  u64 snapid,
869 			  const char *obj, u64 ofs, u64 len,
870 			  struct bio *bio,
871 			  struct page **pages,
872 			  int num_pages,
873 			  int flags,
874 			  struct ceph_osd_req_op *ops,
875 			  int num_reply,
876 			  struct rbd_req_coll *coll,
877 			  int coll_index,
878 			  void (*rbd_cb)(struct ceph_osd_request *req,
879 					 struct ceph_msg *msg),
880 			  struct ceph_osd_request **linger_req,
881 			  u64 *ver)
882 {
883 	struct ceph_osd_request *req;
884 	struct ceph_file_layout *layout;
885 	int ret;
886 	u64 bno;
887 	struct timespec mtime = CURRENT_TIME;
888 	struct rbd_request *req_data;
889 	struct ceph_osd_request_head *reqhead;
890 	struct ceph_osd_client *osdc;
891 
892 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
893 	if (!req_data) {
894 		if (coll)
895 			rbd_coll_end_req_index(rq, coll, coll_index,
896 					       -ENOMEM, len);
897 		return -ENOMEM;
898 	}
899 
900 	if (coll) {
901 		req_data->coll = coll;
902 		req_data->coll_index = coll_index;
903 	}
904 
905 	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, ofs, len);
906 
907 	down_read(&dev->header_rwsem);
908 
909 	osdc = &dev->rbd_client->client->osdc;
910 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
911 					false, GFP_NOIO, pages, bio);
912 	if (!req) {
913 		up_read(&dev->header_rwsem);
914 		ret = -ENOMEM;
915 		goto done_pages;
916 	}
917 
918 	req->r_callback = rbd_cb;
919 
920 	req_data->rq = rq;
921 	req_data->bio = bio;
922 	req_data->pages = pages;
923 	req_data->len = len;
924 
925 	req->r_priv = req_data;
926 
927 	reqhead = req->r_request->front.iov_base;
928 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
929 
930 	strncpy(req->r_oid, obj, sizeof(req->r_oid));
931 	req->r_oid_len = strlen(req->r_oid);
932 
933 	layout = &req->r_file_layout;
934 	memset(layout, 0, sizeof(*layout));
935 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
936 	layout->fl_stripe_count = cpu_to_le32(1);
937 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938 	layout->fl_pg_preferred = cpu_to_le32(-1);
939 	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
940 	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
941 				req, ops);
942 
943 	ceph_osdc_build_request(req, ofs, &len,
944 				ops,
945 				snapc,
946 				&mtime,
947 				req->r_oid, req->r_oid_len);
948 	up_read(&dev->header_rwsem);
949 
950 	if (linger_req) {
951 		ceph_osdc_set_request_linger(osdc, req);
952 		*linger_req = req;
953 	}
954 
955 	ret = ceph_osdc_start_request(osdc, req, false);
956 	if (ret < 0)
957 		goto done_err;
958 
959 	if (!rbd_cb) {
960 		ret = ceph_osdc_wait_request(osdc, req);
961 		if (ver)
962 			*ver = le64_to_cpu(req->r_reassert_version.version);
963 		dout("reassert_ver=%lld\n",
964 		     le64_to_cpu(req->r_reassert_version.version));
965 		ceph_osdc_put_request(req);
966 	}
967 	return ret;
968 
969 done_err:
970 	bio_chain_put(req_data->bio);
971 	ceph_osdc_put_request(req);
972 done_pages:
973 	rbd_coll_end_req(req_data, ret, len);
974 	kfree(req_data);
975 	return ret;
976 }
977 
978 /*
979  * Ceph osd op callback
980  */
981 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
982 {
983 	struct rbd_request *req_data = req->r_priv;
984 	struct ceph_osd_reply_head *replyhead;
985 	struct ceph_osd_op *op;
986 	__s32 rc;
987 	u64 bytes;
988 	int read_op;
989 
990 	/* parse reply */
991 	replyhead = msg->front.iov_base;
992 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
993 	op = (void *)(replyhead + 1);
994 	rc = le32_to_cpu(replyhead->result);
995 	bytes = le64_to_cpu(op->extent.length);
996 	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
997 
998 	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
999 
1000 	if (rc == -ENOENT && read_op) {
1001 		zero_bio_chain(req_data->bio, 0);
1002 		rc = 0;
1003 	} else if (rc == 0 && read_op && bytes < req_data->len) {
1004 		zero_bio_chain(req_data->bio, bytes);
1005 		bytes = req_data->len;
1006 	}
1007 
1008 	rbd_coll_end_req(req_data, rc, bytes);
1009 
1010 	if (req_data->bio)
1011 		bio_chain_put(req_data->bio);
1012 
1013 	ceph_osdc_put_request(req);
1014 	kfree(req_data);
1015 }
1016 
1017 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1018 {
1019 	ceph_osdc_put_request(req);
1020 }
1021 
1022 /*
1023  * Do a synchronous ceph osd operation
1024  */
1025 static int rbd_req_sync_op(struct rbd_device *dev,
1026 			   struct ceph_snap_context *snapc,
1027 			   u64 snapid,
1028 			   int opcode,
1029 			   int flags,
1030 			   struct ceph_osd_req_op *orig_ops,
1031 			   int num_reply,
1032 			   const char *obj,
1033 			   u64 ofs, u64 len,
1034 			   char *buf,
1035 			   struct ceph_osd_request **linger_req,
1036 			   u64 *ver)
1037 {
1038 	int ret;
1039 	struct page **pages;
1040 	int num_pages;
1041 	struct ceph_osd_req_op *ops = orig_ops;
1042 	u32 payload_len;
1043 
1044 	num_pages = calc_pages_for(ofs, len);
1045 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1046 	if (IS_ERR(pages))
1047 		return PTR_ERR(pages);
1048 
1049 	if (!orig_ops) {
1050 		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1051 		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1052 		if (ret < 0)
1053 			goto done;
1054 
1055 		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1056 			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1057 			if (ret < 0)
1058 				goto done_ops;
1059 		}
1060 	}
1061 
1062 	ret = rbd_do_request(NULL, dev, snapc, snapid,
1063 			  obj, ofs, len, NULL,
1064 			  pages, num_pages,
1065 			  flags,
1066 			  ops,
1067 			  2,
1068 			  NULL, 0,
1069 			  NULL,
1070 			  linger_req, ver);
1071 	if (ret < 0)
1072 		goto done_ops;
1073 
1074 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1075 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1076 
1077 done_ops:
1078 	if (!orig_ops)
1079 		rbd_destroy_ops(ops);
1080 done:
1081 	ceph_release_page_vector(pages, num_pages);
1082 	return ret;
1083 }
1084 
1085 /*
1086  * Do an asynchronous ceph osd operation
1087  */
1088 static int rbd_do_op(struct request *rq,
1089 		     struct rbd_device *rbd_dev,
1090 		     struct ceph_snap_context *snapc,
1091 		     u64 snapid,
1092 		     int opcode, int flags, int num_reply,
1093 		     u64 ofs, u64 len,
1094 		     struct bio *bio,
1095 		     struct rbd_req_coll *coll,
1096 		     int coll_index)
1097 {
1098 	char *seg_name;
1099 	u64 seg_ofs;
1100 	u64 seg_len;
1101 	int ret;
1102 	struct ceph_osd_req_op *ops;
1103 	u32 payload_len;
1104 
1105 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1106 	if (!seg_name)
1107 		return -ENOMEM;
1108 
1109 	seg_len = rbd_get_segment(&rbd_dev->header,
1110 				  rbd_dev->header.block_name,
1111 				  ofs, len,
1112 				  seg_name, &seg_ofs);
1113 
1114 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1115 
1116 	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1117 	if (ret < 0)
1118 		goto done;
1119 
1120 	/* we've taken care of segment sizes earlier when we
1121 	   cloned the bios. We should never have a segment
1122 	   truncated at this point */
1123 	BUG_ON(seg_len < len);
1124 
1125 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1126 			     seg_name, seg_ofs, seg_len,
1127 			     bio,
1128 			     NULL, 0,
1129 			     flags,
1130 			     ops,
1131 			     num_reply,
1132 			     coll, coll_index,
1133 			     rbd_req_cb, 0, NULL);
1134 
1135 	rbd_destroy_ops(ops);
1136 done:
1137 	kfree(seg_name);
1138 	return ret;
1139 }
1140 
1141 /*
1142  * Request async osd write
1143  */
1144 static int rbd_req_write(struct request *rq,
1145 			 struct rbd_device *rbd_dev,
1146 			 struct ceph_snap_context *snapc,
1147 			 u64 ofs, u64 len,
1148 			 struct bio *bio,
1149 			 struct rbd_req_coll *coll,
1150 			 int coll_index)
1151 {
1152 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1153 			 CEPH_OSD_OP_WRITE,
1154 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1155 			 2,
1156 			 ofs, len, bio, coll, coll_index);
1157 }
1158 
1159 /*
1160  * Request async osd read
1161  */
1162 static int rbd_req_read(struct request *rq,
1163 			 struct rbd_device *rbd_dev,
1164 			 u64 snapid,
1165 			 u64 ofs, u64 len,
1166 			 struct bio *bio,
1167 			 struct rbd_req_coll *coll,
1168 			 int coll_index)
1169 {
1170 	return rbd_do_op(rq, rbd_dev, NULL,
1171 			 (snapid ? snapid : CEPH_NOSNAP),
1172 			 CEPH_OSD_OP_READ,
1173 			 CEPH_OSD_FLAG_READ,
1174 			 2,
1175 			 ofs, len, bio, coll, coll_index);
1176 }
1177 
1178 /*
1179  * Request sync osd read
1180  */
1181 static int rbd_req_sync_read(struct rbd_device *dev,
1182 			  struct ceph_snap_context *snapc,
1183 			  u64 snapid,
1184 			  const char *obj,
1185 			  u64 ofs, u64 len,
1186 			  char *buf,
1187 			  u64 *ver)
1188 {
1189 	return rbd_req_sync_op(dev, NULL,
1190 			       (snapid ? snapid : CEPH_NOSNAP),
1191 			       CEPH_OSD_OP_READ,
1192 			       CEPH_OSD_FLAG_READ,
1193 			       NULL,
1194 			       1, obj, ofs, len, buf, NULL, ver);
1195 }
1196 
1197 /*
1198  * Request sync osd notify_ack (acknowledge a received notification)
1199  */
1200 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1201 				   u64 ver,
1202 				   u64 notify_id,
1203 				   const char *obj)
1204 {
1205 	struct ceph_osd_req_op *ops;
1206 	struct page **pages = NULL;
1207 	int ret;
1208 
1209 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1210 	if (ret < 0)
1211 		return ret;
1212 
1213 	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1214 	ops[0].watch.cookie = notify_id;
1215 	ops[0].watch.flag = 0;
1216 
1217 	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1218 			  obj, 0, 0, NULL,
1219 			  pages, 0,
1220 			  CEPH_OSD_FLAG_READ,
1221 			  ops,
1222 			  1,
1223 			  NULL, 0,
1224 			  rbd_simple_req_cb, 0, NULL);
1225 
1226 	rbd_destroy_ops(ops);
1227 	return ret;
1228 }
1229 
1230 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1231 {
1232 	struct rbd_device *dev = (struct rbd_device *)data;
1233 	int rc;
1234 
1235 	if (!dev)
1236 		return;
1237 
1238 	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1239 		notify_id, (int)opcode);
1240 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1241 	rc = __rbd_update_snaps(dev);
1242 	mutex_unlock(&ctl_mutex);
1243 	if (rc)
1244 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1245 			   "update snaps: %d\n", dev->major, rc);
1246 
1247 	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1248 }
1249 
1250 /*
1251  * Request sync osd watch
1252  */
1253 static int rbd_req_sync_watch(struct rbd_device *dev,
1254 			      const char *obj,
1255 			      u64 ver)
1256 {
1257 	struct ceph_osd_req_op *ops;
1258 	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1259 
1260 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1261 	if (ret < 0)
1262 		return ret;
1263 
1264 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1265 				     (void *)dev, &dev->watch_event);
1266 	if (ret < 0)
1267 		goto fail;
1268 
1269 	ops[0].watch.ver = cpu_to_le64(ver);
1270 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1271 	ops[0].watch.flag = 1;
1272 
1273 	ret = rbd_req_sync_op(dev, NULL,
1274 			      CEPH_NOSNAP,
1275 			      0,
1276 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1277 			      ops,
1278 			      1, obj, 0, 0, NULL,
1279 			      &dev->watch_request, NULL);
1280 
1281 	if (ret < 0)
1282 		goto fail_event;
1283 
1284 	rbd_destroy_ops(ops);
1285 	return 0;
1286 
1287 fail_event:
1288 	ceph_osdc_cancel_event(dev->watch_event);
1289 	dev->watch_event = NULL;
1290 fail:
1291 	rbd_destroy_ops(ops);
1292 	return ret;
1293 }
1294 
1295 /*
1296  * Request sync osd unwatch
1297  */
1298 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1299 				const char *obj)
1300 {
1301 	struct ceph_osd_req_op *ops;
1302 
1303 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1304 	if (ret < 0)
1305 		return ret;
1306 
1307 	ops[0].watch.ver = 0;
1308 	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1309 	ops[0].watch.flag = 0;
1310 
1311 	ret = rbd_req_sync_op(dev, NULL,
1312 			      CEPH_NOSNAP,
1313 			      0,
1314 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1315 			      ops,
1316 			      1, obj, 0, 0, NULL, NULL, NULL);
1317 
1318 	rbd_destroy_ops(ops);
1319 	ceph_osdc_cancel_event(dev->watch_event);
1320 	dev->watch_event = NULL;
1321 	return ret;
1322 }
1323 
1324 struct rbd_notify_info {
1325 	struct rbd_device *dev;
1326 };
1327 
1328 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1329 {
1330 	struct rbd_device *dev = (struct rbd_device *)data;
1331 	if (!dev)
1332 		return;
1333 
1334 	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1335 		notify_id, (int)opcode);
1336 }
1337 
1338 /*
1339  * Request sync osd notify
1340  */
1341 static int rbd_req_sync_notify(struct rbd_device *dev,
1342 		          const char *obj)
1343 {
1344 	struct ceph_osd_req_op *ops;
1345 	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1346 	struct ceph_osd_event *event;
1347 	struct rbd_notify_info info;
1348 	int payload_len = sizeof(u32) + sizeof(u32);
1349 	int ret;
1350 
1351 	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1352 	if (ret < 0)
1353 		return ret;
1354 
1355 	info.dev = dev;
1356 
1357 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1358 				     (void *)&info, &event);
1359 	if (ret < 0)
1360 		goto fail;
1361 
1362 	ops[0].watch.ver = 1;
1363 	ops[0].watch.flag = 1;
1364 	ops[0].watch.cookie = event->cookie;
1365 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1366 	ops[0].watch.timeout = 12;
1367 
1368 	ret = rbd_req_sync_op(dev, NULL,
1369 			       CEPH_NOSNAP,
1370 			       0,
1371 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1372 			       ops,
1373 			       1, obj, 0, 0, NULL, NULL, NULL);
1374 	if (ret < 0)
1375 		goto fail_event;
1376 
1377 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1378 	dout("ceph_osdc_wait_event returned %d\n", ret);
1379 	rbd_destroy_ops(ops);
1380 	return 0;
1381 
1382 fail_event:
1383 	ceph_osdc_cancel_event(event);
1384 fail:
1385 	rbd_destroy_ops(ops);
1386 	return ret;
1387 }
1388 
1389 /*
1390  * Execute a synchronous OSD class method call (CEPH_OSD_OP_CALL)
1391  */
1392 static int rbd_req_sync_exec(struct rbd_device *dev,
1393 			     const char *obj,
1394 			     const char *cls,
1395 			     const char *method,
1396 			     const char *data,
1397 			     int len,
1398 			     u64 *ver)
1399 {
1400 	struct ceph_osd_req_op *ops;
1401 	int cls_len = strlen(cls);
1402 	int method_len = strlen(method);
1403 	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1404 				    cls_len + method_len + len);
1405 	if (ret < 0)
1406 		return ret;
1407 
1408 	ops[0].cls.class_name = cls;
1409 	ops[0].cls.class_len = (__u8)cls_len;
1410 	ops[0].cls.method_name = method;
1411 	ops[0].cls.method_len = (__u8)method_len;
1412 	ops[0].cls.argc = 0;
1413 	ops[0].cls.indata = data;
1414 	ops[0].cls.indata_len = len;
1415 
1416 	ret = rbd_req_sync_op(dev, NULL,
1417 			       CEPH_NOSNAP,
1418 			       0,
1419 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1420 			       ops,
1421 			       1, obj, 0, 0, NULL, NULL, ver);
1422 
1423 	rbd_destroy_ops(ops);
1424 
1425 	dout("cls_exec returned %d\n", ret);
1426 	return ret;
1427 }
1428 
1429 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1430 {
1431 	struct rbd_req_coll *coll =
1432 			kzalloc(sizeof(struct rbd_req_coll) +
1433 			        sizeof(struct rbd_req_status) * num_reqs,
1434 				GFP_ATOMIC);
1435 
1436 	if (!coll)
1437 		return NULL;
1438 	coll->total = num_reqs;
1439 	kref_init(&coll->kref);
1440 	return coll;
1441 }
1442 
1443 /*
1444  * block device queue callback
1445  */
1446 static void rbd_rq_fn(struct request_queue *q)
1447 {
1448 	struct rbd_device *rbd_dev = q->queuedata;
1449 	struct request *rq;
1450 	struct bio_pair *bp = NULL;
1451 
1452 	while ((rq = blk_fetch_request(q))) {
1453 		struct bio *bio;
1454 		struct bio *rq_bio, *next_bio = NULL;
1455 		bool do_write;
1456 		int size, op_size = 0;
1457 		u64 ofs;
1458 		int num_segs, cur_seg = 0;
1459 		struct rbd_req_coll *coll;
1460 
1461 		/* peek at request from block layer */
1462 		if (!rq)
1463 			break;
1464 
1465 		dout("fetched request\n");
1466 
1467 		/* filter out block requests we don't understand */
1468 		if ((rq->cmd_type != REQ_TYPE_FS)) {
1469 			__blk_end_request_all(rq, 0);
1470 			continue;
1471 		}
1472 
1473 		/* deduce our operation (read, write) */
1474 		do_write = (rq_data_dir(rq) == WRITE);
1475 
1476 		size = blk_rq_bytes(rq);
1477 		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1478 		rq_bio = rq->bio;
1479 		if (do_write && rbd_dev->read_only) {
1480 			__blk_end_request_all(rq, -EROFS);
1481 			continue;
1482 		}
1483 
1484 		spin_unlock_irq(q->queue_lock);
1485 
1486 		dout("%s 0x%x bytes at 0x%llx\n",
1487 		     do_write ? "write" : "read",
1488 		     size, blk_rq_pos(rq) * SECTOR_SIZE);
1489 
1490 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1491 		coll = rbd_alloc_coll(num_segs);
1492 		if (!coll) {
1493 			spin_lock_irq(q->queue_lock);
1494 			__blk_end_request_all(rq, -ENOMEM);
1495 			continue;
1496 		}
1497 
1498 		do {
1499 			/* a bio clone to be passed down to OSD req */
1500 			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1501 			op_size = rbd_get_segment(&rbd_dev->header,
1502 						  rbd_dev->header.block_name,
1503 						  ofs, size,
1504 						  NULL, NULL);
1505 			kref_get(&coll->kref);
1506 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1507 					      op_size, GFP_ATOMIC);
1508 			if (!bio) {
1509 				rbd_coll_end_req_index(rq, coll, cur_seg,
1510 						       -ENOMEM, op_size);
1511 				goto next_seg;
1512 			}
1513 
1514 
1515 			/* init OSD command: write or read */
1516 			if (do_write)
1517 				rbd_req_write(rq, rbd_dev,
1518 					      rbd_dev->header.snapc,
1519 					      ofs,
1520 					      op_size, bio,
1521 					      coll, cur_seg);
1522 			else
1523 				rbd_req_read(rq, rbd_dev,
1524 					     cur_snap_id(rbd_dev),
1525 					     ofs,
1526 					     op_size, bio,
1527 					     coll, cur_seg);
1528 
1529 next_seg:
1530 			size -= op_size;
1531 			ofs += op_size;
1532 
1533 			cur_seg++;
1534 			rq_bio = next_bio;
1535 		} while (size > 0);
1536 		kref_put(&coll->kref, rbd_coll_release);
1537 
1538 		if (bp)
1539 			bio_pair_release(bp);
1540 		spin_lock_irq(q->queue_lock);
1541 	}
1542 }
1543 
1544 /*
1545  * a queue callback. Makes sure that we don't create a bio that spans across
1546  * multiple osd objects. One exception would be a single-page bio,
1547  * which we handle later in bio_chain_clone()
1548  */
1549 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1550 			  struct bio_vec *bvec)
1551 {
1552 	struct rbd_device *rbd_dev = q->queuedata;
1553 	unsigned int chunk_sectors;
1554 	sector_t sector;
1555 	unsigned int bio_sectors;
1556 	int max;
1557 
1558 	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1559 	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1560 	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1561 
1562 	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1563 				 + bio_sectors)) << SECTOR_SHIFT;
1564 	if (max < 0)
1565 		max = 0; /* bio_add cannot handle a negative return */
1566 	if (max <= bvec->bv_len && bio_sectors == 0)
1567 		return bvec->bv_len;
1568 	return max;
1569 }
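/*
 * Worked example: obj_order 22 gives chunk_sectors == 1 << 13 == 8192.
 * For an empty bio (bio_sectors == 0) at sector 8190 and a 4 KiB bvec:
 *
 *	max = (8192 - (8190 + 0)) << SECTOR_SHIFT = 1024
 *
 * max <= bv_len while bio_sectors == 0, so all 4096 bytes are
 * accepted anyway; this is the single-page bio that may straddle an
 * object boundary and gets split later in bio_chain_clone().
 */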
1570 
1571 static void rbd_free_disk(struct rbd_device *rbd_dev)
1572 {
1573 	struct gendisk *disk = rbd_dev->disk;
1574 
1575 	if (!disk)
1576 		return;
1577 
1578 	rbd_header_free(&rbd_dev->header);
1579 
1580 	if (disk->flags & GENHD_FL_UP)
1581 		del_gendisk(disk);
1582 	if (disk->queue)
1583 		blk_cleanup_queue(disk->queue);
1584 	put_disk(disk);
1585 }
1586 
1587 /*
1588  * reload the ondisk the header
1589  */
1590 static int rbd_read_header(struct rbd_device *rbd_dev,
1591 			   struct rbd_image_header *header)
1592 {
1593 	ssize_t rc;
1594 	struct rbd_image_header_ondisk *dh;
1595 	int snap_count = 0;
1596 	u64 ver;
1597 	size_t len;
1598 
1599 	/*
1600 	 * First reads the fixed-size header to determine the number
1601 	 * of snapshots, then re-reads it, along with all snapshot
1602 	 * records as well as their stored names.
1603 	 */
1604 	len = sizeof (*dh);
1605 	while (1) {
1606 		dh = kmalloc(len, GFP_KERNEL);
1607 		if (!dh)
1608 			return -ENOMEM;
1609 
1610 		rc = rbd_req_sync_read(rbd_dev,
1611 				       NULL, CEPH_NOSNAP,
1612 				       rbd_dev->obj_md_name,
1613 				       0, len,
1614 				       (char *)dh, &ver);
1615 		if (rc < 0)
1616 			goto out_dh;
1617 
1618 		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1619 		if (rc < 0) {
1620 			if (rc == -ENXIO)
1621 				pr_warning("unrecognized header format"
1622 					   " for image %s", rbd_dev->obj);
1623 			goto out_dh;
1624 		}
1625 
1626 		if (snap_count == header->total_snaps)
1627 			break;
1628 
1629 		snap_count = header->total_snaps;
1630 		len = sizeof (*dh) +
1631 			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1632 			header->snap_names_len;
1633 
1634 		rbd_header_free(header);
1635 		kfree(dh);
1636 	}
1637 	header->obj_version = ver;
1638 
1639 out_dh:
1640 	kfree(dh);
1641 	return rc;
1642 }
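/*
 * Convergence example: the first pass reads only sizeof(*dh) bytes
 * (snap_count == 0).  If that header reports, say, total_snaps == 3,
 * len is recomputed to cover three snapshot records plus the name
 * blob and the object is read again; the loop exits once the on-disk
 * snapshot count matches the count the buffer was sized for.
 */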
1643 
1644 /*
1645  * create a snapshot
1646  */
1647 static int rbd_header_add_snap(struct rbd_device *dev,
1648 			       const char *snap_name,
1649 			       gfp_t gfp_flags)
1650 {
1651 	int name_len = strlen(snap_name);
1652 	u64 new_snapid;
1653 	int ret;
1654 	void *data, *p, *e;
1655 	u64 ver;
1656 	struct ceph_mon_client *monc;
1657 
1658 	/* we should create a snapshot only if we're pointing at the head */
1659 	if (dev->cur_snap)
1660 		return -EINVAL;
1661 
1662 	monc = &dev->rbd_client->client->monc;
1663 	ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1664 	dout("created snapid=%lld\n", new_snapid);
1665 	if (ret < 0)
1666 		return ret;
1667 
1668 	data = kmalloc(name_len + 16, gfp_flags);
1669 	if (!data)
1670 		return -ENOMEM;
1671 
1672 	p = data;
1673 	e = data + name_len + 16;
1674 
1675 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1676 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1677 
1678 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1679 				data, p - data, &ver);
1680 
1681 	kfree(data);
1682 
1683 	if (ret < 0)
1684 		return ret;
1685 
1686 	dev->header.snapc->seq = new_snapid;
1687 
1688 	return 0;
1689 bad:
1690 	return -ERANGE;
1691 }
1692 
1693 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1694 {
1695 	struct rbd_snap *snap;
1696 
1697 	while (!list_empty(&rbd_dev->snaps)) {
1698 		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1699 		__rbd_remove_snap_dev(rbd_dev, snap);
1700 	}
1701 }
1702 
1703 /*
1704  * re-read the on-disk header and bring the in-memory snapshot state up to date
1705  */
1706 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1707 {
1708 	int ret;
1709 	struct rbd_image_header h;
1710 	u64 snap_seq;
1711 	int follow_seq = 0;
1712 
1713 	ret = rbd_read_header(rbd_dev, &h);
1714 	if (ret < 0)
1715 		return ret;
1716 
1717 	/* resized? */
1718 	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1719 
1720 	down_write(&rbd_dev->header_rwsem);
1721 
1722 	snap_seq = rbd_dev->header.snapc->seq;
1723 	if (rbd_dev->header.total_snaps &&
1724 	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1725 		/* pointing at the head, will need to follow that
1726 		   if head moves */
1727 		follow_seq = 1;
1728 
1729 	kfree(rbd_dev->header.snapc);
1730 	kfree(rbd_dev->header.snap_names);
1731 	kfree(rbd_dev->header.snap_sizes);
1732 
1733 	rbd_dev->header.total_snaps = h.total_snaps;
1734 	rbd_dev->header.snapc = h.snapc;
1735 	rbd_dev->header.snap_names = h.snap_names;
1736 	rbd_dev->header.snap_names_len = h.snap_names_len;
1737 	rbd_dev->header.snap_sizes = h.snap_sizes;
1738 	if (follow_seq)
1739 		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1740 	else
1741 		rbd_dev->header.snapc->seq = snap_seq;
1742 
1743 	ret = __rbd_init_snaps_header(rbd_dev);
1744 
1745 	up_write(&rbd_dev->header_rwsem);
1746 
1747 	return ret;
1748 }
1749 
1750 static int rbd_init_disk(struct rbd_device *rbd_dev)
1751 {
1752 	struct gendisk *disk;
1753 	struct request_queue *q;
1754 	int rc;
1755 	u64 segment_size;
1756 	u64 total_size = 0;
1757 
1758 	/* contact OSD, request size info about the object being mapped */
1759 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1760 	if (rc)
1761 		return rc;
1762 
1763 	/* no need to lock here, as rbd_dev is not registered yet */
1764 	rc = __rbd_init_snaps_header(rbd_dev);
1765 	if (rc)
1766 		return rc;
1767 
1768 	rc = rbd_header_set_snap(rbd_dev, &total_size);
1769 	if (rc)
1770 		return rc;
1771 
1772 	/* create gendisk info */
1773 	rc = -ENOMEM;
1774 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1775 	if (!disk)
1776 		goto out;
1777 
1778 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1779 		 rbd_dev->id);
1780 	disk->major = rbd_dev->major;
1781 	disk->first_minor = 0;
1782 	disk->fops = &rbd_bd_ops;
1783 	disk->private_data = rbd_dev;
1784 
1785 	/* init rq */
1786 	rc = -ENOMEM;
1787 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1788 	if (!q)
1789 		goto out_disk;
1790 
1791 	/* We use the default size, but let's be explicit about it. */
1792 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1793 
1794 	/* set io sizes to object size */
1795 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1796 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1797 	blk_queue_max_segment_size(q, segment_size);
1798 	blk_queue_io_min(q, segment_size);
1799 	blk_queue_io_opt(q, segment_size);
1800 
1801 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1802 	disk->queue = q;
1803 
1804 	q->queuedata = rbd_dev;
1805 
1806 	rbd_dev->disk = disk;
1807 	rbd_dev->q = q;
1808 
1809 	/* finally, announce the disk to the world */
1810 	set_capacity(disk, total_size / SECTOR_SIZE);
1811 	add_disk(disk);
1812 
1813 	pr_info("%s: added with size 0x%llx\n",
1814 		disk->disk_name, (unsigned long long)total_size);
1815 	return 0;
1816 
1817 out_disk:
1818 	put_disk(disk);
1819 out:
1820 	return rc;
1821 }
1822 
1823 /*
1824   sysfs
1825 */
1826 
1827 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1828 {
1829 	return container_of(dev, struct rbd_device, dev);
1830 }
1831 
1832 static ssize_t rbd_size_show(struct device *dev,
1833 			     struct device_attribute *attr, char *buf)
1834 {
1835 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1836 
1837 	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1838 }
1839 
1840 static ssize_t rbd_major_show(struct device *dev,
1841 			      struct device_attribute *attr, char *buf)
1842 {
1843 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844 
1845 	return sprintf(buf, "%d\n", rbd_dev->major);
1846 }
1847 
1848 static ssize_t rbd_client_id_show(struct device *dev,
1849 				  struct device_attribute *attr, char *buf)
1850 {
1851 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1852 
1853 	return sprintf(buf, "client%lld\n",
1854 			ceph_client_id(rbd_dev->rbd_client->client));
1855 }
1856 
1857 static ssize_t rbd_pool_show(struct device *dev,
1858 			     struct device_attribute *attr, char *buf)
1859 {
1860 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861 
1862 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1863 }
1864 
1865 static ssize_t rbd_name_show(struct device *dev,
1866 			     struct device_attribute *attr, char *buf)
1867 {
1868 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869 
1870 	return sprintf(buf, "%s\n", rbd_dev->obj);
1871 }
1872 
1873 static ssize_t rbd_snap_show(struct device *dev,
1874 			     struct device_attribute *attr,
1875 			     char *buf)
1876 {
1877 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878 
1879 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1880 }
1881 
1882 static ssize_t rbd_image_refresh(struct device *dev,
1883 				 struct device_attribute *attr,
1884 				 const char *buf,
1885 				 size_t size)
1886 {
1887 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888 	int rc;
1889 	int ret = size;
1890 
1891 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1892 
1893 	rc = __rbd_update_snaps(rbd_dev);
1894 	if (rc < 0)
1895 		ret = rc;
1896 
1897 	mutex_unlock(&ctl_mutex);
1898 	return ret;
1899 }
1900 
1901 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1902 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1903 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1904 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1905 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1906 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1907 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1908 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1909 
1910 static struct attribute *rbd_attrs[] = {
1911 	&dev_attr_size.attr,
1912 	&dev_attr_major.attr,
1913 	&dev_attr_client_id.attr,
1914 	&dev_attr_pool.attr,
1915 	&dev_attr_name.attr,
1916 	&dev_attr_current_snap.attr,
1917 	&dev_attr_refresh.attr,
1918 	&dev_attr_create_snap.attr,
1919 	NULL
1920 };
1921 
1922 static struct attribute_group rbd_attr_group = {
1923 	.attrs = rbd_attrs,
1924 };
1925 
1926 static const struct attribute_group *rbd_attr_groups[] = {
1927 	&rbd_attr_group,
1928 	NULL
1929 };
1930 
1931 static void rbd_sysfs_dev_release(struct device *dev)
1932 {
1933 }
1934 
1935 static struct device_type rbd_device_type = {
1936 	.name		= "rbd",
1937 	.groups		= rbd_attr_groups,
1938 	.release	= rbd_sysfs_dev_release,
1939 };
1940 
1941 
1942 /*
1943   sysfs - snapshots
1944 */
1945 
1946 static ssize_t rbd_snap_size_show(struct device *dev,
1947 				  struct device_attribute *attr,
1948 				  char *buf)
1949 {
1950 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1951 
1952 	return sprintf(buf, "%zd\n", snap->size);
1953 }
1954 
1955 static ssize_t rbd_snap_id_show(struct device *dev,
1956 				struct device_attribute *attr,
1957 				char *buf)
1958 {
1959 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960 
1961 	return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1962 }
1963 
1964 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1965 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1966 
1967 static struct attribute *rbd_snap_attrs[] = {
1968 	&dev_attr_snap_size.attr,
1969 	&dev_attr_snap_id.attr,
1970 	NULL,
1971 };
1972 
1973 static struct attribute_group rbd_snap_attr_group = {
1974 	.attrs = rbd_snap_attrs,
1975 };
1976 
1977 static void rbd_snap_dev_release(struct device *dev)
1978 {
1979 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1980 	kfree(snap->name);
1981 	kfree(snap);
1982 }
1983 
1984 static const struct attribute_group *rbd_snap_attr_groups[] = {
1985 	&rbd_snap_attr_group,
1986 	NULL
1987 };
1988 
1989 static struct device_type rbd_snap_device_type = {
1990 	.groups		= rbd_snap_attr_groups,
1991 	.release	= rbd_snap_dev_release,
1992 };
1993 
1994 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1995 				  struct rbd_snap *snap)
1996 {
1997 	list_del(&snap->node);
1998 	device_unregister(&snap->dev);
1999 }
2000 
2001 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2002 				  struct rbd_snap *snap,
2003 				  struct device *parent)
2004 {
2005 	struct device *dev = &snap->dev;
2006 	int ret;
2007 
2008 	dev->type = &rbd_snap_device_type;
2009 	dev->parent = parent;
2010 	dev->release = rbd_snap_dev_release;
2011 	dev_set_name(dev, "snap_%s", snap->name);
2012 	ret = device_register(dev);
2013 
2014 	return ret;
2015 }
2016 
2017 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2018 			      int i, const char *name,
2019 			      struct rbd_snap **snapp)
2020 {
2021 	int ret;
2022 	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2023 	if (!snap)
2024 		return -ENOMEM;
2025 	snap->name = kstrdup(name, GFP_KERNEL);
2026 	snap->size = rbd_dev->header.snap_sizes[i];
2027 	snap->id = rbd_dev->header.snapc->snaps[i];
2028 	if (device_is_registered(&rbd_dev->dev)) {
2029 		ret = rbd_register_snap_dev(rbd_dev, snap,
2030 					     &rbd_dev->dev);
2031 		if (ret < 0)
2032 			goto err;
2033 	}
2034 	*snapp = snap;
2035 	return 0;
2036 err:
2037 	kfree(snap->name);
2038 	kfree(snap);
2039 	return ret;
2040 }
2041 
2042 /*
2043  * search for the previous snap in a null-delimited string list
2044  */
2045 const char *rbd_prev_snap_name(const char *name, const char *start)
2046 {
2047 	if (name < start + 2)
2048 		return NULL;
2049 
2050 	name -= 2;
2051 	while (*name) {
2052 		if (name == start)
2053 			return start;
2054 		name--;
2055 	}
2056 	return name + 1;
2057 }
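/*
 * E.g. with the name blob "one\0two\0three\0", a pointer to "three"
 * yields a pointer to "two", "two" yields "one", and "one" (== start)
 * yields NULL.
 */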
2058 
2059 /*
2060  * compare the old list of snapshots that we have against what's in the
2061  * header and update it accordingly. Note that the header holds the
2062  * snapshots in reverse order (newest to oldest), so we walk from
2063  * oldest to newest to avoid picking up a duplicate snap name while
2064  * updating (e.g., a snapshot that was removed and then recreated
2065  * with the same name).
2066  */
2067 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2068 {
2069 	const char *name, *first_name;
2070 	int i = rbd_dev->header.total_snaps;
2071 	struct rbd_snap *snap, *old_snap = NULL;
2072 	int ret;
2073 	struct list_head *p, *n;
2074 
2075 	first_name = rbd_dev->header.snap_names;
2076 	name = first_name + rbd_dev->header.snap_names_len;
2077 
2078 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2079 		u64 cur_id;
2080 
2081 		old_snap = list_entry(p, struct rbd_snap, node);
2082 
2083 		if (i)
2084 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2085 
2086 		if (!i || old_snap->id < cur_id) {
2087 			/* old_snap->id was skipped, thus was removed */
2088 			__rbd_remove_snap_dev(rbd_dev, old_snap);
2089 			continue;
2090 		}
2091 		if (old_snap->id == cur_id) {
2092 			/* we have this snapshot already */
2093 			i--;
2094 			name = rbd_prev_snap_name(name, first_name);
2095 			continue;
2096 		}
2097 		for (; i > 0;
2098 		     i--, name = rbd_prev_snap_name(name, first_name)) {
2099 			if (!name) {
2100 				WARN_ON(1);
2101 				return -EINVAL;
2102 			}
2103 			cur_id = rbd_dev->header.snapc->snaps[i];
			/* same snapshot or a removal; both are handled above */
2105 			if (cur_id >= old_snap->id)
2106 				break;
2107 			/* a new snapshot */
2108 			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2109 			if (ret < 0)
2110 				return ret;
2111 
			/* we walk the list backward, so insert after n (just before p) */
2113 			list_add(&snap->node, n);
2114 			p = &snap->node;
2115 		}
2116 	}
2117 	/* we're done going over the old snap list, just add what's left */
2118 	for (; i > 0; i--) {
2119 		name = rbd_prev_snap_name(name, first_name);
2120 		if (!name) {
2121 			WARN_ON(1);
2122 			return -EINVAL;
2123 		}
2124 		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2125 		if (ret < 0)
2126 			return ret;
2127 		list_add(&snap->node, &rbd_dev->snaps);
2128 	}
2129 
2130 	return 0;
2131 }
2132 
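/*
 * Register the rbd device, and each of its snapshot devices, with
 * the driver core so they appear under /sys/bus/rbd/devices/<id>/.
 */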
2133 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2134 {
2135 	int ret;
2136 	struct device *dev;
2137 	struct rbd_snap *snap;
2138 
2139 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2140 	dev = &rbd_dev->dev;
2141 
2142 	dev->bus = &rbd_bus_type;
2143 	dev->type = &rbd_device_type;
2144 	dev->parent = &rbd_root_dev;
2145 	dev->release = rbd_dev_release;
2146 	dev_set_name(dev, "%d", rbd_dev->id);
2147 	ret = device_register(dev);
2148 	if (ret < 0)
2149 		goto out;
2150 
2151 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2152 		ret = rbd_register_snap_dev(rbd_dev, snap,
2153 					     &rbd_dev->dev);
2154 		if (ret < 0)
2155 			break;
2156 	}
2157 out:
2158 	mutex_unlock(&ctl_mutex);
2159 	return ret;
2160 }
2161 
2162 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2163 {
2164 	device_unregister(&rbd_dev->dev);
2165 }
2166 
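/*
 * Register a watch on the image header object.  If the header
 * version we supply is stale (-ERANGE), refresh our snapshot
 * context and retry.
 */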
2167 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2168 {
2169 	int ret, rc;
2170 
2171 	do {
2172 		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2173 					 rbd_dev->header.obj_version);
2174 		if (ret == -ERANGE) {
2175 			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2176 			rc = __rbd_update_snaps(rbd_dev);
2177 			mutex_unlock(&ctl_mutex);
2178 			if (rc < 0)
2179 				return rc;
2180 		}
2181 	} while (ret == -ERANGE);
2182 
2183 	return ret;
2184 }
2185 
2186 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2187 
2188 /*
2189  * Get a unique rbd identifier for the given new rbd_dev, and add
2190  * the rbd_dev to the global list.  The minimum rbd id is 1.
2191  */
2192 static void rbd_id_get(struct rbd_device *rbd_dev)
2193 {
2194 	rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2195 
2196 	spin_lock(&rbd_dev_list_lock);
2197 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2198 	spin_unlock(&rbd_dev_list_lock);
2199 }
2200 
2201 /*
2202  * Remove an rbd_dev from the global list, and record that its
2203  * identifier is no longer in use.
2204  */
2205 static void rbd_id_put(struct rbd_device *rbd_dev)
2206 {
2207 	struct list_head *tmp;
2208 	int rbd_id = rbd_dev->id;
2209 	int max_id;
2210 
2211 	BUG_ON(rbd_id < 1);
2212 
2213 	spin_lock(&rbd_dev_list_lock);
2214 	list_del_init(&rbd_dev->node);
2215 
2216 	/*
2217 	 * If the id being "put" is not the current maximum, there
2218 	 * is nothing special we need to do.
2219 	 */
2220 	if (rbd_id != atomic64_read(&rbd_id_max)) {
2221 		spin_unlock(&rbd_dev_list_lock);
2222 		return;
2223 	}
2224 
2225 	/*
2226 	 * We need to update the current maximum id.  Search the
2227 	 * list to find out what it is.  We're more likely to find
2228 	 * the maximum at the end, so search the list backward.
2229 	 */
2230 	max_id = 0;
2231 	list_for_each_prev(tmp, &rbd_dev_list) {
2232 		struct rbd_device *rbd_dev;
2233 
2234 		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id > max_id)
			max_id = rbd_dev->id;
2237 	}
2238 	spin_unlock(&rbd_dev_list_lock);
2239 
2240 	/*
2241 	 * The max id could have been updated by rbd_id_get(), in
2242 	 * which case it now accurately reflects the new maximum.
2243 	 * Be careful not to overwrite the maximum value in that
2244 	 * case.
2245 	 */
2246 	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2247 }
2248 
2249 /*
2250  * Skips over white space at *buf, and updates *buf to point to the
2251  * first found non-space character (if any). Returns the length of
2252  * the token (string of non-white space characters) found.  Note
2253  * that *buf must be terminated with '\0'.
2254  */
2255 static inline size_t next_token(const char **buf)
2256 {
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
2266 }
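/*
 * For illustration: with *buf pointing at "  rbd foo", next_token()
 * advances *buf to "rbd foo" and returns 3.
 */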
2267 
2268 /*
2269  * Finds the next token in *buf, and if the provided token buffer is
2270  * big enough, copies the found token into it.  The result, if
2271  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2272  * must be terminated with '\0' on entry.
2273  *
2274  * Returns the length of the token found (not including the '\0').
2275  * Return value will be 0 if no token is found, and it will be >=
2276  * token_size if the token would not fit.
2277  *
2278  * The *buf pointer will be updated to point beyond the end of the
2279  * found token.  Note that this occurs even if the token buffer is
2280  * too small to hold it.
2281  */
2282 static inline size_t copy_token(const char **buf,
2283 				char *token,
2284 				size_t token_size)
2285 {
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
2296 }
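/*
 * For illustration: with *buf at "rbd foo" and an 8-byte token
 * buffer, copy_token() stores "rbd\0" in the buffer, advances *buf
 * to " foo", and returns 3.
 */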
2297 
/*
 * This fills in the pool_name, obj, obj_len, obj_md_name, and
 * snap_name fields of the given rbd_dev, based on the list of
 * monitor addresses and other options provided via /sys/bus/rbd/add.
 */
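/*
 * The expected layout of the buffer is:
 *
 *	<mon_addrs> <options> <pool_name> <obj> [<snap_name>]
 *
 * where only the snapshot name is optional.
 */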
2304 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2305 			      const char *buf,
2306 			      const char **mon_addrs,
2307 			      size_t *mon_addrs_size,
2308 			      char *options,
2309 			      size_t options_size)
2310 {
2311 	size_t	len;
2312 
2313 	/* The first four tokens are required */
2314 
2315 	len = next_token(&buf);
2316 	if (!len)
2317 		return -EINVAL;
2318 	*mon_addrs_size = len + 1;
2319 	*mon_addrs = buf;
2320 
2321 	buf += len;
2322 
2323 	len = copy_token(&buf, options, options_size);
2324 	if (!len || len >= options_size)
2325 		return -EINVAL;
2326 
2327 	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2328 	if (!len || len >= sizeof (rbd_dev->pool_name))
2329 		return -EINVAL;
2330 
2331 	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2332 	if (!len || len >= sizeof (rbd_dev->obj))
2333 		return -EINVAL;
2334 
2335 	/* We have the object length in hand, save it. */
2336 
2337 	rbd_dev->obj_len = len;
2338 
2339 	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2340 				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2341 	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2342 
2343 	/*
2344 	 * The snapshot name is optional, but it's an error if it's
2345 	 * too long.  If no snapshot is supplied, fill in the default.
2346 	 */
2347 	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2348 	if (!len)
2349 		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2350 			sizeof (RBD_SNAP_HEAD_NAME));
2351 	else if (len >= sizeof (rbd_dev->snap_name))
2352 		return -EINVAL;
2353 
2354 	return 0;
2355 }
2356 
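/*
 * Handle a write to /sys/bus/rbd/add, mapping a new rbd device.
 * For illustration (values are examples only):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 */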
2357 static ssize_t rbd_add(struct bus_type *bus,
2358 		       const char *buf,
2359 		       size_t count)
2360 {
2361 	struct rbd_device *rbd_dev;
2362 	const char *mon_addrs = NULL;
2363 	size_t mon_addrs_size = 0;
2364 	char *options = NULL;
2365 	struct ceph_osd_client *osdc;
2366 	int rc = -ENOMEM;
2367 
2368 	if (!try_module_get(THIS_MODULE))
2369 		return -ENODEV;
2370 
2371 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2372 	if (!rbd_dev)
2373 		goto err_nomem;
2374 	options = kmalloc(count, GFP_KERNEL);
2375 	if (!options)
2376 		goto err_nomem;
2377 
2378 	/* static rbd_device initialization */
2379 	spin_lock_init(&rbd_dev->lock);
2380 	INIT_LIST_HEAD(&rbd_dev->node);
2381 	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

2386 	/* generate unique id: find highest unique id, add one */
2387 	rbd_id_get(rbd_dev);
2388 
2389 	/* Fill in the device name, now that we have its id. */
2390 	BUILD_BUG_ON(DEV_NAME_LEN
2391 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2392 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2393 
2394 	/* parse add command */
2395 	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2396 				options, count);
2397 	if (rc)
2398 		goto err_put_id;
2399 
2400 	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2401 						options);
2402 	if (IS_ERR(rbd_dev->rbd_client)) {
2403 		rc = PTR_ERR(rbd_dev->rbd_client);
2404 		goto err_put_id;
2405 	}
2406 
2407 	/* pick the pool */
2408 	osdc = &rbd_dev->rbd_client->client->osdc;
2409 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2410 	if (rc < 0)
2411 		goto err_out_client;
2412 	rbd_dev->poolid = rc;
2413 
2414 	/* register our block device */
2415 	rc = register_blkdev(0, rbd_dev->name);
2416 	if (rc < 0)
2417 		goto err_out_client;
2418 	rbd_dev->major = rc;
2419 
2420 	rc = rbd_bus_add_dev(rbd_dev);
2421 	if (rc)
2422 		goto err_out_blkdev;
2423 
2424 	/*
2425 	 * At this point cleanup in the event of an error is the job
2426 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2427 	 *
2428 	 * Set up and announce blkdev mapping.
2429 	 */
2430 	rc = rbd_init_disk(rbd_dev);
2431 	if (rc)
2432 		goto err_out_bus;
2433 
2434 	rc = rbd_init_watch_dev(rbd_dev);
2435 	if (rc)
2436 		goto err_out_bus;
2437 
	/* the option buffer is no longer needed once the device is set up */
	kfree(options);

	return count;
2439 
2440 err_out_bus:
	/* this will also clean up the rest of the rbd_dev state */
2442 
2443 	rbd_bus_del_dev(rbd_dev);
2444 	kfree(options);
2445 	return rc;
2446 
2447 err_out_blkdev:
2448 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2449 err_out_client:
2450 	rbd_put_client(rbd_dev);
2451 err_put_id:
2452 	rbd_id_put(rbd_dev);
2453 err_nomem:
2454 	kfree(options);
2455 	kfree(rbd_dev);
2456 
2457 	dout("Error adding device %s\n", buf);
2458 	module_put(THIS_MODULE);
2459 
2460 	return (ssize_t) rc;
2461 }
2462 
2463 static struct rbd_device *__rbd_get_dev(unsigned long id)
2464 {
2465 	struct list_head *tmp;
2466 	struct rbd_device *rbd_dev;
2467 
2468 	spin_lock(&rbd_dev_list_lock);
2469 	list_for_each(tmp, &rbd_dev_list) {
2470 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2471 		if (rbd_dev->id == id) {
2472 			spin_unlock(&rbd_dev_list_lock);
2473 			return rbd_dev;
2474 		}
2475 	}
2476 	spin_unlock(&rbd_dev_list_lock);
2477 	return NULL;
2478 }
2479 
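/*
 * Final teardown of an rbd device.  Called by the driver core once
 * the last reference to the device is dropped, after
 * rbd_bus_del_dev() has unregistered it.
 */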
2480 static void rbd_dev_release(struct device *dev)
2481 {
2482 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2483 
2484 	if (rbd_dev->watch_request) {
2485 		struct ceph_client *client = rbd_dev->rbd_client->client;
2486 
2487 		ceph_osdc_unregister_linger_request(&client->osdc,
2488 						    rbd_dev->watch_request);
2489 	}
2490 	if (rbd_dev->watch_event)
2491 		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2492 
2493 	rbd_put_client(rbd_dev);
2494 
2495 	/* clean up and free blkdev */
2496 	rbd_free_disk(rbd_dev);
2497 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2498 
2499 	/* done with the id, and with the rbd_dev */
2500 	rbd_id_put(rbd_dev);
2501 	kfree(rbd_dev);
2502 
2503 	/* release module ref */
2504 	module_put(THIS_MODULE);
2505 }
2506 
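/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the id of
 * the device to unmap; for illustration:
 *
 *	# echo 1 > /sys/bus/rbd/remove
 */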
2507 static ssize_t rbd_remove(struct bus_type *bus,
2508 			  const char *buf,
2509 			  size_t count)
2510 {
2511 	struct rbd_device *rbd_dev = NULL;
2512 	int target_id, rc;
2513 	unsigned long ul;
2514 	int ret = count;
2515 
2516 	rc = strict_strtoul(buf, 10, &ul);
2517 	if (rc)
2518 		return rc;
2519 
2520 	/* convert to int; abort if we lost anything in the conversion */
2521 	target_id = (int) ul;
2522 	if (target_id != ul)
2523 		return -EINVAL;
2524 
2525 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2526 
2527 	rbd_dev = __rbd_get_dev(target_id);
2528 	if (!rbd_dev) {
2529 		ret = -ENOENT;
2530 		goto done;
2531 	}
2532 
2533 	__rbd_remove_all_snaps(rbd_dev);
2534 	rbd_bus_del_dev(rbd_dev);
2535 
2536 done:
2537 	mutex_unlock(&ctl_mutex);
2538 	return ret;
2539 }
2540 
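/*
 * Create a snapshot named by the written buffer.  This is wired up
 * to a per-device sysfs attribute (in this driver's layout, the
 * device's snapshot-creation file under /sys/bus/rbd/devices/<id>/).
 */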
2541 static ssize_t rbd_snap_add(struct device *dev,
2542 			    struct device_attribute *attr,
2543 			    const char *buf,
2544 			    size_t count)
2545 {
2546 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2547 	int ret;
2548 	char *name = kmalloc(count + 1, GFP_KERNEL);
2549 	if (!name)
2550 		return -ENOMEM;
2551 
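	/*
	 * Note: snprintf() copies at most count - 1 bytes here, which
	 * drops the final character of buf; sysfs writes normally end
	 * in a newline, so this strips it.
	 */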
2552 	snprintf(name, count, "%s", buf);
2553 
2554 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2555 
2556 	ret = rbd_header_add_snap(rbd_dev,
2557 				  name, GFP_KERNEL);
2558 	if (ret < 0)
2559 		goto err_unlock;
2560 
2561 	ret = __rbd_update_snaps(rbd_dev);
2562 	if (ret < 0)
2563 		goto err_unlock;
2564 
	/*
	 * We shouldn't hold ctl_mutex when notifying: the notify may
	 * trigger a watch callback that would need to take that mutex.
	 */
2567 	mutex_unlock(&ctl_mutex);
2568 
2569 	/* make a best effort, don't error if failed */
2570 	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2571 
2572 	ret = count;
2573 	kfree(name);
2574 	return ret;
2575 
2576 err_unlock:
2577 	mutex_unlock(&ctl_mutex);
2578 	kfree(name);
2579 	return ret;
2580 }
2581 
2582 /*
2583  * create control files in sysfs
2584  * /sys/bus/rbd/...
2585  */
2586 static int rbd_sysfs_init(void)
2587 {
2588 	int ret;
2589 
2590 	ret = device_register(&rbd_root_dev);
2591 	if (ret < 0)
2592 		return ret;
2593 
2594 	ret = bus_register(&rbd_bus_type);
2595 	if (ret < 0)
2596 		device_unregister(&rbd_root_dev);
2597 
2598 	return ret;
2599 }
2600 
2601 static void rbd_sysfs_cleanup(void)
2602 {
2603 	bus_unregister(&rbd_bus_type);
2604 	device_unregister(&rbd_root_dev);
2605 }
2606 
static int __init rbd_init(void)
2608 {
2609 	int rc;
2610 
2611 	rc = rbd_sysfs_init();
2612 	if (rc)
2613 		return rc;
2614 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2615 	return 0;
2616 }
2617 
static void __exit rbd_exit(void)
2619 {
2620 	rbd_sysfs_cleanup();
2621 }
2622 
2623 module_init(rbd_init);
2624 module_exit(rbd_exit);
2625 
2626 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2627 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2628 MODULE_DESCRIPTION("rados block device");
2629 
2630 /* following authorship retained from original osdblk.c */
2631 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2632 
2633 MODULE_LICENSE("GPL");
2634