xref: /openbmc/linux/drivers/block/rbd.c (revision 95e9fd10)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3 
4 
5    based on drivers/block/osdblk.c:
6 
7    Copyright 2009 Red Hat, Inc.
8 
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21 
22 
23 
24    For usage instructions, please refer to:
25 
26                  Documentation/ABI/testing/sysfs-bus-rbd
27 
28  */
29 
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35 
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41 
42 #include "rbd_types.h"
43 
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define	SECTOR_SHIFT	9
51 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
52 
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55 
56 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
57 
58 #define RBD_MAX_SNAP_NAME_LEN	32
59 #define RBD_MAX_OPT_LEN		1024
60 
61 #define RBD_SNAP_HEAD_NAME	"-"
62 
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN		32
70 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
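/*
 * (For example, with 4-byte ints this works out to (5 * 4) / 2 + 1 == 11
 * characters, which covers the 10 decimal digits a 32-bit value can
 * need plus a sign; in general an n-byte int prints in at most about
 * 2.41 * n digits, since log10(2^(8n)) == 8n * log10(2), and 5n/2 + 1
 * stays above that.)
 */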
71 
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73 
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78 	u64 image_size;
79 	char *object_prefix;
80 	__u8 obj_order;
81 	__u8 crypt_type;
82 	__u8 comp_type;
83 	struct ceph_snap_context *snapc;
84 	size_t snap_names_len;
85 	u32 total_snaps;
86 
87 	char *snap_names;
88 	u64 *snap_sizes;
89 
90 	u64 obj_version;
91 };
92 
93 struct rbd_options {
94 	int	notify_timeout;
95 };
96 
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101 	struct ceph_client	*client;
102 	struct rbd_options	*rbd_opts;
103 	struct kref		kref;
104 	struct list_head	node;
105 };
106 
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111 	int done;
112 	int rc;
113 	u64 bytes;
114 };
115 
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120 	int			total;
121 	int			num_done;
122 	struct kref		kref;
123 	struct rbd_req_status	status[0];
124 };
125 
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130 	struct request		*rq;		/* blk layer request */
131 	struct bio		*bio;		/* cloned bio */
132 	struct page		**pages;	/* list of used pages */
133 	u64			len;
134 	int			coll_index;
135 	struct rbd_req_coll	*coll;
136 };
137 
138 struct rbd_snap {
139 	struct	device		dev;
140 	const char		*name;
141 	u64			size;
142 	struct list_head	node;
143 	u64			id;
144 };
145 
146 /*
147  * a single device
148  */
149 struct rbd_device {
150 	int			dev_id;		/* blkdev unique id */
151 
152 	int			major;		/* blkdev assigned major */
153 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
154 	struct request_queue	*q;
155 
156 	struct rbd_client	*rbd_client;
157 
158 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159 
160 	spinlock_t		lock;		/* queue lock */
161 
162 	struct rbd_image_header	header;
163 	char			*image_name;
164 	size_t			image_name_len;
165 	char			*header_name;
166 	char			*pool_name;
167 	int			pool_id;
168 
169 	struct ceph_osd_event   *watch_event;
170 	struct ceph_osd_request *watch_request;
171 
172 	/* protects updating the header */
173 	struct rw_semaphore     header_rwsem;
174 	/* name of the snapshot this device reads from */
175 	char                    *snap_name;
176 	/* id of the snapshot this device reads from */
177 	u64                     snap_id;	/* current snapshot id */
178 	/* whether the snap_id this device reads from still exists */
179 	bool                    snap_exists;
180 	int                     read_only;
181 
182 	struct list_head	node;
183 
184 	/* list of snapshots */
185 	struct list_head	snaps;
186 
187 	/* sysfs related */
188 	struct device		dev;
189 };
190 
191 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
192 
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195 
196 static LIST_HEAD(rbd_client_list);		/* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198 
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 			    struct device_attribute *attr,
203 			    const char *buf,
204 			    size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206 
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 		       size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 			  size_t count);
211 
212 static struct bus_attribute rbd_bus_attrs[] = {
213 	__ATTR(add, S_IWUSR, NULL, rbd_add),
214 	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
215 	__ATTR_NULL
216 };
217 
218 static struct bus_type rbd_bus_type = {
219 	.name		= "rbd",
220 	.bus_attrs	= rbd_bus_attrs,
221 };
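/*
 * Everything is driven through these bus attributes, per
 * Documentation/ABI/testing/sysfs-bus-rbd.  An illustrative shell
 * session (the monitor address, pool and image names here are made up):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *	# echo 0 > /sys/bus/rbd/remove
 */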
222 
223 static void rbd_root_dev_release(struct device *dev)
224 {
225 }
226 
227 static struct device rbd_root_dev = {
228 	.init_name =    "rbd",
229 	.release =      rbd_root_dev_release,
230 };
231 
232 
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235 	return get_device(&rbd_dev->dev);
236 }
237 
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240 	put_device(&rbd_dev->dev);
241 }
242 
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244 
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248 
249 	rbd_get_dev(rbd_dev);
250 
251 	set_device_ro(bdev, rbd_dev->read_only);
252 
253 	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254 		return -EROFS;
255 
256 	return 0;
257 }
258 
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261 	struct rbd_device *rbd_dev = disk->private_data;
262 
263 	rbd_put_dev(rbd_dev);
264 
265 	return 0;
266 }
267 
268 static const struct block_device_operations rbd_bd_ops = {
269 	.owner			= THIS_MODULE,
270 	.open			= rbd_open,
271 	.release		= rbd_release,
272 };
273 
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279 					    struct rbd_options *rbd_opts)
280 {
281 	struct rbd_client *rbdc;
282 	int ret = -ENOMEM;
283 
284 	dout("rbd_client_create\n");
285 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286 	if (!rbdc)
287 		goto out_opt;
288 
289 	kref_init(&rbdc->kref);
290 	INIT_LIST_HEAD(&rbdc->node);
291 
292 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 
294 	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295 	if (IS_ERR(rbdc->client))
296 		goto out_mutex;
297 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 
299 	ret = ceph_open_session(rbdc->client);
300 	if (ret < 0)
301 		goto out_err;
302 
303 	rbdc->rbd_opts = rbd_opts;
304 
305 	spin_lock(&rbd_client_list_lock);
306 	list_add_tail(&rbdc->node, &rbd_client_list);
307 	spin_unlock(&rbd_client_list_lock);
308 
309 	mutex_unlock(&ctl_mutex);
310 
311 	dout("rbd_client_create created %p\n", rbdc);
312 	return rbdc;
313 
314 out_err:
315 	ceph_destroy_client(rbdc->client);
316 out_mutex:
317 	mutex_unlock(&ctl_mutex);
318 	kfree(rbdc);
319 out_opt:
320 	if (ceph_opts)
321 		ceph_destroy_options(ceph_opts);
322 	return ERR_PTR(ret);
323 }
324 
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330 	struct rbd_client *client_node;
331 
332 	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333 		return NULL;
334 
335 	list_for_each_entry(client_node, &rbd_client_list, node)
336 		if (!ceph_compare_options(ceph_opts, client_node->client))
337 			return client_node;
338 	return NULL;
339 }
340 
341 /*
342  * mount options
343  */
344 enum {
345 	Opt_notify_timeout,
346 	Opt_last_int,
347 	/* int args above */
348 	Opt_last_string,
349 	/* string args above */
350 };
351 
352 static match_table_t rbd_opts_tokens = {
353 	{Opt_notify_timeout, "notify_timeout=%d"},
354 	/* int args above */
355 	/* string args above */
356 	{-1, NULL}
357 };
358 
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361 	struct rbd_options *rbd_opts = private;
362 	substring_t argstr[MAX_OPT_ARGS];
363 	int token, intval, ret;
364 
365 	token = match_token(c, rbd_opts_tokens, argstr);
366 	if (token < 0)
367 		return -EINVAL;
368 
369 	if (token < Opt_last_int) {
370 		ret = match_int(&argstr[0], &intval);
371 		if (ret < 0) {
372 			pr_err("bad mount option arg (not int) "
373 			       "at '%s'\n", c);
374 			return ret;
375 		}
376 		dout("got int token %d val %d\n", token, intval);
377 	} else if (token > Opt_last_int && token < Opt_last_string) {
378 		dout("got string token %d val %s\n", token,
379 		     argstr[0].from);
380 	} else {
381 		dout("got token %d\n", token);
382 	}
383 
384 	switch (token) {
385 	case Opt_notify_timeout:
386 		rbd_opts->notify_timeout = intval;
387 		break;
388 	default:
389 		BUG_ON(token);
390 	}
391 	return 0;
392 }
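/*
 * For example, an option string containing "notify_timeout=30" reaches
 * this callback via ceph_parse_options() and sets
 * rbd_opts->notify_timeout to 30; a token this table doesn't recognize
 * returns -EINVAL and fails the parse as a whole.
 */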
393 
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399 					 size_t mon_addr_len,
400 					 char *options)
401 {
402 	struct rbd_client *rbdc;
403 	struct ceph_options *ceph_opts;
404 	struct rbd_options *rbd_opts;
405 
406 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407 	if (!rbd_opts)
408 		return ERR_PTR(-ENOMEM);
409 
410 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 
412 	ceph_opts = ceph_parse_options(options, mon_addr,
413 					mon_addr + mon_addr_len,
414 					parse_rbd_opts_token, rbd_opts);
415 	if (IS_ERR(ceph_opts)) {
416 		kfree(rbd_opts);
417 		return ERR_CAST(ceph_opts);
418 	}
419 
420 	spin_lock(&rbd_client_list_lock);
421 	rbdc = __rbd_client_find(ceph_opts);
422 	if (rbdc) {
423 		/* using an existing client */
424 		kref_get(&rbdc->kref);
425 		spin_unlock(&rbd_client_list_lock);
426 
427 		ceph_destroy_options(ceph_opts);
428 		kfree(rbd_opts);
429 
430 		return rbdc;
431 	}
432 	spin_unlock(&rbd_client_list_lock);
433 
434 	rbdc = rbd_client_create(ceph_opts, rbd_opts);
435 
436 	if (IS_ERR(rbdc))
437 		kfree(rbd_opts);
438 
439 	return rbdc;
440 }
441 
442 /*
443  * Destroy ceph client
444  *
445  * Caller must hold rbd_client_list_lock.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450 
451 	dout("rbd_release_client %p\n", rbdc);
452 	spin_lock(&rbd_client_list_lock);
453 	list_del(&rbdc->node);
454 	spin_unlock(&rbd_client_list_lock);
455 
456 	ceph_destroy_client(rbdc->client);
457 	kfree(rbdc->rbd_opts);
458 	kfree(rbdc);
459 }
460 
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467 	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 	rbd_dev->rbd_client = NULL;
469 }
470 
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476 	struct rbd_req_coll *coll =
477 		container_of(kref, struct rbd_req_coll, kref);
478 
479 	dout("rbd_coll_release %p\n", coll);
480 	kfree(coll);
481 }
482 
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485 	return !memcmp(&ondisk->text,
486 			RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488 
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494 				 struct rbd_image_header_ondisk *ondisk,
495 				 u32 allocated_snaps)
496 {
497 	u32 snap_count;
498 
499 	if (!rbd_dev_ondisk_valid(ondisk))
500 		return -ENXIO;
501 
502 	snap_count = le32_to_cpu(ondisk->snap_count);
503 	if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
504 				 / sizeof (u64))
505 		return -EINVAL;
506 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507 				snap_count * sizeof(u64),
508 				GFP_KERNEL);
509 	if (!header->snapc)
510 		return -ENOMEM;
511 
512 	if (snap_count) {
513 		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514 		header->snap_names = kmalloc(header->snap_names_len,
515 					     GFP_KERNEL);
516 		if (!header->snap_names)
517 			goto err_snapc;
518 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519 					     GFP_KERNEL);
520 		if (!header->snap_sizes)
521 			goto err_names;
522 	} else {
523 		WARN_ON(ondisk->snap_names_len);
524 		header->snap_names_len = 0;
525 		header->snap_names = NULL;
526 		header->snap_sizes = NULL;
527 	}
528 
529 	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530 					GFP_KERNEL);
531 	if (!header->object_prefix)
532 		goto err_sizes;
533 
534 	memcpy(header->object_prefix, ondisk->block_name,
535 	       sizeof(ondisk->block_name));
536 	header->object_prefix[sizeof (ondisk->block_name)] = '\0';
537 
538 	header->image_size = le64_to_cpu(ondisk->image_size);
539 	header->obj_order = ondisk->options.order;
540 	header->crypt_type = ondisk->options.crypt_type;
541 	header->comp_type = ondisk->options.comp_type;
542 
543 	atomic_set(&header->snapc->nref, 1);
544 	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
545 	header->snapc->num_snaps = snap_count;
546 	header->total_snaps = snap_count;
547 
548 	if (snap_count && allocated_snaps == snap_count) {
549 		int i;
550 
551 		for (i = 0; i < snap_count; i++) {
552 			header->snapc->snaps[i] =
553 				le64_to_cpu(ondisk->snaps[i].id);
554 			header->snap_sizes[i] =
555 				le64_to_cpu(ondisk->snaps[i].image_size);
556 		}
557 
558 		/* copy snapshot names */
559 		memcpy(header->snap_names, &ondisk->snaps[snap_count],
560 			header->snap_names_len);
561 	}
562 
563 	return 0;
564 
565 err_sizes:
566 	kfree(header->snap_sizes);
567 	header->snap_sizes = NULL;
568 err_names:
569 	kfree(header->snap_names);
570 	header->snap_names = NULL;
571 err_snapc:
572 	kfree(header->snapc);
573 	header->snapc = NULL;
574 
575 	return -ENOMEM;
576 }
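/*
 * For reference, the on-disk layout decoded above is the fixed-size
 * header itself, then snap_count struct rbd_image_snap_ondisk records
 * (one id and size per snapshot), then the NUL-delimited snapshot
 * names -- hence the name copy starting at &ondisk->snaps[snap_count].
 */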
577 
578 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
579 			u64 *seq, u64 *size)
580 {
581 	int i;
582 	char *p = header->snap_names;
583 
584 	for (i = 0; i < header->total_snaps; i++) {
585 		if (!strcmp(snap_name, p)) {
586 
587 			/* Found it.  Pass back its id and/or size */
588 
589 			if (seq)
590 				*seq = header->snapc->snaps[i];
591 			if (size)
592 				*size = header->snap_sizes[i];
593 			return i;
594 		}
595 		p += strlen(p) + 1;	/* Skip ahead to the next name */
596 	}
597 	return -ENOENT;
598 }
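/*
 * The name buffer parallels the id and size arrays index for index:
 * e.g. with snap_names holding "one\0two\0", snapc->snaps[0] and
 * snap_sizes[0] describe "one" while entry 1 describes "two".
 */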
599 
600 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
601 {
602 	int ret;
603 
604 	down_write(&rbd_dev->header_rwsem);
605 
606 	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
607 		    sizeof (RBD_SNAP_HEAD_NAME))) {
608 		rbd_dev->snap_id = CEPH_NOSNAP;
609 		rbd_dev->snap_exists = false;
610 		rbd_dev->read_only = 0;
611 		if (size)
612 			*size = rbd_dev->header.image_size;
613 	} else {
614 		u64 snap_id = 0;
615 
616 		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
617 					&snap_id, size);
618 		if (ret < 0)
619 			goto done;
620 		rbd_dev->snap_id = snap_id;
621 		rbd_dev->snap_exists = true;
622 		rbd_dev->read_only = 1;
623 	}
624 
625 	ret = 0;
626 done:
627 	up_write(&rbd_dev->header_rwsem);
628 	return ret;
629 }
630 
631 static void rbd_header_free(struct rbd_image_header *header)
632 {
633 	kfree(header->object_prefix);
634 	kfree(header->snap_sizes);
635 	kfree(header->snap_names);
636 	ceph_put_snap_context(header->snapc);
637 }
638 
639 /*
640  * get the actual striped segment name, offset and length
641  */
642 static u64 rbd_get_segment(struct rbd_image_header *header,
643 			   const char *object_prefix,
644 			   u64 ofs, u64 len,
645 			   char *seg_name, u64 *segofs)
646 {
647 	u64 seg = ofs >> header->obj_order;
648 
649 	if (seg_name)
650 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
651 			 "%s.%012llx", object_prefix, seg);
652 
653 	ofs = ofs & ((1 << header->obj_order) - 1);
654 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
655 
656 	if (segofs)
657 		*segofs = ofs;
658 
659 	return len;
660 }
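/*
 * A worked example, assuming the usual object order of 22 (4 MiB
 * objects) and an object prefix of "rb.0.0": a 1 MiB request at image
 * offset 7 MiB lands in segment 1, so seg_name becomes
 * "rb.0.0.000000000001", *segofs becomes 3 MiB, and the full 1 MiB is
 * returned since the request doesn't cross the object boundary.
 */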
661 
662 static int rbd_get_num_segments(struct rbd_image_header *header,
663 				u64 ofs, u64 len)
664 {
665 	u64 start_seg = ofs >> header->obj_order;
666 	u64 end_seg = (ofs + len - 1) >> header->obj_order;
667 	return end_seg - start_seg + 1;
668 }
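/*
 * Continuing the example above: a 5 MiB request at offset 3 MiB spans
 * bytes 3 MiB through 8 MiB - 1, i.e. segments 0 and 1, so this
 * returns 2.
 */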
669 
670 /*
671  * returns the size of an object in the image
672  */
673 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 {
675 	return 1 << header->obj_order;
676 }
677 
678 /*
679  * bio helpers
680  */
681 
682 static void bio_chain_put(struct bio *chain)
683 {
684 	struct bio *tmp;
685 
686 	while (chain) {
687 		tmp = chain;
688 		chain = chain->bi_next;
689 		bio_put(tmp);
690 	}
691 }
692 
693 /*
694  * zeros a bio chain, starting at a specific offset
695  */
696 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 {
698 	struct bio_vec *bv;
699 	unsigned long flags;
700 	void *buf;
701 	int i;
702 	int pos = 0;
703 
704 	while (chain) {
705 		bio_for_each_segment(bv, chain, i) {
706 			if (pos + bv->bv_len > start_ofs) {
707 				int remainder = max(start_ofs - pos, 0);
708 				buf = bvec_kmap_irq(bv, &flags);
709 				memset(buf + remainder, 0,
710 				       bv->bv_len - remainder);
711 				bvec_kunmap_irq(buf, &flags);
712 			}
713 			pos += bv->bv_len;
714 		}
715 
716 		chain = chain->bi_next;
717 	}
718 }
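/*
 * This is what gives reads beyond the written extent well-defined
 * contents: rbd_req_cb() calls it with start_ofs == 0 when the object
 * doesn't exist at all, and with start_ofs == <bytes returned> on a
 * short read, zero-filling whatever the osd didn't supply.
 */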
719 
720 /*
721  * bio_chain_clone - clone a chain of bios up to a certain length.
722  * might return a bio_pair that will need to be released.
723  */
724 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
725 				   struct bio_pair **bp,
726 				   int len, gfp_t gfpmask)
727 {
728 	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
729 	int total = 0;
730 
731 	if (*bp) {
732 		bio_pair_release(*bp);
733 		*bp = NULL;
734 	}
735 
736 	while (old_chain && (total < len)) {
737 		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
738 		if (!tmp)
739 			goto err_out;
740 
741 		if (total + old_chain->bi_size > len) {
742 			struct bio_pair *bp;
743 
744 			/*
745 			 * this split can only happen with a single paged bio,
746 			 * bio_split will BUG_ON if this is not the case
747 			 */
748 			dout("bio_chain_clone split! total=%d remaining=%d "
749 			     "bi_size=%u\n",
750 			     total, len - total, old_chain->bi_size);
751 
752 			/* split the bio. We'll release it either in the next
753 			   call, or it will have to be released outside */
754 			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755 			if (!bp)
756 				goto err_out;
757 
758 			__bio_clone(tmp, &bp->bio1);
759 
760 			*next = &bp->bio2;
761 		} else {
762 			__bio_clone(tmp, old_chain);
763 			*next = old_chain->bi_next;
764 		}
765 
766 		tmp->bi_bdev = NULL;
767 		gfpmask &= ~__GFP_WAIT;
768 		tmp->bi_next = NULL;
769 
770 		if (!new_chain) {
771 			new_chain = tail = tmp;
772 		} else {
773 			tail->bi_next = tmp;
774 			tail = tmp;
775 		}
776 		old_chain = old_chain->bi_next;
777 
778 		total += tmp->bi_size;
779 	}
780 
781 	BUG_ON(total < len);
782 
783 	if (tail)
784 		tail->bi_next = NULL;
785 
786 	*old = old_chain;
787 
788 	return new_chain;
789 
790 err_out:
791 	dout("bio_chain_clone with err\n");
792 	bio_chain_put(new_chain);
793 	return NULL;
794 }
795 
796 /*
797  * helpers for osd request op vectors.
798  */
799 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
800 					int opcode, u32 payload_len)
801 {
802 	struct ceph_osd_req_op *ops;
803 
804 	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
805 	if (!ops)
806 		return NULL;
807 
808 	ops[0].op = opcode;
809 
810 	/*
811 	 * op extent offset and length will be set later on
812 	 * in calc_raw_layout()
813 	 */
814 	ops[0].payload_len = payload_len;
815 
816 	return ops;
817 }
818 
819 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
820 {
821 	kfree(ops);
822 }
823 
824 static void rbd_coll_end_req_index(struct request *rq,
825 				   struct rbd_req_coll *coll,
826 				   int index,
827 				   int ret, u64 len)
828 {
829 	struct request_queue *q;
830 	int min, max, i;
831 
832 	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
833 	     coll, index, ret, (unsigned long long) len);
834 
835 	if (!rq)
836 		return;
837 
838 	if (!coll) {
839 		blk_end_request(rq, ret, len);
840 		return;
841 	}
842 
843 	q = rq->q;
844 
845 	spin_lock_irq(q->queue_lock);
846 	coll->status[index].done = 1;
847 	coll->status[index].rc = ret;
848 	coll->status[index].bytes = len;
849 	max = min = coll->num_done;
850 	while (max < coll->total && coll->status[max].done)
851 		max++;
852 
853 	for (i = min; i < max; i++) {
854 		__blk_end_request(rq, coll->status[i].rc,
855 				  coll->status[i].bytes);
856 		coll->num_done++;
857 		kref_put(&coll->kref, rbd_coll_release);
858 	}
859 	spin_unlock_irq(q->queue_lock);
860 }
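/*
 * Note that completions are handed to the block layer strictly in
 * segment order: if, say, status[1] finishes before status[0], nothing
 * is ended yet; once status[0] finishes, both are ended in one pass
 * and num_done advances past them.
 */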
861 
862 static void rbd_coll_end_req(struct rbd_request *req,
863 			     int ret, u64 len)
864 {
865 	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
866 }
867 
868 /*
869  * Send ceph osd request
870  */
871 static int rbd_do_request(struct request *rq,
872 			  struct rbd_device *rbd_dev,
873 			  struct ceph_snap_context *snapc,
874 			  u64 snapid,
875 			  const char *object_name, u64 ofs, u64 len,
876 			  struct bio *bio,
877 			  struct page **pages,
878 			  int num_pages,
879 			  int flags,
880 			  struct ceph_osd_req_op *ops,
881 			  struct rbd_req_coll *coll,
882 			  int coll_index,
883 			  void (*rbd_cb)(struct ceph_osd_request *req,
884 					 struct ceph_msg *msg),
885 			  struct ceph_osd_request **linger_req,
886 			  u64 *ver)
887 {
888 	struct ceph_osd_request *req;
889 	struct ceph_file_layout *layout;
890 	int ret;
891 	u64 bno;
892 	struct timespec mtime = CURRENT_TIME;
893 	struct rbd_request *req_data;
894 	struct ceph_osd_request_head *reqhead;
895 	struct ceph_osd_client *osdc;
896 
897 	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
898 	if (!req_data) {
899 		if (coll)
900 			rbd_coll_end_req_index(rq, coll, coll_index,
901 					       -ENOMEM, len);
902 		return -ENOMEM;
903 	}
904 
905 	if (coll) {
906 		req_data->coll = coll;
907 		req_data->coll_index = coll_index;
908 	}
909 
910 	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
911 		(unsigned long long) ofs, (unsigned long long) len);
912 
913 	osdc = &rbd_dev->rbd_client->client->osdc;
914 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
915 					false, GFP_NOIO, pages, bio);
916 	if (!req) {
917 		ret = -ENOMEM;
918 		goto done_pages;
919 	}
920 
921 	req->r_callback = rbd_cb;
922 
923 	req_data->rq = rq;
924 	req_data->bio = bio;
925 	req_data->pages = pages;
926 	req_data->len = len;
927 
928 	req->r_priv = req_data;
929 
930 	reqhead = req->r_request->front.iov_base;
931 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
932 
933 	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
934 	req->r_oid_len = strlen(req->r_oid);
935 
936 	layout = &req->r_file_layout;
937 	memset(layout, 0, sizeof(*layout));
938 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939 	layout->fl_stripe_count = cpu_to_le32(1);
940 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
941 	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
942 	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
943 				req, ops);
944 
945 	ceph_osdc_build_request(req, ofs, &len,
946 				ops,
947 				snapc,
948 				&mtime,
949 				req->r_oid, req->r_oid_len);
950 
951 	if (linger_req) {
952 		ceph_osdc_set_request_linger(osdc, req);
953 		*linger_req = req;
954 	}
955 
956 	ret = ceph_osdc_start_request(osdc, req, false);
957 	if (ret < 0)
958 		goto done_err;
959 
960 	if (!rbd_cb) {
961 		ret = ceph_osdc_wait_request(osdc, req);
962 		if (ver)
963 			*ver = le64_to_cpu(req->r_reassert_version.version);
964 		dout("reassert_ver=%llu\n",
965 			(unsigned long long)
966 				le64_to_cpu(req->r_reassert_version.version));
967 		ceph_osdc_put_request(req);
968 	}
969 	return ret;
970 
971 done_err:
972 	bio_chain_put(req_data->bio);
973 	ceph_osdc_put_request(req);
974 done_pages:
975 	rbd_coll_end_req(req_data, ret, len);
976 	kfree(req_data);
977 	return ret;
978 }
979 
980 /*
981  * Ceph osd op callback
982  */
983 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 {
985 	struct rbd_request *req_data = req->r_priv;
986 	struct ceph_osd_reply_head *replyhead;
987 	struct ceph_osd_op *op;
988 	__s32 rc;
989 	u64 bytes;
990 	int read_op;
991 
992 	/* parse reply */
993 	replyhead = msg->front.iov_base;
994 	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
995 	op = (void *)(replyhead + 1);
996 	rc = le32_to_cpu(replyhead->result);
997 	bytes = le64_to_cpu(op->extent.length);
998 	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
999 
1000 	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1001 		(unsigned long long) bytes, read_op, (int) rc);
1002 
1003 	if (rc == -ENOENT && read_op) {
1004 		zero_bio_chain(req_data->bio, 0);
1005 		rc = 0;
1006 	} else if (rc == 0 && read_op && bytes < req_data->len) {
1007 		zero_bio_chain(req_data->bio, bytes);
1008 		bytes = req_data->len;
1009 	}
1010 
1011 	rbd_coll_end_req(req_data, rc, bytes);
1012 
1013 	if (req_data->bio)
1014 		bio_chain_put(req_data->bio);
1015 
1016 	ceph_osdc_put_request(req);
1017 	kfree(req_data);
1018 }
1019 
1020 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1021 {
1022 	ceph_osdc_put_request(req);
1023 }
1024 
1025 /*
1026  * Do a synchronous ceph osd operation
1027  */
1028 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1029 			   struct ceph_snap_context *snapc,
1030 			   u64 snapid,
1031 			   int flags,
1032 			   struct ceph_osd_req_op *ops,
1033 			   const char *object_name,
1034 			   u64 ofs, u64 len,
1035 			   char *buf,
1036 			   struct ceph_osd_request **linger_req,
1037 			   u64 *ver)
1038 {
1039 	int ret;
1040 	struct page **pages;
1041 	int num_pages;
1042 
1043 	BUG_ON(ops == NULL);
1044 
1045 	num_pages = calc_pages_for(ofs, len);
1046 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047 	if (IS_ERR(pages))
1048 		return PTR_ERR(pages);
1049 
1050 	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1051 			  object_name, ofs, len, NULL,
1052 			  pages, num_pages,
1053 			  flags,
1054 			  ops,
1055 			  NULL, 0,
1056 			  NULL,
1057 			  linger_req, ver);
1058 	if (ret < 0)
1059 		goto done;
1060 
1061 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1062 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1063 
1064 done:
1065 	ceph_release_page_vector(pages, num_pages);
1066 	return ret;
1067 }
1068 
1069 /*
1070  * Do an asynchronous ceph osd operation
1071  */
1072 static int rbd_do_op(struct request *rq,
1073 		     struct rbd_device *rbd_dev,
1074 		     struct ceph_snap_context *snapc,
1075 		     u64 snapid,
1076 		     int opcode, int flags,
1077 		     u64 ofs, u64 len,
1078 		     struct bio *bio,
1079 		     struct rbd_req_coll *coll,
1080 		     int coll_index)
1081 {
1082 	char *seg_name;
1083 	u64 seg_ofs;
1084 	u64 seg_len;
1085 	int ret;
1086 	struct ceph_osd_req_op *ops;
1087 	u32 payload_len;
1088 
1089 	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090 	if (!seg_name)
1091 		return -ENOMEM;
1092 
1093 	seg_len = rbd_get_segment(&rbd_dev->header,
1094 				  rbd_dev->header.object_prefix,
1095 				  ofs, len,
1096 				  seg_name, &seg_ofs);
1097 
1098 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099 
1100 	ret = -ENOMEM;
1101 	ops = rbd_create_rw_ops(1, opcode, payload_len);
1102 	if (!ops)
1103 		goto done;
1104 
1105 	/* we've taken care of segment sizes earlier when we
1106 	   cloned the bios. We should never have a segment
1107 	   truncated at this point */
1108 	BUG_ON(seg_len < len);
1109 
1110 	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1111 			     seg_name, seg_ofs, seg_len,
1112 			     bio,
1113 			     NULL, 0,
1114 			     flags,
1115 			     ops,
1116 			     coll, coll_index,
1117 			     rbd_req_cb, NULL, NULL);
1118 
1119 	rbd_destroy_ops(ops);
1120 done:
1121 	kfree(seg_name);
1122 	return ret;
1123 }
1124 
1125 /*
1126  * Request async osd write
1127  */
1128 static int rbd_req_write(struct request *rq,
1129 			 struct rbd_device *rbd_dev,
1130 			 struct ceph_snap_context *snapc,
1131 			 u64 ofs, u64 len,
1132 			 struct bio *bio,
1133 			 struct rbd_req_coll *coll,
1134 			 int coll_index)
1135 {
1136 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137 			 CEPH_OSD_OP_WRITE,
1138 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139 			 ofs, len, bio, coll, coll_index);
1140 }
1141 
1142 /*
1143  * Request async osd read
1144  */
1145 static int rbd_req_read(struct request *rq,
1146 			 struct rbd_device *rbd_dev,
1147 			 u64 snapid,
1148 			 u64 ofs, u64 len,
1149 			 struct bio *bio,
1150 			 struct rbd_req_coll *coll,
1151 			 int coll_index)
1152 {
1153 	return rbd_do_op(rq, rbd_dev, NULL,
1154 			 snapid,
1155 			 CEPH_OSD_OP_READ,
1156 			 CEPH_OSD_FLAG_READ,
1157 			 ofs, len, bio, coll, coll_index);
1158 }
1159 
1160 /*
1161  * Request sync osd read
1162  */
1163 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1164 			  u64 snapid,
1165 			  const char *object_name,
1166 			  u64 ofs, u64 len,
1167 			  char *buf,
1168 			  u64 *ver)
1169 {
1170 	struct ceph_osd_req_op *ops;
1171 	int ret;
1172 
1173 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1174 	if (!ops)
1175 		return -ENOMEM;
1176 
1177 	ret = rbd_req_sync_op(rbd_dev, NULL,
1178 			       snapid,
1179 			       CEPH_OSD_FLAG_READ,
1180 			       ops, object_name, ofs, len, buf, NULL, ver);
1181 	rbd_destroy_ops(ops);
1182 
1183 	return ret;
1184 }
1185 
1186 /*
1187  * Request sync osd notify-ack
1188  */
1189 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1190 				   u64 ver,
1191 				   u64 notify_id)
1192 {
1193 	struct ceph_osd_req_op *ops;
1194 	int ret;
1195 
1196 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1197 	if (!ops)
1198 		return -ENOMEM;
1199 
1200 	ops[0].watch.ver = cpu_to_le64(ver);
1201 	ops[0].watch.cookie = notify_id;
1202 	ops[0].watch.flag = 0;
1203 
1204 	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1205 			  rbd_dev->header_name, 0, 0, NULL,
1206 			  NULL, 0,
1207 			  CEPH_OSD_FLAG_READ,
1208 			  ops,
1209 			  NULL, 0,
1210 			  rbd_simple_req_cb, NULL, NULL);
1211 
1212 	rbd_destroy_ops(ops);
1213 	return ret;
1214 }
1215 
1216 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1217 {
1218 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1219 	u64 hver;
1220 	int rc;
1221 
1222 	if (!rbd_dev)
1223 		return;
1224 
1225 	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1226 		rbd_dev->header_name, (unsigned long long) notify_id,
1227 		(unsigned int) opcode);
1228 	rc = rbd_refresh_header(rbd_dev, &hver);
1229 	if (rc)
1230 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1231 			   "update snaps: %d\n", rbd_dev->major, rc);
1232 
1233 	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1234 }
1235 
1236 /*
1237  * Request sync osd watch
1238  */
1239 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1240 {
1241 	struct ceph_osd_req_op *ops;
1242 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1243 	int ret;
1244 
1245 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1246 	if (!ops)
1247 		return -ENOMEM;
1248 
1249 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1250 				     (void *)rbd_dev, &rbd_dev->watch_event);
1251 	if (ret < 0)
1252 		goto fail;
1253 
1254 	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1255 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1256 	ops[0].watch.flag = 1;
1257 
1258 	ret = rbd_req_sync_op(rbd_dev, NULL,
1259 			      CEPH_NOSNAP,
1260 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261 			      ops,
1262 			      rbd_dev->header_name,
1263 			      0, 0, NULL,
1264 			      &rbd_dev->watch_request, NULL);
1265 
1266 	if (ret < 0)
1267 		goto fail_event;
1268 
1269 	rbd_destroy_ops(ops);
1270 	return 0;
1271 
1272 fail_event:
1273 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1274 	rbd_dev->watch_event = NULL;
1275 fail:
1276 	rbd_destroy_ops(ops);
1277 	return ret;
1278 }
1279 
1280 /*
1281  * Request sync osd unwatch
1282  */
1283 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1284 {
1285 	struct ceph_osd_req_op *ops;
1286 	int ret;
1287 
1288 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1289 	if (!ops)
1290 		return -ENOMEM;
1291 
1292 	ops[0].watch.ver = 0;
1293 	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1294 	ops[0].watch.flag = 0;
1295 
1296 	ret = rbd_req_sync_op(rbd_dev, NULL,
1297 			      CEPH_NOSNAP,
1298 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 			      ops,
1300 			      rbd_dev->header_name,
1301 			      0, 0, NULL, NULL, NULL);
1302 
1303 
1304 	rbd_destroy_ops(ops);
1305 	ceph_osdc_cancel_event(rbd_dev->watch_event);
1306 	rbd_dev->watch_event = NULL;
1307 	return ret;
1308 }
1309 
1310 struct rbd_notify_info {
1311 	struct rbd_device *rbd_dev;
1312 };
1313 
1314 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1315 {
1316 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
1317 	if (!rbd_dev)
1318 		return;
1319 
1320 	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1321 			rbd_dev->header_name, (unsigned long long) notify_id,
1322 			(unsigned int) opcode);
1323 }
1324 
1325 /*
1326  * Request sync osd notify
1327  */
1328 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1329 {
1330 	struct ceph_osd_req_op *ops;
1331 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1332 	struct ceph_osd_event *event;
1333 	struct rbd_notify_info info;
1334 	int payload_len = sizeof(u32) + sizeof(u32);
1335 	int ret;
1336 
1337 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1338 	if (!ops)
1339 		return -ENOMEM;
1340 
1341 	info.rbd_dev = rbd_dev;
1342 
1343 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1344 				     (void *)&info, &event);
1345 	if (ret < 0)
1346 		goto fail;
1347 
1348 	ops[0].watch.ver = 1;
1349 	ops[0].watch.flag = 1;
1350 	ops[0].watch.cookie = event->cookie;
1351 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1352 	ops[0].watch.timeout = 12;
1353 
1354 	ret = rbd_req_sync_op(rbd_dev, NULL,
1355 			       CEPH_NOSNAP,
1356 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1357 			       ops,
1358 			       rbd_dev->header_name,
1359 			       0, 0, NULL, NULL, NULL);
1360 	if (ret < 0)
1361 		goto fail_event;
1362 
1363 	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1364 	dout("ceph_osdc_wait_event returned %d\n", ret);
1365 	rbd_destroy_ops(ops);
1366 	return 0;
1367 
1368 fail_event:
1369 	ceph_osdc_cancel_event(event);
1370 fail:
1371 	rbd_destroy_ops(ops);
1372 	return ret;
1373 }
1374 
1375 /*
1376  * Request sync osd class-method call
1377  */
1378 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1379 			     const char *object_name,
1380 			     const char *class_name,
1381 			     const char *method_name,
1382 			     const char *data,
1383 			     int len,
1384 			     u64 *ver)
1385 {
1386 	struct ceph_osd_req_op *ops;
1387 	int class_name_len = strlen(class_name);
1388 	int method_name_len = strlen(method_name);
1389 	int ret;
1390 
1391 	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1392 				    class_name_len + method_name_len + len);
1393 	if (!ops)
1394 		return -ENOMEM;
1395 
1396 	ops[0].cls.class_name = class_name;
1397 	ops[0].cls.class_len = (__u8) class_name_len;
1398 	ops[0].cls.method_name = method_name;
1399 	ops[0].cls.method_len = (__u8) method_name_len;
1400 	ops[0].cls.argc = 0;
1401 	ops[0].cls.indata = data;
1402 	ops[0].cls.indata_len = len;
1403 
1404 	ret = rbd_req_sync_op(rbd_dev, NULL,
1405 			       CEPH_NOSNAP,
1406 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1407 			       ops,
1408 			       object_name, 0, 0, NULL, NULL, ver);
1409 
1410 	rbd_destroy_ops(ops);
1411 
1412 	dout("cls_exec returned %d\n", ret);
1413 	return ret;
1414 }
1415 
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 {
1418 	struct rbd_req_coll *coll =
1419 			kzalloc(sizeof(struct rbd_req_coll) +
1420 			        sizeof(struct rbd_req_status) * num_reqs,
1421 				GFP_ATOMIC);
1422 
1423 	if (!coll)
1424 		return NULL;
1425 	coll->total = num_reqs;
1426 	kref_init(&coll->kref);
1427 	return coll;
1428 }
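/*
 * Reference counting on a collection: kref_init() above sets up the
 * reference that rbd_rq_fn() drops once it has submitted all segments,
 * and every submitted segment takes one extra reference that
 * rbd_coll_end_req_index() drops as that segment completes.
 */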
1429 
1430 /*
1431  * block device queue callback
1432  */
1433 static void rbd_rq_fn(struct request_queue *q)
1434 {
1435 	struct rbd_device *rbd_dev = q->queuedata;
1436 	struct request *rq;
1437 	struct bio_pair *bp = NULL;
1438 
1439 	while ((rq = blk_fetch_request(q))) {
1440 		struct bio *bio;
1441 		struct bio *rq_bio, *next_bio = NULL;
1442 		bool do_write;
1443 		unsigned int size;
1444 		u64 op_size = 0;
1445 		u64 ofs;
1446 		int num_segs, cur_seg = 0;
1447 		struct rbd_req_coll *coll;
1448 		struct ceph_snap_context *snapc;
1449 
1450 		/* paranoia: the loop condition above already ensures rq != NULL */
1451 		if (!rq)
1452 			break;
1453 
1454 		dout("fetched request\n");
1455 
1456 		/* filter out block requests we don't understand */
1457 		if (rq->cmd_type != REQ_TYPE_FS) {
1458 			__blk_end_request_all(rq, 0);
1459 			continue;
1460 		}
1461 
1462 		/* deduce our operation (read, write) */
1463 		do_write = (rq_data_dir(rq) == WRITE);
1464 
1465 		size = blk_rq_bytes(rq);
1466 		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1467 		rq_bio = rq->bio;
1468 		if (do_write && rbd_dev->read_only) {
1469 			__blk_end_request_all(rq, -EROFS);
1470 			continue;
1471 		}
1472 
1473 		spin_unlock_irq(q->queue_lock);
1474 
1475 		down_read(&rbd_dev->header_rwsem);
1476 
1477 		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1478 			up_read(&rbd_dev->header_rwsem);
1479 			dout("request for non-existent snapshot\n");
1480 			spin_lock_irq(q->queue_lock);
1481 			__blk_end_request_all(rq, -ENXIO);
1482 			continue;
1483 		}
1484 
1485 		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1486 
1487 		up_read(&rbd_dev->header_rwsem);
1488 
1489 		dout("%s 0x%x bytes at 0x%llx\n",
1490 		     do_write ? "write" : "read",
1491 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1492 
1493 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1494 		coll = rbd_alloc_coll(num_segs);
1495 		if (!coll) {
1496 			spin_lock_irq(q->queue_lock);
1497 			__blk_end_request_all(rq, -ENOMEM);
1498 			ceph_put_snap_context(snapc);
1499 			continue;
1500 		}
1501 
1502 		do {
1503 			/* a bio clone to be passed down to OSD req */
1504 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1505 			op_size = rbd_get_segment(&rbd_dev->header,
1506 						  rbd_dev->header.object_prefix,
1507 						  ofs, size,
1508 						  NULL, NULL);
1509 			kref_get(&coll->kref);
1510 			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1511 					      op_size, GFP_ATOMIC);
1512 			if (!bio) {
1513 				rbd_coll_end_req_index(rq, coll, cur_seg,
1514 						       -ENOMEM, op_size);
1515 				goto next_seg;
1516 			}
1517 
1518 
1519 			/* init OSD command: write or read */
1520 			if (do_write)
1521 				rbd_req_write(rq, rbd_dev,
1522 					      snapc,
1523 					      ofs,
1524 					      op_size, bio,
1525 					      coll, cur_seg);
1526 			else
1527 				rbd_req_read(rq, rbd_dev,
1528 					     rbd_dev->snap_id,
1529 					     ofs,
1530 					     op_size, bio,
1531 					     coll, cur_seg);
1532 
1533 next_seg:
1534 			size -= op_size;
1535 			ofs += op_size;
1536 
1537 			cur_seg++;
1538 			rq_bio = next_bio;
1539 		} while (size > 0);
1540 		kref_put(&coll->kref, rbd_coll_release);
1541 
1542 		if (bp)
1543 			bio_pair_release(bp);
1544 		spin_lock_irq(q->queue_lock);
1545 
1546 		ceph_put_snap_context(snapc);
1547 	}
1548 }
1549 
1550 /*
1551  * a queue callback. Makes sure that we don't create a bio that spans across
1552  * multiple osd objects. One exception would be with single-page bios,
1553  * which we handle later in bio_chain_clone()
1554  */
1555 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1556 			  struct bio_vec *bvec)
1557 {
1558 	struct rbd_device *rbd_dev = q->queuedata;
1559 	unsigned int chunk_sectors;
1560 	sector_t sector;
1561 	unsigned int bio_sectors;
1562 	int max;
1563 
1564 	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1565 	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1566 	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1567 
1568 	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1569 				 + bio_sectors)) << SECTOR_SHIFT;
1570 	if (max < 0)
1571 		max = 0; /* bio_add cannot handle a negative return */
1572 	if (max <= bvec->bv_len && bio_sectors == 0)
1573 		return bvec->bv_len;
1574 	return max;
1575 }
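/*
 * For example, with order 22 (4 MiB objects, 8192 sectors): if the bio
 * under construction currently ends 8 sectors short of an object
 * boundary, max is 8 << SECTOR_SHIFT == 4096, so a 4 KiB bvec can
 * still be added while anything larger is deferred to the next bio.
 */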
1576 
1577 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 {
1579 	struct gendisk *disk = rbd_dev->disk;
1580 
1581 	if (!disk)
1582 		return;
1583 
1584 	rbd_header_free(&rbd_dev->header);
1585 
1586 	if (disk->flags & GENHD_FL_UP)
1587 		del_gendisk(disk);
1588 	if (disk->queue)
1589 		blk_cleanup_queue(disk->queue);
1590 	put_disk(disk);
1591 }
1592 
1593 /*
1594  * reload the on-disk header
1595  */
1596 static int rbd_read_header(struct rbd_device *rbd_dev,
1597 			   struct rbd_image_header *header)
1598 {
1599 	ssize_t rc;
1600 	struct rbd_image_header_ondisk *dh;
1601 	u32 snap_count = 0;
1602 	u64 ver;
1603 	size_t len;
1604 
1605 	/*
1606 	 * First reads the fixed-size header to determine the number
1607 	 * of snapshots, then re-reads it, along with all snapshot
1608 	 * records as well as their stored names.
1609 	 */
1610 	len = sizeof (*dh);
1611 	while (1) {
1612 		dh = kmalloc(len, GFP_KERNEL);
1613 		if (!dh)
1614 			return -ENOMEM;
1615 
1616 		rc = rbd_req_sync_read(rbd_dev,
1617 				       CEPH_NOSNAP,
1618 				       rbd_dev->header_name,
1619 				       0, len,
1620 				       (char *)dh, &ver);
1621 		if (rc < 0)
1622 			goto out_dh;
1623 
1624 		rc = rbd_header_from_disk(header, dh, snap_count);
1625 		if (rc < 0) {
1626 			if (rc == -ENXIO)
1627 				pr_warning("unrecognized header format"
1628 					   " for image %s\n",
1629 					   rbd_dev->image_name);
1630 			goto out_dh;
1631 		}
1632 
1633 		if (snap_count == header->total_snaps)
1634 			break;
1635 
1636 		snap_count = header->total_snaps;
1637 		len = sizeof (*dh) +
1638 			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1639 			header->snap_names_len;
1640 
1641 		rbd_header_free(header);
1642 		kfree(dh);
1643 	}
1644 	header->obj_version = ver;
1645 
1646 out_dh:
1647 	kfree(dh);
1648 	return rc;
1649 }
1650 
1651 /*
1652  * create a snapshot
1653  */
1654 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1655 			       const char *snap_name,
1656 			       gfp_t gfp_flags)
1657 {
1658 	int name_len = strlen(snap_name);
1659 	u64 new_snapid;
1660 	int ret;
1661 	void *data, *p, *e;
1662 	struct ceph_mon_client *monc;
1663 
1664 	/* we should create a snapshot only if we're pointing at the head */
1665 	if (rbd_dev->snap_id != CEPH_NOSNAP)
1666 		return -EINVAL;
1667 
1668 	monc = &rbd_dev->rbd_client->client->monc;
1669 	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1670 	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1671 	if (ret < 0)
1672 		return ret;
1673 
1674 	data = kmalloc(name_len + 16, gfp_flags);
1675 	if (!data)
1676 		return -ENOMEM;
1677 
1678 	p = data;
1679 	e = data + name_len + 16;
1680 
1681 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1682 	ceph_encode_64_safe(&p, e, new_snapid, bad);
1683 
1684 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1685 				"rbd", "snap_add",
1686 				data, p - data, NULL);
1687 
1688 	kfree(data);
1689 
1690 	return ret < 0 ? ret : 0;
1691 bad:
1692 	return -ERANGE;
1693 }
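/*
 * The payload handed to the "snap_add" class method is thus a
 * ceph-encoded string followed by the id: a __le32 name length, the
 * name bytes, then the new snapshot id as a __le64.
 */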
1694 
1695 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1696 {
1697 	struct rbd_snap *snap;
1698 	struct rbd_snap *next;
1699 
1700 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1701 		__rbd_remove_snap_dev(snap);
1702 }
1703 
1704 /*
1705  * re-read the on-disk header and bring the in-memory copy up to date
1706  */
1707 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1708 {
1709 	int ret;
1710 	struct rbd_image_header h;
1711 
1712 	ret = rbd_read_header(rbd_dev, &h);
1713 	if (ret < 0)
1714 		return ret;
1715 
1716 	down_write(&rbd_dev->header_rwsem);
1717 
1718 	/* resized? */
1719 	if (rbd_dev->snap_id == CEPH_NOSNAP) {
1720 		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1721 
1722 		dout("setting size to %llu sectors\n", (unsigned long long) size);
1723 		set_capacity(rbd_dev->disk, size);
1724 	}
1725 
1726 	/* rbd_dev->header.object_prefix shouldn't change */
1727 	kfree(rbd_dev->header.snap_sizes);
1728 	kfree(rbd_dev->header.snap_names);
1729 	/* osd requests may still refer to snapc */
1730 	ceph_put_snap_context(rbd_dev->header.snapc);
1731 
1732 	if (hver)
1733 		*hver = h.obj_version;
1734 	rbd_dev->header.obj_version = h.obj_version;
1735 	rbd_dev->header.image_size = h.image_size;
1736 	rbd_dev->header.total_snaps = h.total_snaps;
1737 	rbd_dev->header.snapc = h.snapc;
1738 	rbd_dev->header.snap_names = h.snap_names;
1739 	rbd_dev->header.snap_names_len = h.snap_names_len;
1740 	rbd_dev->header.snap_sizes = h.snap_sizes;
1741 	/* Free the extra copy of the object prefix */
1742 	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1743 	kfree(h.object_prefix);
1744 
1745 	ret = __rbd_init_snaps_header(rbd_dev);
1746 
1747 	up_write(&rbd_dev->header_rwsem);
1748 
1749 	return ret;
1750 }
1751 
1752 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1753 {
1754 	int ret;
1755 
1756 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1757 	ret = __rbd_refresh_header(rbd_dev, hver);
1758 	mutex_unlock(&ctl_mutex);
1759 
1760 	return ret;
1761 }
1762 
1763 static int rbd_init_disk(struct rbd_device *rbd_dev)
1764 {
1765 	struct gendisk *disk;
1766 	struct request_queue *q;
1767 	int rc;
1768 	u64 segment_size;
1769 	u64 total_size = 0;
1770 
1771 	/* contact OSD, request size info about the object being mapped */
1772 	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1773 	if (rc)
1774 		return rc;
1775 
1776 	/* no need to lock here, as rbd_dev is not registered yet */
1777 	rc = __rbd_init_snaps_header(rbd_dev);
1778 	if (rc)
1779 		return rc;
1780 
1781 	rc = rbd_header_set_snap(rbd_dev, &total_size);
1782 	if (rc)
1783 		return rc;
1784 
1785 	/* create gendisk info */
1786 	rc = -ENOMEM;
1787 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1788 	if (!disk)
1789 		goto out;
1790 
1791 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1792 		 rbd_dev->dev_id);
1793 	disk->major = rbd_dev->major;
1794 	disk->first_minor = 0;
1795 	disk->fops = &rbd_bd_ops;
1796 	disk->private_data = rbd_dev;
1797 
1798 	/* init rq */
1799 	rc = -ENOMEM;
1800 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1801 	if (!q)
1802 		goto out_disk;
1803 
1804 	/* We use the default size, but let's be explicit about it. */
1805 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1806 
1807 	/* set io sizes to object size */
1808 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1809 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1810 	blk_queue_max_segment_size(q, segment_size);
1811 	blk_queue_io_min(q, segment_size);
1812 	blk_queue_io_opt(q, segment_size);
1813 
1814 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1815 	disk->queue = q;
1816 
1817 	q->queuedata = rbd_dev;
1818 
1819 	rbd_dev->disk = disk;
1820 	rbd_dev->q = q;
1821 
1822 	/* finally, announce the disk to the world */
1823 	set_capacity(disk, total_size / SECTOR_SIZE);
1824 	add_disk(disk);
1825 
1826 	pr_info("%s: added with size 0x%llx\n",
1827 		disk->disk_name, (unsigned long long)total_size);
1828 	return 0;
1829 
1830 out_disk:
1831 	put_disk(disk);
1832 out:
1833 	return rc;
1834 }
1835 
1836 /*
1837   sysfs
1838 */
1839 
1840 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1841 {
1842 	return container_of(dev, struct rbd_device, dev);
1843 }
1844 
1845 static ssize_t rbd_size_show(struct device *dev,
1846 			     struct device_attribute *attr, char *buf)
1847 {
1848 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849 	sector_t size;
1850 
1851 	down_read(&rbd_dev->header_rwsem);
1852 	size = get_capacity(rbd_dev->disk);
1853 	up_read(&rbd_dev->header_rwsem);
1854 
1855 	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1856 }
1857 
1858 static ssize_t rbd_major_show(struct device *dev,
1859 			      struct device_attribute *attr, char *buf)
1860 {
1861 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862 
1863 	return sprintf(buf, "%d\n", rbd_dev->major);
1864 }
1865 
1866 static ssize_t rbd_client_id_show(struct device *dev,
1867 				  struct device_attribute *attr, char *buf)
1868 {
1869 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870 
1871 	return sprintf(buf, "client%lld\n",
1872 			ceph_client_id(rbd_dev->rbd_client->client));
1873 }
1874 
1875 static ssize_t rbd_pool_show(struct device *dev,
1876 			     struct device_attribute *attr, char *buf)
1877 {
1878 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879 
1880 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1881 }
1882 
1883 static ssize_t rbd_pool_id_show(struct device *dev,
1884 			     struct device_attribute *attr, char *buf)
1885 {
1886 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 
1888 	return sprintf(buf, "%d\n", rbd_dev->pool_id);
1889 }
1890 
1891 static ssize_t rbd_name_show(struct device *dev,
1892 			     struct device_attribute *attr, char *buf)
1893 {
1894 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 
1896 	return sprintf(buf, "%s\n", rbd_dev->image_name);
1897 }
1898 
1899 static ssize_t rbd_snap_show(struct device *dev,
1900 			     struct device_attribute *attr,
1901 			     char *buf)
1902 {
1903 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904 
1905 	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1906 }
1907 
1908 static ssize_t rbd_image_refresh(struct device *dev,
1909 				 struct device_attribute *attr,
1910 				 const char *buf,
1911 				 size_t size)
1912 {
1913 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1914 	int ret;
1915 
1916 	ret = rbd_refresh_header(rbd_dev, NULL);
1917 
1918 	return ret < 0 ? ret : size;
1919 }
1920 
1921 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1922 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1923 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1924 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1925 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1926 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1927 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1928 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1929 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1930 
1931 static struct attribute *rbd_attrs[] = {
1932 	&dev_attr_size.attr,
1933 	&dev_attr_major.attr,
1934 	&dev_attr_client_id.attr,
1935 	&dev_attr_pool.attr,
1936 	&dev_attr_pool_id.attr,
1937 	&dev_attr_name.attr,
1938 	&dev_attr_current_snap.attr,
1939 	&dev_attr_refresh.attr,
1940 	&dev_attr_create_snap.attr,
1941 	NULL
1942 };
1943 
1944 static struct attribute_group rbd_attr_group = {
1945 	.attrs = rbd_attrs,
1946 };
1947 
1948 static const struct attribute_group *rbd_attr_groups[] = {
1949 	&rbd_attr_group,
1950 	NULL
1951 };
1952 
1953 static void rbd_sysfs_dev_release(struct device *dev)
1954 {
1955 }
1956 
1957 static struct device_type rbd_device_type = {
1958 	.name		= "rbd",
1959 	.groups		= rbd_attr_groups,
1960 	.release	= rbd_sysfs_dev_release,
1961 };
1962 
1963 
1964 /*
1965   sysfs - snapshots
1966 */
1967 
1968 static ssize_t rbd_snap_size_show(struct device *dev,
1969 				  struct device_attribute *attr,
1970 				  char *buf)
1971 {
1972 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1973 
1974 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1975 }
1976 
1977 static ssize_t rbd_snap_id_show(struct device *dev,
1978 				struct device_attribute *attr,
1979 				char *buf)
1980 {
1981 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1982 
1983 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1984 }
1985 
1986 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1987 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1988 
1989 static struct attribute *rbd_snap_attrs[] = {
1990 	&dev_attr_snap_size.attr,
1991 	&dev_attr_snap_id.attr,
1992 	NULL,
1993 };
1994 
1995 static struct attribute_group rbd_snap_attr_group = {
1996 	.attrs = rbd_snap_attrs,
1997 };
1998 
1999 static void rbd_snap_dev_release(struct device *dev)
2000 {
2001 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2002 	kfree(snap->name);
2003 	kfree(snap);
2004 }
2005 
2006 static const struct attribute_group *rbd_snap_attr_groups[] = {
2007 	&rbd_snap_attr_group,
2008 	NULL
2009 };
2010 
2011 static struct device_type rbd_snap_device_type = {
2012 	.groups		= rbd_snap_attr_groups,
2013 	.release	= rbd_snap_dev_release,
2014 };
2015 
2016 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2017 {
2018 	list_del(&snap->node);
2019 	device_unregister(&snap->dev);
2020 }
2021 
2022 static int rbd_register_snap_dev(struct rbd_snap *snap,
2023 				  struct device *parent)
2024 {
2025 	struct device *dev = &snap->dev;
2026 	int ret;
2027 
2028 	dev->type = &rbd_snap_device_type;
2029 	dev->parent = parent;
2030 	dev->release = rbd_snap_dev_release;
2031 	dev_set_name(dev, "snap_%s", snap->name);
2032 	ret = device_register(dev);
2033 
2034 	return ret;
2035 }
2036 
2037 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2038 					      int i, const char *name)
2039 {
2040 	struct rbd_snap *snap;
2041 	int ret;
2042 
2043 	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2044 	if (!snap)
2045 		return ERR_PTR(-ENOMEM);
2046 
2047 	ret = -ENOMEM;
2048 	snap->name = kstrdup(name, GFP_KERNEL);
2049 	if (!snap->name)
2050 		goto err;
2051 
2052 	snap->size = rbd_dev->header.snap_sizes[i];
2053 	snap->id = rbd_dev->header.snapc->snaps[i];
2054 	if (device_is_registered(&rbd_dev->dev)) {
2055 		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2056 		if (ret < 0)
2057 			goto err;
2058 	}
2059 
2060 	return snap;
2061 
2062 err:
2063 	kfree(snap->name);
2064 	kfree(snap);
2065 
2066 	return ERR_PTR(ret);
2067 }
2068 
2069 /*
2070  * search for the previous snap in a NUL-delimited string list
2071  */
2072 const char *rbd_prev_snap_name(const char *name, const char *start)
2073 static const char *rbd_prev_snap_name(const char *name, const char *start)
2074 	if (name < start + 2)
2075 		return NULL;
2076 
2077 	name -= 2;
2078 	while (*name) {
2079 		if (name == start)
2080 			return start;
2081 		name--;
2082 	}
2083 	return name + 1;
2084 }
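/*
 * For instance, walking "one\0two\0" backwards starting from just past
 * the final NUL yields "two", then "one", then NULL.
 */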
2085 
2086 /*
2087  * Compare the old list of snapshots that we have against what's in
2088  * the header and update it accordingly.  Note that the header holds
2089  * the snapshots in reverse order (newest to oldest), while we need
2090  * to walk from oldest to newest so that we don't see a duplicate
2091  * snap name while updating (e.g., when a snapshot was removed and
2092  * a new one created with the same name).
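 *
 * Illustrative example: if the header now lists snapshot ids
 * [5, 3, 1] (newest first) and our old list holds ids 1, 2 and 3,
 * this walk keeps 1, removes 2 (it is no longer in the header),
 * keeps 3, and finally adds the new snapshot 5.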
2093  */
2094 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2095 {
2096 	const char *name, *first_name;
2097 	int i = rbd_dev->header.total_snaps;
2098 	struct rbd_snap *snap, *old_snap = NULL;
2099 	struct list_head *p, *n;
2100 
2101 	first_name = rbd_dev->header.snap_names;
2102 	name = first_name + rbd_dev->header.snap_names_len;
2103 
2104 	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2105 		u64 cur_id;
2106 
2107 		old_snap = list_entry(p, struct rbd_snap, node);
2108 
2109 		if (i)
2110 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2111 
2112 		if (!i || old_snap->id < cur_id) {
2113 			/*
2114 			 * old_snap->id was skipped, thus was
2115 			 * removed.  If this rbd_dev is mapped to
2116 			 * the removed snapshot, record that it no
2117 			 * longer exists, to prevent further I/O.
2118 			 */
2119 			if (rbd_dev->snap_id == old_snap->id)
2120 				rbd_dev->snap_exists = false;
2121 			__rbd_remove_snap_dev(old_snap);
2122 			continue;
2123 		}
2124 		if (old_snap->id == cur_id) {
2125 			/* we have this snapshot already */
2126 			i--;
2127 			name = rbd_prev_snap_name(name, first_name);
2128 			continue;
2129 		}
2130 		for (; i > 0;
2131 		     i--, name = rbd_prev_snap_name(name, first_name)) {
2132 			if (!name) {
2133 				WARN_ON(1);
2134 				return -EINVAL;
2135 			}
2136 			cur_id = rbd_dev->header.snapc->snaps[i];
2137 			/* snapshot removal? handle it above */
2138 			if (cur_id >= old_snap->id)
2139 				break;
2140 			/* a new snapshot */
2141 			snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2142 			if (IS_ERR(snap))
2143 				return PTR_ERR(snap);
2144 
2145 			/* note: we're iterating backward, so insert at n and not p */
2146 			list_add(&snap->node, n);
2147 			p = &snap->node;
2148 		}
2149 	}
2150 	/* we're done going over the old snap list, just add what's left */
2151 	for (; i > 0; i--) {
2152 		name = rbd_prev_snap_name(name, first_name);
2153 		if (!name) {
2154 			WARN_ON(1);
2155 			return -EINVAL;
2156 		}
2157 		snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2158 		if (IS_ERR(snap))
2159 			return PTR_ERR(snap);
2160 		list_add(&snap->node, &rbd_dev->snaps);
2161 	}
2162 
2163 	return 0;
2164 }
2165 
2166 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2167 {
2168 	int ret;
2169 	struct device *dev;
2170 	struct rbd_snap *snap;
2171 
2172 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2173 	dev = &rbd_dev->dev;
2174 
2175 	dev->bus = &rbd_bus_type;
2176 	dev->type = &rbd_device_type;
2177 	dev->parent = &rbd_root_dev;
2178 	dev->release = rbd_dev_release;
2179 	dev_set_name(dev, "%d", rbd_dev->dev_id);
2180 	ret = device_register(dev);
2181 	if (ret < 0)
2182 		goto out;
2183 
2184 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2185 		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2186 		if (ret < 0)
2187 			break;
2188 	}
2189 out:
2190 	mutex_unlock(&ctl_mutex);
2191 	return ret;
2192 }
2193 
2194 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2195 {
2196 	device_unregister(&rbd_dev->dev);
2197 }
2198 
2199 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2200 {
2201 	int ret, rc;
2202 
2203 	do {
2204 		ret = rbd_req_sync_watch(rbd_dev);
2205 		if (ret == -ERANGE) {
2206 			rc = rbd_refresh_header(rbd_dev, NULL);
2207 			if (rc < 0)
2208 				return rc;
2209 		}
2210 	} while (ret == -ERANGE);
2211 
2212 	return ret;
2213 }
2214 
2215 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2216 
2217 /*
2218  * Get a unique rbd identifier for the given new rbd_dev, and add
2219  * the rbd_dev to the global list.  The minimum rbd id is 1.
2220  */
2221 static void rbd_id_get(struct rbd_device *rbd_dev)
2222 {
2223 	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2224 
2225 	spin_lock(&rbd_dev_list_lock);
2226 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2227 	spin_unlock(&rbd_dev_list_lock);
2228 }
2229 
2230 /*
2231  * Remove an rbd_dev from the global list, and record that its
2232  * identifier is no longer in use.
2233  */
2234 static void rbd_id_put(struct rbd_device *rbd_dev)
2235 {
2236 	struct list_head *tmp;
2237 	int rbd_id = rbd_dev->dev_id;
2238 	int max_id;
2239 
2240 	BUG_ON(rbd_id < 1);
2241 
2242 	spin_lock(&rbd_dev_list_lock);
2243 	list_del_init(&rbd_dev->node);
2244 
2245 	/*
2246 	 * If the id being "put" is not the current maximum, there
2247 	 * is nothing special we need to do.
2248 	 */
2249 	if (rbd_id != atomic64_read(&rbd_id_max)) {
2250 		spin_unlock(&rbd_dev_list_lock);
2251 		return;
2252 	}
2253 
2254 	/*
2255 	 * We need to update the current maximum id.  Search the
2256 	 * list to find out what it is.  We're more likely to find
2257 	 * the maximum at the end, so search the list backward.
2258 	 */
2259 	max_id = 0;
2260 	list_for_each_prev(tmp, &rbd_dev_list) {
2261 		struct rbd_device *rbd_dev;
2262 
2263 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2264 		if (rbd_dev->dev_id > max_id)
2265 			max_id = rbd_dev->dev_id;
2266 	}
2267 	spin_unlock(&rbd_dev_list_lock);
2268 
2269 	/*
2270 	 * The max id could have been updated by rbd_id_get(), in
2271 	 * which case it now accurately reflects the new maximum.
2272 	 * Be careful not to overwrite the maximum value in that
2273 	 * case.
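	 *
	 * For instance (illustrative): with ids {1, 2, 3} in use,
	 * putting id 3 computes max_id = 2; if another thread has
	 * meanwhile taken id 4, rbd_id_max is already 4, so the
	 * cmpxchg(3 -> 2) fails and the newer maximum is kept.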
2274 	 */
2275 	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2276 }
2277 
2278 /*
2279  * Skips over white space at *buf, and updates *buf to point to the
2280  * first found non-space character (if any). Returns the length of
2281  * the token (string of non-white space characters) found.  Note
2282  * that *buf must be terminated with '\0'.
2283  */
2284 static inline size_t next_token(const char **buf)
2285 {
2286 	/*
2287 	 * These are the characters that produce nonzero for
2288 	 * isspace() in the "C" and "POSIX" locales.
2289 	 */
2290 	const char *spaces = " \f\n\r\t\v";
2291 
2292 	*buf += strspn(*buf, spaces);	/* Find start of token */
2293 
2294 	return strcspn(*buf, spaces);	/* Return token length */
2295 }
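
/*
 * Example (illustrative): with *buf pointing at "  foo bar",
 * next_token() advances *buf past the leading spaces to "foo bar"
 * and returns 3, the length of "foo".
 */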
2296 
2297 /*
2298  * Finds the next token in *buf, and if the provided token buffer is
2299  * big enough, copies the found token into it.  The result, if
2300  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2301  * must be terminated with '\0' on entry.
2302  *
2303  * Returns the length of the token found (not including the '\0').
2304  * Return value will be 0 if no token is found, and it will be >=
2305  * token_size if the token would not fit.
2306  *
2307  * The *buf pointer will be updated to point beyond the end of the
2308  * found token.  Note that this occurs even if the token buffer is
2309  * too small to hold it.
2310  */
2311 static inline size_t copy_token(const char **buf,
2312 				char *token,
2313 				size_t token_size)
2314 {
2315 	size_t len;
2316 
2317 	len = next_token(buf);
2318 	if (len < token_size) {
2319 		memcpy(token, *buf, len);
2320 		*(token + len) = '\0';
2321 	}
2322 	*buf += len;
2323 
2324 	return len;
2325 }
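
/*
 * Example (illustrative): with *buf at "monitors rest" and a 16-byte
 * token buffer, copy_token() copies "monitors" plus a terminating
 * '\0', advances *buf to " rest", and returns 8.  With a 4-byte
 * buffer it still advances *buf and returns 8 but copies nothing,
 * so callers must treat len >= token_size as "didn't fit".
 */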
2326 
2327 /*
2328  * Finds the next token in *buf, dynamically allocates a buffer big
2329  * enough to hold a copy of it, and copies the token into the new
2330  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2331  * that a duplicate buffer is created even for a zero-length token.
2332  *
2333  * Returns a pointer to the newly-allocated duplicate, or a null
2334  * pointer if memory for the duplicate was not available.  If
2335  * the lenp argument is a non-null pointer, the length of the token
2336  * (not including the '\0') is returned in *lenp.
2337  *
2338  * If successful, the *buf pointer will be updated to point beyond
2339  * the end of the found token.
2340  *
2341  * Note: uses GFP_KERNEL for allocation.
2342  */
2343 static inline char *dup_token(const char **buf, size_t *lenp)
2344 {
2345 	char *dup;
2346 	size_t len;
2347 
2348 	len = next_token(buf);
2349 	dup = kmalloc(len + 1, GFP_KERNEL);
2350 	if (!dup)
2351 		return NULL;
2352 
2353 	memcpy(dup, *buf, len);
2354 	*(dup + len) = '\0';
2355 	*buf += len;
2356 
2357 	if (lenp)
2358 		*lenp = len;
2359 
2360 	return dup;
2361 }
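
/*
 * Example (illustrative): with *buf at "rbd mypool", dup_token()
 * returns a newly allocated copy of "rbd" (which the caller must
 * kfree()), advances *buf to " mypool", and stores 3 in *lenp if
 * lenp is non-NULL.
 */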
2362 
2363 /*
2364  * This fills in the pool_name, image_name, image_name_len,
2365  * header_name, and snap_name fields of the given rbd_dev, based
2366  * on the list of monitor addresses and other options provided via
2367  * /sys/bus/rbd/add.
2368  *
2369  * Note: rbd_dev is assumed to have been initially zero-filled.
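 *
 * The expected input, as parsed below, is:
 *
 *	<mon_addrs> <options> <pool-name> <image-name> [<snap-name>]
 *
 * for example (illustrative values):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage mysnap" > /sys/bus/rbd/add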
2370  */
2371 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2372 			      const char *buf,
2373 			      const char **mon_addrs,
2374 			      size_t *mon_addrs_size,
2375 			      char *options,
2376 			      size_t options_size)
2377 {
2378 	size_t len;
2379 	int ret;
2380 
2381 	/* The first four tokens are required */
2382 
2383 	len = next_token(&buf);
2384 	if (!len)
2385 		return -EINVAL;
2386 	*mon_addrs_size = len + 1;
2387 	*mon_addrs = buf;
2388 
2389 	buf += len;
2390 
2391 	len = copy_token(&buf, options, options_size);
2392 	if (!len || len >= options_size)
2393 		return -EINVAL;
2394 
2395 	ret = -ENOMEM;
2396 	rbd_dev->pool_name = dup_token(&buf, NULL);
2397 	if (!rbd_dev->pool_name)
2398 		goto out_err;
2399 
2400 	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2401 	if (!rbd_dev->image_name)
2402 		goto out_err;
2403 
2404 	/* Create the name of the header object */
2405 
2406 	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2407 						+ sizeof (RBD_SUFFIX),
2408 					GFP_KERNEL);
2409 	if (!rbd_dev->header_name)
2410 		goto out_err;
2411 	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2412 
2413 	/*
2414 	 * The snapshot name is optional.  If none is supplied,
2415 	 * we use the default value.
2416 	 */
2417 	rbd_dev->snap_name = dup_token(&buf, &len);
2418 	if (!rbd_dev->snap_name)
2419 		goto out_err;
2420 	if (!len) {
2421 		/* Replace the empty name with the default */
2422 		kfree(rbd_dev->snap_name);
2423 		rbd_dev->snap_name
2424 			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2425 		if (!rbd_dev->snap_name)
2426 			goto out_err;
2427 
2428 		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2429 			sizeof (RBD_SNAP_HEAD_NAME));
2430 	}
2431 
2432 	return 0;
2433 
2434 out_err:
2435 	kfree(rbd_dev->header_name);
2436 	kfree(rbd_dev->image_name);
2437 	kfree(rbd_dev->pool_name);
2438 	rbd_dev->pool_name = NULL;
2439 
2440 	return ret;
2441 }
2442 
2443 static ssize_t rbd_add(struct bus_type *bus,
2444 		       const char *buf,
2445 		       size_t count)
2446 {
2447 	char *options;
2448 	struct rbd_device *rbd_dev = NULL;
2449 	const char *mon_addrs = NULL;
2450 	size_t mon_addrs_size = 0;
2451 	struct ceph_osd_client *osdc;
2452 	int rc = -ENOMEM;
2453 
2454 	if (!try_module_get(THIS_MODULE))
2455 		return -ENODEV;
2456 
2457 	options = kmalloc(count, GFP_KERNEL);
2458 	if (!options)
2459 		goto err_nomem;
2460 	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2461 	if (!rbd_dev)
2462 		goto err_nomem;
2463 
2464 	/* static rbd_device initialization */
2465 	spin_lock_init(&rbd_dev->lock);
2466 	INIT_LIST_HEAD(&rbd_dev->node);
2467 	INIT_LIST_HEAD(&rbd_dev->snaps);
2468 	init_rwsem(&rbd_dev->header_rwsem);
2469 
2470 	/* generate unique id: find highest unique id, add one */
2471 	rbd_id_get(rbd_dev);
2472 
2473 	/* Fill in the device name, now that we have its id. */
2474 	BUILD_BUG_ON(DEV_NAME_LEN
2475 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2476 	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2477 
2478 	/* parse add command */
2479 	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2480 				options, count);
2481 	if (rc)
2482 		goto err_put_id;
2483 
2484 	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2485 						options);
2486 	if (IS_ERR(rbd_dev->rbd_client)) {
2487 		rc = PTR_ERR(rbd_dev->rbd_client);
2488 		goto err_put_id;
2489 	}
2490 
2491 	/* pick the pool */
2492 	osdc = &rbd_dev->rbd_client->client->osdc;
2493 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2494 	if (rc < 0)
2495 		goto err_out_client;
2496 	rbd_dev->pool_id = rc;
2497 
2498 	/* register our block device */
2499 	rc = register_blkdev(0, rbd_dev->name);
2500 	if (rc < 0)
2501 		goto err_out_client;
2502 	rbd_dev->major = rc;
2503 
2504 	rc = rbd_bus_add_dev(rbd_dev);
2505 	if (rc)
2506 		goto err_out_blkdev;
2507 
2508 	/*
2509 	 * At this point cleanup in the event of an error is the job
2510 	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2511 	 *
2512 	 * Set up and announce blkdev mapping.
2513 	 */
2514 	rc = rbd_init_disk(rbd_dev);
2515 	if (rc)
2516 		goto err_out_bus;
2517 
2518 	rc = rbd_init_watch_dev(rbd_dev);
2519 	if (rc)
2520 		goto err_out_bus;
2521 
2522 	return count;
2523 
2524 err_out_bus:
2525 	/* this will also clean up the rest of the rbd_dev state */
2526 
2527 	rbd_bus_del_dev(rbd_dev);
2528 	kfree(options);
2529 	return rc;
2530 
2531 err_out_blkdev:
2532 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2533 err_out_client:
2534 	rbd_put_client(rbd_dev);
2535 err_put_id:
2536 	if (rbd_dev->pool_name) {
2537 		kfree(rbd_dev->snap_name);
2538 		kfree(rbd_dev->header_name);
2539 		kfree(rbd_dev->image_name);
2540 		kfree(rbd_dev->pool_name);
2541 	}
2542 	rbd_id_put(rbd_dev);
2543 err_nomem:
2544 	kfree(rbd_dev);
2545 	kfree(options);
2546 
2547 	dout("Error adding device %s\n", buf);
2548 	module_put(THIS_MODULE);
2549 
2550 	return (ssize_t) rc;
2551 }
2552 
2553 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2554 {
2555 	struct list_head *tmp;
2556 	struct rbd_device *rbd_dev;
2557 
2558 	spin_lock(&rbd_dev_list_lock);
2559 	list_for_each(tmp, &rbd_dev_list) {
2560 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2561 		if (rbd_dev->dev_id == dev_id) {
2562 			spin_unlock(&rbd_dev_list_lock);
2563 			return rbd_dev;
2564 		}
2565 	}
2566 	spin_unlock(&rbd_dev_list_lock);
2567 	return NULL;
2568 }
2569 
2570 static void rbd_dev_release(struct device *dev)
2571 {
2572 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2573 
2574 	if (rbd_dev->watch_request) {
2575 		struct ceph_client *client = rbd_dev->rbd_client->client;
2576 
2577 		ceph_osdc_unregister_linger_request(&client->osdc,
2578 						    rbd_dev->watch_request);
2579 	}
2580 	if (rbd_dev->watch_event)
2581 		rbd_req_sync_unwatch(rbd_dev);
2582 
2583 	rbd_put_client(rbd_dev);
2584 
2585 	/* clean up and free blkdev */
2586 	rbd_free_disk(rbd_dev);
2587 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2588 
2589 	/* done with the id, and with the rbd_dev */
2590 	kfree(rbd_dev->snap_name);
2591 	kfree(rbd_dev->header_name);
2592 	kfree(rbd_dev->pool_name);
2593 	kfree(rbd_dev->image_name);
2594 	rbd_id_put(rbd_dev);
2595 	kfree(rbd_dev);
2596 
2597 	/* release module ref */
2598 	module_put(THIS_MODULE);
2599 }
2600 
2601 static ssize_t rbd_remove(struct bus_type *bus,
2602 			  const char *buf,
2603 			  size_t count)
2604 {
2605 	struct rbd_device *rbd_dev = NULL;
2606 	int target_id, rc;
2607 	unsigned long ul;
2608 	int ret = count;
2609 
2610 	rc = strict_strtoul(buf, 10, &ul);
2611 	if (rc)
2612 		return rc;
2613 
2614 	/* convert to int; abort if we lost anything in the conversion */
2615 	target_id = (int) ul;
2616 	if (target_id != ul)
2617 		return -EINVAL;
2618 
2619 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2620 
2621 	rbd_dev = __rbd_get_dev(target_id);
2622 	if (!rbd_dev) {
2623 		ret = -ENOENT;
2624 		goto done;
2625 	}
2626 
2627 	__rbd_remove_all_snaps(rbd_dev);
2628 	rbd_bus_del_dev(rbd_dev);
2629 
2630 done:
2631 	mutex_unlock(&ctl_mutex);
2632 	return ret;
2633 }
2634 
2635 static ssize_t rbd_snap_add(struct device *dev,
2636 			    struct device_attribute *attr,
2637 			    const char *buf,
2638 			    size_t count)
2639 {
2640 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2641 	int ret;
2642 	char *name = kmalloc(count + 1, GFP_KERNEL);
2643 	if (!name)
2644 		return -ENOMEM;
2645 
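	/*
	 * Note: a size of "count" copies at most count - 1 bytes here,
	 * dropping buf's final byte (normally the trailing newline).
	 */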
2646 	snprintf(name, count, "%s", buf);
2647 
2648 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2649 
2650 	ret = rbd_header_add_snap(rbd_dev,
2651 				  name, GFP_KERNEL);
2652 	if (ret < 0)
2653 		goto err_unlock;
2654 
2655 	ret = __rbd_refresh_header(rbd_dev, NULL);
2656 	if (ret < 0)
2657 		goto err_unlock;
2658 
2659 	/* shouldn't hold ctl_mutex when notifying; the notify might
2660 	   trigger a watch callback that would need to take that mutex */
2661 	mutex_unlock(&ctl_mutex);
2662 
2663 	/* make a best effort, don't error if failed */
2664 	rbd_req_sync_notify(rbd_dev);
2665 
2666 	ret = count;
2667 	kfree(name);
2668 	return ret;
2669 
2670 err_unlock:
2671 	mutex_unlock(&ctl_mutex);
2672 	kfree(name);
2673 	return ret;
2674 }
2675 
2676 /*
2677  * create control files in sysfs
2678  * /sys/bus/rbd/...
2679  */
2680 static int rbd_sysfs_init(void)
2681 {
2682 	int ret;
2683 
2684 	ret = device_register(&rbd_root_dev);
2685 	if (ret < 0)
2686 		return ret;
2687 
2688 	ret = bus_register(&rbd_bus_type);
2689 	if (ret < 0)
2690 		device_unregister(&rbd_root_dev);
2691 
2692 	return ret;
2693 }
2694 
2695 static void rbd_sysfs_cleanup(void)
2696 {
2697 	bus_unregister(&rbd_bus_type);
2698 	device_unregister(&rbd_root_dev);
2699 }
2700 
2701 int __init rbd_init(void)
2702 {
2703 	int rc;
2704 
2705 	rc = rbd_sysfs_init();
2706 	if (rc)
2707 		return rc;
2708 	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2709 	return 0;
2710 }
2711 
2712 void __exit rbd_exit(void)
2713 {
2714 	rbd_sysfs_cleanup();
2715 }
2716 
2717 module_init(rbd_init);
2718 module_exit(rbd_exit);
2719 
2720 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2721 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2722 MODULE_DESCRIPTION("rados block device");
2723 
2724 /* following authorship retained from original osdblk.c */
2725 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2726 
2727 MODULE_LICENSE("GPL");
2728