1 /* 2 rbd.c -- Export ceph rados objects as a Linux block device 3 4 5 based on drivers/block/osdblk.c: 6 7 Copyright 2009 Red Hat, Inc. 8 9 This program is free software; you can redistribute it and/or modify 10 it under the terms of the GNU General Public License as published by 11 the Free Software Foundation. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program; see the file COPYING. If not, write to 20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 21 22 23 24 For usage instructions, please refer to: 25 26 Documentation/ABI/testing/sysfs-bus-rbd 27 28 */ 29 30 #include <linux/ceph/libceph.h> 31 #include <linux/ceph/osd_client.h> 32 #include <linux/ceph/mon_client.h> 33 #include <linux/ceph/decode.h> 34 #include <linux/parser.h> 35 36 #include <linux/kernel.h> 37 #include <linux/device.h> 38 #include <linux/module.h> 39 #include <linux/fs.h> 40 #include <linux/blkdev.h> 41 42 #include "rbd_types.h" 43 44 #define RBD_DEBUG /* Activate rbd_assert() calls */ 45 46 /* 47 * The basic unit of block I/O is a sector. It is interpreted in a 48 * number of contexts in Linux (blk, bio, genhd), but the default is 49 * universally 512 bytes. These symbols are just slightly more 50 * meaningful than the bare numbers they represent. 51 */ 52 #define SECTOR_SHIFT 9 53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 54 55 /* It might be useful to have this defined elsewhere too */ 56 57 #define U64_MAX ((u64) (~0ULL)) 58 59 #define RBD_DRV_NAME "rbd" 60 #define RBD_DRV_NAME_LONG "rbd (rados block device)" 61 62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 63 64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 65 #define RBD_MAX_SNAP_NAME_LEN \ 66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 67 68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 69 #define RBD_MAX_OPT_LEN 1024 70 71 #define RBD_SNAP_HEAD_NAME "-" 72 73 /* This allows a single page to hold an image name sent by OSD */ 74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 75 #define RBD_IMAGE_ID_LEN_MAX 64 76 77 #define RBD_OBJ_PREFIX_LEN_MAX 64 78 79 /* Feature bits */ 80 81 #define RBD_FEATURE_LAYERING 1 82 83 /* Features supported by this (client software) implementation. */ 84 85 #define RBD_FEATURES_ALL (0) 86 87 /* 88 * An RBD device name will be "rbd#", where the "rbd" comes from 89 * RBD_DRV_NAME above, and # is a unique integer identifier. 90 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big 91 * enough to hold all possible device names. 92 */ 93 #define DEV_NAME_LEN 32 94 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) 95 96 #define RBD_READ_ONLY_DEFAULT false 97 98 /* 99 * block device image metadata (in-memory version) 100 */ 101 struct rbd_image_header { 102 /* These four fields never change for a given rbd image */ 103 char *object_prefix; 104 u64 features; 105 __u8 obj_order; 106 __u8 crypt_type; 107 __u8 comp_type; 108 109 /* The remaining fields need to be updated occasionally */ 110 u64 image_size; 111 struct ceph_snap_context *snapc; 112 char *snap_names; 113 u64 *snap_sizes; 114 115 u64 obj_version; 116 }; 117 118 /* 119 * An rbd image specification. 120 * 121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 122 * identify an image. 123 */ 124 struct rbd_spec { 125 u64 pool_id; 126 char *pool_name; 127 128 char *image_id; 129 size_t image_id_len; 130 char *image_name; 131 size_t image_name_len; 132 133 u64 snap_id; 134 char *snap_name; 135 136 struct kref kref; 137 }; 138 139 struct rbd_options { 140 bool read_only; 141 }; 142 143 /* 144 * an instance of the client. multiple devices may share an rbd client. 145 */ 146 struct rbd_client { 147 struct ceph_client *client; 148 struct kref kref; 149 struct list_head node; 150 }; 151 152 /* 153 * a request completion status 154 */ 155 struct rbd_req_status { 156 int done; 157 int rc; 158 u64 bytes; 159 }; 160 161 /* 162 * a collection of requests 163 */ 164 struct rbd_req_coll { 165 int total; 166 int num_done; 167 struct kref kref; 168 struct rbd_req_status status[0]; 169 }; 170 171 /* 172 * a single io request 173 */ 174 struct rbd_request { 175 struct request *rq; /* blk layer request */ 176 struct bio *bio; /* cloned bio */ 177 struct page **pages; /* list of used pages */ 178 u64 len; 179 int coll_index; 180 struct rbd_req_coll *coll; 181 }; 182 183 struct rbd_snap { 184 struct device dev; 185 const char *name; 186 u64 size; 187 struct list_head node; 188 u64 id; 189 u64 features; 190 }; 191 192 struct rbd_mapping { 193 u64 size; 194 u64 features; 195 bool read_only; 196 }; 197 198 /* 199 * a single device 200 */ 201 struct rbd_device { 202 int dev_id; /* blkdev unique id */ 203 204 int major; /* blkdev assigned major */ 205 struct gendisk *disk; /* blkdev's gendisk and rq */ 206 207 u32 image_format; /* Either 1 or 2 */ 208 struct rbd_client *rbd_client; 209 210 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 211 212 spinlock_t lock; /* queue lock */ 213 214 struct rbd_image_header header; 215 bool exists; 216 struct rbd_spec *spec; 217 218 char *header_name; 219 220 struct ceph_osd_event *watch_event; 221 struct ceph_osd_request *watch_request; 222 223 struct rbd_spec *parent_spec; 224 u64 parent_overlap; 225 226 /* protects updating the header */ 227 struct rw_semaphore header_rwsem; 228 229 struct rbd_mapping mapping; 230 231 struct list_head node; 232 233 /* list of snapshots */ 234 struct list_head snaps; 235 236 /* sysfs related */ 237 struct device dev; 238 unsigned long open_count; 239 }; 240 241 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 242 243 static LIST_HEAD(rbd_dev_list); /* devices */ 244 static DEFINE_SPINLOCK(rbd_dev_list_lock); 245 246 static LIST_HEAD(rbd_client_list); /* clients */ 247 static DEFINE_SPINLOCK(rbd_client_list_lock); 248 249 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); 250 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 251 252 static void rbd_dev_release(struct device *dev); 253 static void rbd_remove_snap_dev(struct rbd_snap *snap); 254 255 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 256 size_t count); 257 static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 258 size_t count); 259 260 static struct bus_attribute rbd_bus_attrs[] = { 261 __ATTR(add, S_IWUSR, NULL, rbd_add), 262 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 263 __ATTR_NULL 264 }; 265 266 static struct bus_type rbd_bus_type = { 267 .name = "rbd", 268 .bus_attrs = rbd_bus_attrs, 269 }; 270 271 static void rbd_root_dev_release(struct device *dev) 272 { 273 } 274 275 static struct device rbd_root_dev = { 276 .init_name = "rbd", 277 .release = rbd_root_dev_release, 278 }; 279 280 #ifdef RBD_DEBUG 281 #define rbd_assert(expr) \ 282 if (unlikely(!(expr))) { \ 283 printk(KERN_ERR "\nAssertion failure in %s() " \ 284 "at line %d:\n\n" \ 285 "\trbd_assert(%s);\n\n", \ 286 __func__, __LINE__, #expr); \ 287 BUG(); \ 288 } 289 #else /* !RBD_DEBUG */ 290 # define rbd_assert(expr) ((void) 0) 291 #endif /* !RBD_DEBUG */ 292 293 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 294 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 295 296 static int rbd_open(struct block_device *bdev, fmode_t mode) 297 { 298 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 299 300 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 301 return -EROFS; 302 303 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 304 (void) get_device(&rbd_dev->dev); 305 set_device_ro(bdev, rbd_dev->mapping.read_only); 306 rbd_dev->open_count++; 307 mutex_unlock(&ctl_mutex); 308 309 return 0; 310 } 311 312 static int rbd_release(struct gendisk *disk, fmode_t mode) 313 { 314 struct rbd_device *rbd_dev = disk->private_data; 315 316 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 317 rbd_assert(rbd_dev->open_count > 0); 318 rbd_dev->open_count--; 319 put_device(&rbd_dev->dev); 320 mutex_unlock(&ctl_mutex); 321 322 return 0; 323 } 324 325 static const struct block_device_operations rbd_bd_ops = { 326 .owner = THIS_MODULE, 327 .open = rbd_open, 328 .release = rbd_release, 329 }; 330 331 /* 332 * Initialize an rbd client instance. 333 * We own *ceph_opts. 334 */ 335 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 336 { 337 struct rbd_client *rbdc; 338 int ret = -ENOMEM; 339 340 dout("rbd_client_create\n"); 341 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 342 if (!rbdc) 343 goto out_opt; 344 345 kref_init(&rbdc->kref); 346 INIT_LIST_HEAD(&rbdc->node); 347 348 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 349 350 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); 351 if (IS_ERR(rbdc->client)) 352 goto out_mutex; 353 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 354 355 ret = ceph_open_session(rbdc->client); 356 if (ret < 0) 357 goto out_err; 358 359 spin_lock(&rbd_client_list_lock); 360 list_add_tail(&rbdc->node, &rbd_client_list); 361 spin_unlock(&rbd_client_list_lock); 362 363 mutex_unlock(&ctl_mutex); 364 365 dout("rbd_client_create created %p\n", rbdc); 366 return rbdc; 367 368 out_err: 369 ceph_destroy_client(rbdc->client); 370 out_mutex: 371 mutex_unlock(&ctl_mutex); 372 kfree(rbdc); 373 out_opt: 374 if (ceph_opts) 375 ceph_destroy_options(ceph_opts); 376 return ERR_PTR(ret); 377 } 378 379 /* 380 * Find a ceph client with specific addr and configuration. If 381 * found, bump its reference count. 382 */ 383 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 384 { 385 struct rbd_client *client_node; 386 bool found = false; 387 388 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 389 return NULL; 390 391 spin_lock(&rbd_client_list_lock); 392 list_for_each_entry(client_node, &rbd_client_list, node) { 393 if (!ceph_compare_options(ceph_opts, client_node->client)) { 394 kref_get(&client_node->kref); 395 found = true; 396 break; 397 } 398 } 399 spin_unlock(&rbd_client_list_lock); 400 401 return found ? client_node : NULL; 402 } 403 404 /* 405 * mount options 406 */ 407 enum { 408 Opt_last_int, 409 /* int args above */ 410 Opt_last_string, 411 /* string args above */ 412 Opt_read_only, 413 Opt_read_write, 414 /* Boolean args above */ 415 Opt_last_bool, 416 }; 417 418 static match_table_t rbd_opts_tokens = { 419 /* int args above */ 420 /* string args above */ 421 {Opt_read_only, "read_only"}, 422 {Opt_read_only, "ro"}, /* Alternate spelling */ 423 {Opt_read_write, "read_write"}, 424 {Opt_read_write, "rw"}, /* Alternate spelling */ 425 /* Boolean args above */ 426 {-1, NULL} 427 }; 428 429 static int parse_rbd_opts_token(char *c, void *private) 430 { 431 struct rbd_options *rbd_opts = private; 432 substring_t argstr[MAX_OPT_ARGS]; 433 int token, intval, ret; 434 435 token = match_token(c, rbd_opts_tokens, argstr); 436 if (token < 0) 437 return -EINVAL; 438 439 if (token < Opt_last_int) { 440 ret = match_int(&argstr[0], &intval); 441 if (ret < 0) { 442 pr_err("bad mount option arg (not int) " 443 "at '%s'\n", c); 444 return ret; 445 } 446 dout("got int token %d val %d\n", token, intval); 447 } else if (token > Opt_last_int && token < Opt_last_string) { 448 dout("got string token %d val %s\n", token, 449 argstr[0].from); 450 } else if (token > Opt_last_string && token < Opt_last_bool) { 451 dout("got Boolean token %d\n", token); 452 } else { 453 dout("got token %d\n", token); 454 } 455 456 switch (token) { 457 case Opt_read_only: 458 rbd_opts->read_only = true; 459 break; 460 case Opt_read_write: 461 rbd_opts->read_only = false; 462 break; 463 default: 464 rbd_assert(false); 465 break; 466 } 467 return 0; 468 } 469 470 /* 471 * Get a ceph client with specific addr and configuration, if one does 472 * not exist create it. 473 */ 474 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 475 { 476 struct rbd_client *rbdc; 477 478 rbdc = rbd_client_find(ceph_opts); 479 if (rbdc) /* using an existing client */ 480 ceph_destroy_options(ceph_opts); 481 else 482 rbdc = rbd_client_create(ceph_opts); 483 484 return rbdc; 485 } 486 487 /* 488 * Destroy ceph client 489 * 490 * Caller must hold rbd_client_list_lock. 491 */ 492 static void rbd_client_release(struct kref *kref) 493 { 494 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 495 496 dout("rbd_release_client %p\n", rbdc); 497 spin_lock(&rbd_client_list_lock); 498 list_del(&rbdc->node); 499 spin_unlock(&rbd_client_list_lock); 500 501 ceph_destroy_client(rbdc->client); 502 kfree(rbdc); 503 } 504 505 /* 506 * Drop reference to ceph client node. If it's not referenced anymore, release 507 * it. 508 */ 509 static void rbd_put_client(struct rbd_client *rbdc) 510 { 511 if (rbdc) 512 kref_put(&rbdc->kref, rbd_client_release); 513 } 514 515 /* 516 * Destroy requests collection 517 */ 518 static void rbd_coll_release(struct kref *kref) 519 { 520 struct rbd_req_coll *coll = 521 container_of(kref, struct rbd_req_coll, kref); 522 523 dout("rbd_coll_release %p\n", coll); 524 kfree(coll); 525 } 526 527 static bool rbd_image_format_valid(u32 image_format) 528 { 529 return image_format == 1 || image_format == 2; 530 } 531 532 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 533 { 534 size_t size; 535 u32 snap_count; 536 537 /* The header has to start with the magic rbd header text */ 538 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 539 return false; 540 541 /* The bio layer requires at least sector-sized I/O */ 542 543 if (ondisk->options.order < SECTOR_SHIFT) 544 return false; 545 546 /* If we use u64 in a few spots we may be able to loosen this */ 547 548 if (ondisk->options.order > 8 * sizeof (int) - 1) 549 return false; 550 551 /* 552 * The size of a snapshot header has to fit in a size_t, and 553 * that limits the number of snapshots. 554 */ 555 snap_count = le32_to_cpu(ondisk->snap_count); 556 size = SIZE_MAX - sizeof (struct ceph_snap_context); 557 if (snap_count > size / sizeof (__le64)) 558 return false; 559 560 /* 561 * Not only that, but the size of the entire the snapshot 562 * header must also be representable in a size_t. 563 */ 564 size -= snap_count * sizeof (__le64); 565 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 566 return false; 567 568 return true; 569 } 570 571 /* 572 * Create a new header structure, translate header format from the on-disk 573 * header. 574 */ 575 static int rbd_header_from_disk(struct rbd_image_header *header, 576 struct rbd_image_header_ondisk *ondisk) 577 { 578 u32 snap_count; 579 size_t len; 580 size_t size; 581 u32 i; 582 583 memset(header, 0, sizeof (*header)); 584 585 snap_count = le32_to_cpu(ondisk->snap_count); 586 587 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); 588 header->object_prefix = kmalloc(len + 1, GFP_KERNEL); 589 if (!header->object_prefix) 590 return -ENOMEM; 591 memcpy(header->object_prefix, ondisk->object_prefix, len); 592 header->object_prefix[len] = '\0'; 593 594 if (snap_count) { 595 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 596 597 /* Save a copy of the snapshot names */ 598 599 if (snap_names_len > (u64) SIZE_MAX) 600 return -EIO; 601 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); 602 if (!header->snap_names) 603 goto out_err; 604 /* 605 * Note that rbd_dev_v1_header_read() guarantees 606 * the ondisk buffer we're working with has 607 * snap_names_len bytes beyond the end of the 608 * snapshot id array, this memcpy() is safe. 609 */ 610 memcpy(header->snap_names, &ondisk->snaps[snap_count], 611 snap_names_len); 612 613 /* Record each snapshot's size */ 614 615 size = snap_count * sizeof (*header->snap_sizes); 616 header->snap_sizes = kmalloc(size, GFP_KERNEL); 617 if (!header->snap_sizes) 618 goto out_err; 619 for (i = 0; i < snap_count; i++) 620 header->snap_sizes[i] = 621 le64_to_cpu(ondisk->snaps[i].image_size); 622 } else { 623 WARN_ON(ondisk->snap_names_len); 624 header->snap_names = NULL; 625 header->snap_sizes = NULL; 626 } 627 628 header->features = 0; /* No features support in v1 images */ 629 header->obj_order = ondisk->options.order; 630 header->crypt_type = ondisk->options.crypt_type; 631 header->comp_type = ondisk->options.comp_type; 632 633 /* Allocate and fill in the snapshot context */ 634 635 header->image_size = le64_to_cpu(ondisk->image_size); 636 size = sizeof (struct ceph_snap_context); 637 size += snap_count * sizeof (header->snapc->snaps[0]); 638 header->snapc = kzalloc(size, GFP_KERNEL); 639 if (!header->snapc) 640 goto out_err; 641 642 atomic_set(&header->snapc->nref, 1); 643 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 644 header->snapc->num_snaps = snap_count; 645 for (i = 0; i < snap_count; i++) 646 header->snapc->snaps[i] = 647 le64_to_cpu(ondisk->snaps[i].id); 648 649 return 0; 650 651 out_err: 652 kfree(header->snap_sizes); 653 header->snap_sizes = NULL; 654 kfree(header->snap_names); 655 header->snap_names = NULL; 656 kfree(header->object_prefix); 657 header->object_prefix = NULL; 658 659 return -ENOMEM; 660 } 661 662 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 663 { 664 struct rbd_snap *snap; 665 666 if (snap_id == CEPH_NOSNAP) 667 return RBD_SNAP_HEAD_NAME; 668 669 list_for_each_entry(snap, &rbd_dev->snaps, node) 670 if (snap_id == snap->id) 671 return snap->name; 672 673 return NULL; 674 } 675 676 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 677 { 678 679 struct rbd_snap *snap; 680 681 list_for_each_entry(snap, &rbd_dev->snaps, node) { 682 if (!strcmp(snap_name, snap->name)) { 683 rbd_dev->spec->snap_id = snap->id; 684 rbd_dev->mapping.size = snap->size; 685 rbd_dev->mapping.features = snap->features; 686 687 return 0; 688 } 689 } 690 691 return -ENOENT; 692 } 693 694 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 695 { 696 int ret; 697 698 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 699 sizeof (RBD_SNAP_HEAD_NAME))) { 700 rbd_dev->spec->snap_id = CEPH_NOSNAP; 701 rbd_dev->mapping.size = rbd_dev->header.image_size; 702 rbd_dev->mapping.features = rbd_dev->header.features; 703 ret = 0; 704 } else { 705 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 706 if (ret < 0) 707 goto done; 708 rbd_dev->mapping.read_only = true; 709 } 710 rbd_dev->exists = true; 711 done: 712 return ret; 713 } 714 715 static void rbd_header_free(struct rbd_image_header *header) 716 { 717 kfree(header->object_prefix); 718 header->object_prefix = NULL; 719 kfree(header->snap_sizes); 720 header->snap_sizes = NULL; 721 kfree(header->snap_names); 722 header->snap_names = NULL; 723 ceph_put_snap_context(header->snapc); 724 header->snapc = NULL; 725 } 726 727 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 728 { 729 char *name; 730 u64 segment; 731 int ret; 732 733 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 734 if (!name) 735 return NULL; 736 segment = offset >> rbd_dev->header.obj_order; 737 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 738 rbd_dev->header.object_prefix, segment); 739 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 740 pr_err("error formatting segment name for #%llu (%d)\n", 741 segment, ret); 742 kfree(name); 743 name = NULL; 744 } 745 746 return name; 747 } 748 749 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 750 { 751 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 752 753 return offset & (segment_size - 1); 754 } 755 756 static u64 rbd_segment_length(struct rbd_device *rbd_dev, 757 u64 offset, u64 length) 758 { 759 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 760 761 offset &= segment_size - 1; 762 763 rbd_assert(length <= U64_MAX - offset); 764 if (offset + length > segment_size) 765 length = segment_size - offset; 766 767 return length; 768 } 769 770 static int rbd_get_num_segments(struct rbd_image_header *header, 771 u64 ofs, u64 len) 772 { 773 u64 start_seg; 774 u64 end_seg; 775 776 if (!len) 777 return 0; 778 if (len - 1 > U64_MAX - ofs) 779 return -ERANGE; 780 781 start_seg = ofs >> header->obj_order; 782 end_seg = (ofs + len - 1) >> header->obj_order; 783 784 return end_seg - start_seg + 1; 785 } 786 787 /* 788 * returns the size of an object in the image 789 */ 790 static u64 rbd_obj_bytes(struct rbd_image_header *header) 791 { 792 return 1 << header->obj_order; 793 } 794 795 /* 796 * bio helpers 797 */ 798 799 static void bio_chain_put(struct bio *chain) 800 { 801 struct bio *tmp; 802 803 while (chain) { 804 tmp = chain; 805 chain = chain->bi_next; 806 bio_put(tmp); 807 } 808 } 809 810 /* 811 * zeros a bio chain, starting at specific offset 812 */ 813 static void zero_bio_chain(struct bio *chain, int start_ofs) 814 { 815 struct bio_vec *bv; 816 unsigned long flags; 817 void *buf; 818 int i; 819 int pos = 0; 820 821 while (chain) { 822 bio_for_each_segment(bv, chain, i) { 823 if (pos + bv->bv_len > start_ofs) { 824 int remainder = max(start_ofs - pos, 0); 825 buf = bvec_kmap_irq(bv, &flags); 826 memset(buf + remainder, 0, 827 bv->bv_len - remainder); 828 bvec_kunmap_irq(buf, &flags); 829 } 830 pos += bv->bv_len; 831 } 832 833 chain = chain->bi_next; 834 } 835 } 836 837 /* 838 * Clone a portion of a bio, starting at the given byte offset 839 * and continuing for the number of bytes indicated. 840 */ 841 static struct bio *bio_clone_range(struct bio *bio_src, 842 unsigned int offset, 843 unsigned int len, 844 gfp_t gfpmask) 845 { 846 struct bio_vec *bv; 847 unsigned int resid; 848 unsigned short idx; 849 unsigned int voff; 850 unsigned short end_idx; 851 unsigned short vcnt; 852 struct bio *bio; 853 854 /* Handle the easy case for the caller */ 855 856 if (!offset && len == bio_src->bi_size) 857 return bio_clone(bio_src, gfpmask); 858 859 if (WARN_ON_ONCE(!len)) 860 return NULL; 861 if (WARN_ON_ONCE(len > bio_src->bi_size)) 862 return NULL; 863 if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) 864 return NULL; 865 866 /* Find first affected segment... */ 867 868 resid = offset; 869 __bio_for_each_segment(bv, bio_src, idx, 0) { 870 if (resid < bv->bv_len) 871 break; 872 resid -= bv->bv_len; 873 } 874 voff = resid; 875 876 /* ...and the last affected segment */ 877 878 resid += len; 879 __bio_for_each_segment(bv, bio_src, end_idx, idx) { 880 if (resid <= bv->bv_len) 881 break; 882 resid -= bv->bv_len; 883 } 884 vcnt = end_idx - idx + 1; 885 886 /* Build the clone */ 887 888 bio = bio_alloc(gfpmask, (unsigned int) vcnt); 889 if (!bio) 890 return NULL; /* ENOMEM */ 891 892 bio->bi_bdev = bio_src->bi_bdev; 893 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); 894 bio->bi_rw = bio_src->bi_rw; 895 bio->bi_flags |= 1 << BIO_CLONED; 896 897 /* 898 * Copy over our part of the bio_vec, then update the first 899 * and last (or only) entries. 900 */ 901 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], 902 vcnt * sizeof (struct bio_vec)); 903 bio->bi_io_vec[0].bv_offset += voff; 904 if (vcnt > 1) { 905 bio->bi_io_vec[0].bv_len -= voff; 906 bio->bi_io_vec[vcnt - 1].bv_len = resid; 907 } else { 908 bio->bi_io_vec[0].bv_len = len; 909 } 910 911 bio->bi_vcnt = vcnt; 912 bio->bi_size = len; 913 bio->bi_idx = 0; 914 915 return bio; 916 } 917 918 /* 919 * Clone a portion of a bio chain, starting at the given byte offset 920 * into the first bio in the source chain and continuing for the 921 * number of bytes indicated. The result is another bio chain of 922 * exactly the given length, or a null pointer on error. 923 * 924 * The bio_src and offset parameters are both in-out. On entry they 925 * refer to the first source bio and the offset into that bio where 926 * the start of data to be cloned is located. 927 * 928 * On return, bio_src is updated to refer to the bio in the source 929 * chain that contains first un-cloned byte, and *offset will 930 * contain the offset of that byte within that bio. 931 */ 932 static struct bio *bio_chain_clone_range(struct bio **bio_src, 933 unsigned int *offset, 934 unsigned int len, 935 gfp_t gfpmask) 936 { 937 struct bio *bi = *bio_src; 938 unsigned int off = *offset; 939 struct bio *chain = NULL; 940 struct bio **end; 941 942 /* Build up a chain of clone bios up to the limit */ 943 944 if (!bi || off >= bi->bi_size || !len) 945 return NULL; /* Nothing to clone */ 946 947 end = &chain; 948 while (len) { 949 unsigned int bi_size; 950 struct bio *bio; 951 952 if (!bi) 953 goto out_err; /* EINVAL; ran out of bio's */ 954 bi_size = min_t(unsigned int, bi->bi_size - off, len); 955 bio = bio_clone_range(bi, off, bi_size, gfpmask); 956 if (!bio) 957 goto out_err; /* ENOMEM */ 958 959 *end = bio; 960 end = &bio->bi_next; 961 962 off += bi_size; 963 if (off == bi->bi_size) { 964 bi = bi->bi_next; 965 off = 0; 966 } 967 len -= bi_size; 968 } 969 *bio_src = bi; 970 *offset = off; 971 972 return chain; 973 out_err: 974 bio_chain_put(chain); 975 976 return NULL; 977 } 978 979 /* 980 * helpers for osd request op vectors. 981 */ 982 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops, 983 int opcode, u32 payload_len) 984 { 985 struct ceph_osd_req_op *ops; 986 987 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO); 988 if (!ops) 989 return NULL; 990 991 ops[0].op = opcode; 992 993 /* 994 * op extent offset and length will be set later on 995 * in calc_raw_layout() 996 */ 997 ops[0].payload_len = payload_len; 998 999 return ops; 1000 } 1001 1002 static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 1003 { 1004 kfree(ops); 1005 } 1006 1007 static void rbd_coll_end_req_index(struct request *rq, 1008 struct rbd_req_coll *coll, 1009 int index, 1010 int ret, u64 len) 1011 { 1012 struct request_queue *q; 1013 int min, max, i; 1014 1015 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n", 1016 coll, index, ret, (unsigned long long) len); 1017 1018 if (!rq) 1019 return; 1020 1021 if (!coll) { 1022 blk_end_request(rq, ret, len); 1023 return; 1024 } 1025 1026 q = rq->q; 1027 1028 spin_lock_irq(q->queue_lock); 1029 coll->status[index].done = 1; 1030 coll->status[index].rc = ret; 1031 coll->status[index].bytes = len; 1032 max = min = coll->num_done; 1033 while (max < coll->total && coll->status[max].done) 1034 max++; 1035 1036 for (i = min; i<max; i++) { 1037 __blk_end_request(rq, coll->status[i].rc, 1038 coll->status[i].bytes); 1039 coll->num_done++; 1040 kref_put(&coll->kref, rbd_coll_release); 1041 } 1042 spin_unlock_irq(q->queue_lock); 1043 } 1044 1045 static void rbd_coll_end_req(struct rbd_request *req, 1046 int ret, u64 len) 1047 { 1048 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); 1049 } 1050 1051 /* 1052 * Send ceph osd request 1053 */ 1054 static int rbd_do_request(struct request *rq, 1055 struct rbd_device *rbd_dev, 1056 struct ceph_snap_context *snapc, 1057 u64 snapid, 1058 const char *object_name, u64 ofs, u64 len, 1059 struct bio *bio, 1060 struct page **pages, 1061 int num_pages, 1062 int flags, 1063 struct ceph_osd_req_op *ops, 1064 struct rbd_req_coll *coll, 1065 int coll_index, 1066 void (*rbd_cb)(struct ceph_osd_request *req, 1067 struct ceph_msg *msg), 1068 struct ceph_osd_request **linger_req, 1069 u64 *ver) 1070 { 1071 struct ceph_osd_request *req; 1072 struct ceph_file_layout *layout; 1073 int ret; 1074 u64 bno; 1075 struct timespec mtime = CURRENT_TIME; 1076 struct rbd_request *req_data; 1077 struct ceph_osd_request_head *reqhead; 1078 struct ceph_osd_client *osdc; 1079 1080 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 1081 if (!req_data) { 1082 if (coll) 1083 rbd_coll_end_req_index(rq, coll, coll_index, 1084 -ENOMEM, len); 1085 return -ENOMEM; 1086 } 1087 1088 if (coll) { 1089 req_data->coll = coll; 1090 req_data->coll_index = coll_index; 1091 } 1092 1093 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", 1094 object_name, (unsigned long long) ofs, 1095 (unsigned long long) len, coll, coll_index); 1096 1097 osdc = &rbd_dev->rbd_client->client->osdc; 1098 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, 1099 false, GFP_NOIO, pages, bio); 1100 if (!req) { 1101 ret = -ENOMEM; 1102 goto done_pages; 1103 } 1104 1105 req->r_callback = rbd_cb; 1106 1107 req_data->rq = rq; 1108 req_data->bio = bio; 1109 req_data->pages = pages; 1110 req_data->len = len; 1111 1112 req->r_priv = req_data; 1113 1114 reqhead = req->r_request->front.iov_base; 1115 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 1116 1117 strncpy(req->r_oid, object_name, sizeof(req->r_oid)); 1118 req->r_oid_len = strlen(req->r_oid); 1119 1120 layout = &req->r_file_layout; 1121 memset(layout, 0, sizeof(*layout)); 1122 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1123 layout->fl_stripe_count = cpu_to_le32(1); 1124 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1125 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id); 1126 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 1127 req, ops); 1128 rbd_assert(ret == 0); 1129 1130 ceph_osdc_build_request(req, ofs, &len, 1131 ops, 1132 snapc, 1133 &mtime, 1134 req->r_oid, req->r_oid_len); 1135 1136 if (linger_req) { 1137 ceph_osdc_set_request_linger(osdc, req); 1138 *linger_req = req; 1139 } 1140 1141 ret = ceph_osdc_start_request(osdc, req, false); 1142 if (ret < 0) 1143 goto done_err; 1144 1145 if (!rbd_cb) { 1146 ret = ceph_osdc_wait_request(osdc, req); 1147 if (ver) 1148 *ver = le64_to_cpu(req->r_reassert_version.version); 1149 dout("reassert_ver=%llu\n", 1150 (unsigned long long) 1151 le64_to_cpu(req->r_reassert_version.version)); 1152 ceph_osdc_put_request(req); 1153 } 1154 return ret; 1155 1156 done_err: 1157 bio_chain_put(req_data->bio); 1158 ceph_osdc_put_request(req); 1159 done_pages: 1160 rbd_coll_end_req(req_data, ret, len); 1161 kfree(req_data); 1162 return ret; 1163 } 1164 1165 /* 1166 * Ceph osd op callback 1167 */ 1168 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 1169 { 1170 struct rbd_request *req_data = req->r_priv; 1171 struct ceph_osd_reply_head *replyhead; 1172 struct ceph_osd_op *op; 1173 __s32 rc; 1174 u64 bytes; 1175 int read_op; 1176 1177 /* parse reply */ 1178 replyhead = msg->front.iov_base; 1179 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); 1180 op = (void *)(replyhead + 1); 1181 rc = le32_to_cpu(replyhead->result); 1182 bytes = le64_to_cpu(op->extent.length); 1183 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); 1184 1185 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n", 1186 (unsigned long long) bytes, read_op, (int) rc); 1187 1188 if (rc == -ENOENT && read_op) { 1189 zero_bio_chain(req_data->bio, 0); 1190 rc = 0; 1191 } else if (rc == 0 && read_op && bytes < req_data->len) { 1192 zero_bio_chain(req_data->bio, bytes); 1193 bytes = req_data->len; 1194 } 1195 1196 rbd_coll_end_req(req_data, rc, bytes); 1197 1198 if (req_data->bio) 1199 bio_chain_put(req_data->bio); 1200 1201 ceph_osdc_put_request(req); 1202 kfree(req_data); 1203 } 1204 1205 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 1206 { 1207 ceph_osdc_put_request(req); 1208 } 1209 1210 /* 1211 * Do a synchronous ceph osd operation 1212 */ 1213 static int rbd_req_sync_op(struct rbd_device *rbd_dev, 1214 struct ceph_snap_context *snapc, 1215 u64 snapid, 1216 int flags, 1217 struct ceph_osd_req_op *ops, 1218 const char *object_name, 1219 u64 ofs, u64 inbound_size, 1220 char *inbound, 1221 struct ceph_osd_request **linger_req, 1222 u64 *ver) 1223 { 1224 int ret; 1225 struct page **pages; 1226 int num_pages; 1227 1228 rbd_assert(ops != NULL); 1229 1230 num_pages = calc_pages_for(ofs, inbound_size); 1231 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1232 if (IS_ERR(pages)) 1233 return PTR_ERR(pages); 1234 1235 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid, 1236 object_name, ofs, inbound_size, NULL, 1237 pages, num_pages, 1238 flags, 1239 ops, 1240 NULL, 0, 1241 NULL, 1242 linger_req, ver); 1243 if (ret < 0) 1244 goto done; 1245 1246 if ((flags & CEPH_OSD_FLAG_READ) && inbound) 1247 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret); 1248 1249 done: 1250 ceph_release_page_vector(pages, num_pages); 1251 return ret; 1252 } 1253 1254 /* 1255 * Do an asynchronous ceph osd operation 1256 */ 1257 static int rbd_do_op(struct request *rq, 1258 struct rbd_device *rbd_dev, 1259 struct ceph_snap_context *snapc, 1260 u64 ofs, u64 len, 1261 struct bio *bio, 1262 struct rbd_req_coll *coll, 1263 int coll_index) 1264 { 1265 char *seg_name; 1266 u64 seg_ofs; 1267 u64 seg_len; 1268 int ret; 1269 struct ceph_osd_req_op *ops; 1270 u32 payload_len; 1271 int opcode; 1272 int flags; 1273 u64 snapid; 1274 1275 seg_name = rbd_segment_name(rbd_dev, ofs); 1276 if (!seg_name) 1277 return -ENOMEM; 1278 seg_len = rbd_segment_length(rbd_dev, ofs, len); 1279 seg_ofs = rbd_segment_offset(rbd_dev, ofs); 1280 1281 if (rq_data_dir(rq) == WRITE) { 1282 opcode = CEPH_OSD_OP_WRITE; 1283 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; 1284 snapid = CEPH_NOSNAP; 1285 payload_len = seg_len; 1286 } else { 1287 opcode = CEPH_OSD_OP_READ; 1288 flags = CEPH_OSD_FLAG_READ; 1289 snapc = NULL; 1290 snapid = rbd_dev->spec->snap_id; 1291 payload_len = 0; 1292 } 1293 1294 ret = -ENOMEM; 1295 ops = rbd_create_rw_ops(1, opcode, payload_len); 1296 if (!ops) 1297 goto done; 1298 1299 /* we've taken care of segment sizes earlier when we 1300 cloned the bios. We should never have a segment 1301 truncated at this point */ 1302 rbd_assert(seg_len == len); 1303 1304 ret = rbd_do_request(rq, rbd_dev, snapc, snapid, 1305 seg_name, seg_ofs, seg_len, 1306 bio, 1307 NULL, 0, 1308 flags, 1309 ops, 1310 coll, coll_index, 1311 rbd_req_cb, 0, NULL); 1312 1313 rbd_destroy_ops(ops); 1314 done: 1315 kfree(seg_name); 1316 return ret; 1317 } 1318 1319 /* 1320 * Request sync osd read 1321 */ 1322 static int rbd_req_sync_read(struct rbd_device *rbd_dev, 1323 u64 snapid, 1324 const char *object_name, 1325 u64 ofs, u64 len, 1326 char *buf, 1327 u64 *ver) 1328 { 1329 struct ceph_osd_req_op *ops; 1330 int ret; 1331 1332 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0); 1333 if (!ops) 1334 return -ENOMEM; 1335 1336 ret = rbd_req_sync_op(rbd_dev, NULL, 1337 snapid, 1338 CEPH_OSD_FLAG_READ, 1339 ops, object_name, ofs, len, buf, NULL, ver); 1340 rbd_destroy_ops(ops); 1341 1342 return ret; 1343 } 1344 1345 /* 1346 * Request sync osd watch 1347 */ 1348 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev, 1349 u64 ver, 1350 u64 notify_id) 1351 { 1352 struct ceph_osd_req_op *ops; 1353 int ret; 1354 1355 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0); 1356 if (!ops) 1357 return -ENOMEM; 1358 1359 ops[0].watch.ver = cpu_to_le64(ver); 1360 ops[0].watch.cookie = notify_id; 1361 ops[0].watch.flag = 0; 1362 1363 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP, 1364 rbd_dev->header_name, 0, 0, NULL, 1365 NULL, 0, 1366 CEPH_OSD_FLAG_READ, 1367 ops, 1368 NULL, 0, 1369 rbd_simple_req_cb, 0, NULL); 1370 1371 rbd_destroy_ops(ops); 1372 return ret; 1373 } 1374 1375 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1376 { 1377 struct rbd_device *rbd_dev = (struct rbd_device *)data; 1378 u64 hver; 1379 int rc; 1380 1381 if (!rbd_dev) 1382 return; 1383 1384 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n", 1385 rbd_dev->header_name, (unsigned long long) notify_id, 1386 (unsigned int) opcode); 1387 rc = rbd_dev_refresh(rbd_dev, &hver); 1388 if (rc) 1389 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1390 " update snaps: %d\n", rbd_dev->major, rc); 1391 1392 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id); 1393 } 1394 1395 /* 1396 * Request sync osd watch 1397 */ 1398 static int rbd_req_sync_watch(struct rbd_device *rbd_dev) 1399 { 1400 struct ceph_osd_req_op *ops; 1401 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1402 int ret; 1403 1404 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0); 1405 if (!ops) 1406 return -ENOMEM; 1407 1408 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, 1409 (void *)rbd_dev, &rbd_dev->watch_event); 1410 if (ret < 0) 1411 goto fail; 1412 1413 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version); 1414 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); 1415 ops[0].watch.flag = 1; 1416 1417 ret = rbd_req_sync_op(rbd_dev, NULL, 1418 CEPH_NOSNAP, 1419 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1420 ops, 1421 rbd_dev->header_name, 1422 0, 0, NULL, 1423 &rbd_dev->watch_request, NULL); 1424 1425 if (ret < 0) 1426 goto fail_event; 1427 1428 rbd_destroy_ops(ops); 1429 return 0; 1430 1431 fail_event: 1432 ceph_osdc_cancel_event(rbd_dev->watch_event); 1433 rbd_dev->watch_event = NULL; 1434 fail: 1435 rbd_destroy_ops(ops); 1436 return ret; 1437 } 1438 1439 /* 1440 * Request sync osd unwatch 1441 */ 1442 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev) 1443 { 1444 struct ceph_osd_req_op *ops; 1445 int ret; 1446 1447 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0); 1448 if (!ops) 1449 return -ENOMEM; 1450 1451 ops[0].watch.ver = 0; 1452 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); 1453 ops[0].watch.flag = 0; 1454 1455 ret = rbd_req_sync_op(rbd_dev, NULL, 1456 CEPH_NOSNAP, 1457 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1458 ops, 1459 rbd_dev->header_name, 1460 0, 0, NULL, NULL, NULL); 1461 1462 1463 rbd_destroy_ops(ops); 1464 ceph_osdc_cancel_event(rbd_dev->watch_event); 1465 rbd_dev->watch_event = NULL; 1466 return ret; 1467 } 1468 1469 /* 1470 * Synchronous osd object method call 1471 */ 1472 static int rbd_req_sync_exec(struct rbd_device *rbd_dev, 1473 const char *object_name, 1474 const char *class_name, 1475 const char *method_name, 1476 const char *outbound, 1477 size_t outbound_size, 1478 char *inbound, 1479 size_t inbound_size, 1480 int flags, 1481 u64 *ver) 1482 { 1483 struct ceph_osd_req_op *ops; 1484 int class_name_len = strlen(class_name); 1485 int method_name_len = strlen(method_name); 1486 int payload_size; 1487 int ret; 1488 1489 /* 1490 * Any input parameters required by the method we're calling 1491 * will be sent along with the class and method names as 1492 * part of the message payload. That data and its size are 1493 * supplied via the indata and indata_len fields (named from 1494 * the perspective of the server side) in the OSD request 1495 * operation. 1496 */ 1497 payload_size = class_name_len + method_name_len + outbound_size; 1498 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); 1499 if (!ops) 1500 return -ENOMEM; 1501 1502 ops[0].cls.class_name = class_name; 1503 ops[0].cls.class_len = (__u8) class_name_len; 1504 ops[0].cls.method_name = method_name; 1505 ops[0].cls.method_len = (__u8) method_name_len; 1506 ops[0].cls.argc = 0; 1507 ops[0].cls.indata = outbound; 1508 ops[0].cls.indata_len = outbound_size; 1509 1510 ret = rbd_req_sync_op(rbd_dev, NULL, 1511 CEPH_NOSNAP, 1512 flags, ops, 1513 object_name, 0, inbound_size, inbound, 1514 NULL, ver); 1515 1516 rbd_destroy_ops(ops); 1517 1518 dout("cls_exec returned %d\n", ret); 1519 return ret; 1520 } 1521 1522 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) 1523 { 1524 struct rbd_req_coll *coll = 1525 kzalloc(sizeof(struct rbd_req_coll) + 1526 sizeof(struct rbd_req_status) * num_reqs, 1527 GFP_ATOMIC); 1528 1529 if (!coll) 1530 return NULL; 1531 coll->total = num_reqs; 1532 kref_init(&coll->kref); 1533 return coll; 1534 } 1535 1536 /* 1537 * block device queue callback 1538 */ 1539 static void rbd_rq_fn(struct request_queue *q) 1540 { 1541 struct rbd_device *rbd_dev = q->queuedata; 1542 struct request *rq; 1543 1544 while ((rq = blk_fetch_request(q))) { 1545 struct bio *bio; 1546 bool do_write; 1547 unsigned int size; 1548 u64 ofs; 1549 int num_segs, cur_seg = 0; 1550 struct rbd_req_coll *coll; 1551 struct ceph_snap_context *snapc; 1552 unsigned int bio_offset; 1553 1554 dout("fetched request\n"); 1555 1556 /* filter out block requests we don't understand */ 1557 if ((rq->cmd_type != REQ_TYPE_FS)) { 1558 __blk_end_request_all(rq, 0); 1559 continue; 1560 } 1561 1562 /* deduce our operation (read, write) */ 1563 do_write = (rq_data_dir(rq) == WRITE); 1564 if (do_write && rbd_dev->mapping.read_only) { 1565 __blk_end_request_all(rq, -EROFS); 1566 continue; 1567 } 1568 1569 spin_unlock_irq(q->queue_lock); 1570 1571 down_read(&rbd_dev->header_rwsem); 1572 1573 if (!rbd_dev->exists) { 1574 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 1575 up_read(&rbd_dev->header_rwsem); 1576 dout("request for non-existent snapshot"); 1577 spin_lock_irq(q->queue_lock); 1578 __blk_end_request_all(rq, -ENXIO); 1579 continue; 1580 } 1581 1582 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1583 1584 up_read(&rbd_dev->header_rwsem); 1585 1586 size = blk_rq_bytes(rq); 1587 ofs = blk_rq_pos(rq) * SECTOR_SIZE; 1588 bio = rq->bio; 1589 1590 dout("%s 0x%x bytes at 0x%llx\n", 1591 do_write ? "write" : "read", 1592 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); 1593 1594 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); 1595 if (num_segs <= 0) { 1596 spin_lock_irq(q->queue_lock); 1597 __blk_end_request_all(rq, num_segs); 1598 ceph_put_snap_context(snapc); 1599 continue; 1600 } 1601 coll = rbd_alloc_coll(num_segs); 1602 if (!coll) { 1603 spin_lock_irq(q->queue_lock); 1604 __blk_end_request_all(rq, -ENOMEM); 1605 ceph_put_snap_context(snapc); 1606 continue; 1607 } 1608 1609 bio_offset = 0; 1610 do { 1611 u64 limit = rbd_segment_length(rbd_dev, ofs, size); 1612 unsigned int chain_size; 1613 struct bio *bio_chain; 1614 1615 BUG_ON(limit > (u64) UINT_MAX); 1616 chain_size = (unsigned int) limit; 1617 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); 1618 1619 kref_get(&coll->kref); 1620 1621 /* Pass a cloned bio chain via an osd request */ 1622 1623 bio_chain = bio_chain_clone_range(&bio, 1624 &bio_offset, chain_size, 1625 GFP_ATOMIC); 1626 if (bio_chain) 1627 (void) rbd_do_op(rq, rbd_dev, snapc, 1628 ofs, chain_size, 1629 bio_chain, coll, cur_seg); 1630 else 1631 rbd_coll_end_req_index(rq, coll, cur_seg, 1632 -ENOMEM, chain_size); 1633 size -= chain_size; 1634 ofs += chain_size; 1635 1636 cur_seg++; 1637 } while (size > 0); 1638 kref_put(&coll->kref, rbd_coll_release); 1639 1640 spin_lock_irq(q->queue_lock); 1641 1642 ceph_put_snap_context(snapc); 1643 } 1644 } 1645 1646 /* 1647 * a queue callback. Makes sure that we don't create a bio that spans across 1648 * multiple osd objects. One exception would be with a single page bios, 1649 * which we handle later at bio_chain_clone_range() 1650 */ 1651 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1652 struct bio_vec *bvec) 1653 { 1654 struct rbd_device *rbd_dev = q->queuedata; 1655 sector_t sector_offset; 1656 sector_t sectors_per_obj; 1657 sector_t obj_sector_offset; 1658 int ret; 1659 1660 /* 1661 * Find how far into its rbd object the partition-relative 1662 * bio start sector is to offset relative to the enclosing 1663 * device. 1664 */ 1665 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 1666 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 1667 obj_sector_offset = sector_offset & (sectors_per_obj - 1); 1668 1669 /* 1670 * Compute the number of bytes from that offset to the end 1671 * of the object. Account for what's already used by the bio. 1672 */ 1673 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 1674 if (ret > bmd->bi_size) 1675 ret -= bmd->bi_size; 1676 else 1677 ret = 0; 1678 1679 /* 1680 * Don't send back more than was asked for. And if the bio 1681 * was empty, let the whole thing through because: "Note 1682 * that a block device *must* allow a single page to be 1683 * added to an empty bio." 1684 */ 1685 rbd_assert(bvec->bv_len <= PAGE_SIZE); 1686 if (ret > (int) bvec->bv_len || !bmd->bi_size) 1687 ret = (int) bvec->bv_len; 1688 1689 return ret; 1690 } 1691 1692 static void rbd_free_disk(struct rbd_device *rbd_dev) 1693 { 1694 struct gendisk *disk = rbd_dev->disk; 1695 1696 if (!disk) 1697 return; 1698 1699 if (disk->flags & GENHD_FL_UP) 1700 del_gendisk(disk); 1701 if (disk->queue) 1702 blk_cleanup_queue(disk->queue); 1703 put_disk(disk); 1704 } 1705 1706 /* 1707 * Read the complete header for the given rbd device. 1708 * 1709 * Returns a pointer to a dynamically-allocated buffer containing 1710 * the complete and validated header. Caller can pass the address 1711 * of a variable that will be filled in with the version of the 1712 * header object at the time it was read. 1713 * 1714 * Returns a pointer-coded errno if a failure occurs. 1715 */ 1716 static struct rbd_image_header_ondisk * 1717 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) 1718 { 1719 struct rbd_image_header_ondisk *ondisk = NULL; 1720 u32 snap_count = 0; 1721 u64 names_size = 0; 1722 u32 want_count; 1723 int ret; 1724 1725 /* 1726 * The complete header will include an array of its 64-bit 1727 * snapshot ids, followed by the names of those snapshots as 1728 * a contiguous block of NUL-terminated strings. Note that 1729 * the number of snapshots could change by the time we read 1730 * it in, in which case we re-read it. 1731 */ 1732 do { 1733 size_t size; 1734 1735 kfree(ondisk); 1736 1737 size = sizeof (*ondisk); 1738 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 1739 size += names_size; 1740 ondisk = kmalloc(size, GFP_KERNEL); 1741 if (!ondisk) 1742 return ERR_PTR(-ENOMEM); 1743 1744 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP, 1745 rbd_dev->header_name, 1746 0, size, 1747 (char *) ondisk, version); 1748 1749 if (ret < 0) 1750 goto out_err; 1751 if (WARN_ON((size_t) ret < size)) { 1752 ret = -ENXIO; 1753 pr_warning("short header read for image %s" 1754 " (want %zd got %d)\n", 1755 rbd_dev->spec->image_name, size, ret); 1756 goto out_err; 1757 } 1758 if (!rbd_dev_ondisk_valid(ondisk)) { 1759 ret = -ENXIO; 1760 pr_warning("invalid header for image %s\n", 1761 rbd_dev->spec->image_name); 1762 goto out_err; 1763 } 1764 1765 names_size = le64_to_cpu(ondisk->snap_names_len); 1766 want_count = snap_count; 1767 snap_count = le32_to_cpu(ondisk->snap_count); 1768 } while (snap_count != want_count); 1769 1770 return ondisk; 1771 1772 out_err: 1773 kfree(ondisk); 1774 1775 return ERR_PTR(ret); 1776 } 1777 1778 /* 1779 * reload the ondisk the header 1780 */ 1781 static int rbd_read_header(struct rbd_device *rbd_dev, 1782 struct rbd_image_header *header) 1783 { 1784 struct rbd_image_header_ondisk *ondisk; 1785 u64 ver = 0; 1786 int ret; 1787 1788 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); 1789 if (IS_ERR(ondisk)) 1790 return PTR_ERR(ondisk); 1791 ret = rbd_header_from_disk(header, ondisk); 1792 if (ret >= 0) 1793 header->obj_version = ver; 1794 kfree(ondisk); 1795 1796 return ret; 1797 } 1798 1799 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1800 { 1801 struct rbd_snap *snap; 1802 struct rbd_snap *next; 1803 1804 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 1805 rbd_remove_snap_dev(snap); 1806 } 1807 1808 static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 1809 { 1810 sector_t size; 1811 1812 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 1813 return; 1814 1815 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 1816 dout("setting size to %llu sectors", (unsigned long long) size); 1817 rbd_dev->mapping.size = (u64) size; 1818 set_capacity(rbd_dev->disk, size); 1819 } 1820 1821 /* 1822 * only read the first part of the ondisk header, without the snaps info 1823 */ 1824 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 1825 { 1826 int ret; 1827 struct rbd_image_header h; 1828 1829 ret = rbd_read_header(rbd_dev, &h); 1830 if (ret < 0) 1831 return ret; 1832 1833 down_write(&rbd_dev->header_rwsem); 1834 1835 /* Update image size, and check for resize of mapped image */ 1836 rbd_dev->header.image_size = h.image_size; 1837 rbd_update_mapping_size(rbd_dev); 1838 1839 /* rbd_dev->header.object_prefix shouldn't change */ 1840 kfree(rbd_dev->header.snap_sizes); 1841 kfree(rbd_dev->header.snap_names); 1842 /* osd requests may still refer to snapc */ 1843 ceph_put_snap_context(rbd_dev->header.snapc); 1844 1845 if (hver) 1846 *hver = h.obj_version; 1847 rbd_dev->header.obj_version = h.obj_version; 1848 rbd_dev->header.image_size = h.image_size; 1849 rbd_dev->header.snapc = h.snapc; 1850 rbd_dev->header.snap_names = h.snap_names; 1851 rbd_dev->header.snap_sizes = h.snap_sizes; 1852 /* Free the extra copy of the object prefix */ 1853 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix)); 1854 kfree(h.object_prefix); 1855 1856 ret = rbd_dev_snaps_update(rbd_dev); 1857 if (!ret) 1858 ret = rbd_dev_snaps_register(rbd_dev); 1859 1860 up_write(&rbd_dev->header_rwsem); 1861 1862 return ret; 1863 } 1864 1865 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 1866 { 1867 int ret; 1868 1869 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1870 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1871 if (rbd_dev->image_format == 1) 1872 ret = rbd_dev_v1_refresh(rbd_dev, hver); 1873 else 1874 ret = rbd_dev_v2_refresh(rbd_dev, hver); 1875 mutex_unlock(&ctl_mutex); 1876 1877 return ret; 1878 } 1879 1880 static int rbd_init_disk(struct rbd_device *rbd_dev) 1881 { 1882 struct gendisk *disk; 1883 struct request_queue *q; 1884 u64 segment_size; 1885 1886 /* create gendisk info */ 1887 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 1888 if (!disk) 1889 return -ENOMEM; 1890 1891 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 1892 rbd_dev->dev_id); 1893 disk->major = rbd_dev->major; 1894 disk->first_minor = 0; 1895 disk->fops = &rbd_bd_ops; 1896 disk->private_data = rbd_dev; 1897 1898 /* init rq */ 1899 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 1900 if (!q) 1901 goto out_disk; 1902 1903 /* We use the default size, but let's be explicit about it. */ 1904 blk_queue_physical_block_size(q, SECTOR_SIZE); 1905 1906 /* set io sizes to object size */ 1907 segment_size = rbd_obj_bytes(&rbd_dev->header); 1908 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 1909 blk_queue_max_segment_size(q, segment_size); 1910 blk_queue_io_min(q, segment_size); 1911 blk_queue_io_opt(q, segment_size); 1912 1913 blk_queue_merge_bvec(q, rbd_merge_bvec); 1914 disk->queue = q; 1915 1916 q->queuedata = rbd_dev; 1917 1918 rbd_dev->disk = disk; 1919 1920 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 1921 1922 return 0; 1923 out_disk: 1924 put_disk(disk); 1925 1926 return -ENOMEM; 1927 } 1928 1929 /* 1930 sysfs 1931 */ 1932 1933 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 1934 { 1935 return container_of(dev, struct rbd_device, dev); 1936 } 1937 1938 static ssize_t rbd_size_show(struct device *dev, 1939 struct device_attribute *attr, char *buf) 1940 { 1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1942 sector_t size; 1943 1944 down_read(&rbd_dev->header_rwsem); 1945 size = get_capacity(rbd_dev->disk); 1946 up_read(&rbd_dev->header_rwsem); 1947 1948 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE); 1949 } 1950 1951 /* 1952 * Note this shows the features for whatever's mapped, which is not 1953 * necessarily the base image. 1954 */ 1955 static ssize_t rbd_features_show(struct device *dev, 1956 struct device_attribute *attr, char *buf) 1957 { 1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1959 1960 return sprintf(buf, "0x%016llx\n", 1961 (unsigned long long) rbd_dev->mapping.features); 1962 } 1963 1964 static ssize_t rbd_major_show(struct device *dev, 1965 struct device_attribute *attr, char *buf) 1966 { 1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1968 1969 return sprintf(buf, "%d\n", rbd_dev->major); 1970 } 1971 1972 static ssize_t rbd_client_id_show(struct device *dev, 1973 struct device_attribute *attr, char *buf) 1974 { 1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1976 1977 return sprintf(buf, "client%lld\n", 1978 ceph_client_id(rbd_dev->rbd_client->client)); 1979 } 1980 1981 static ssize_t rbd_pool_show(struct device *dev, 1982 struct device_attribute *attr, char *buf) 1983 { 1984 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1985 1986 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 1987 } 1988 1989 static ssize_t rbd_pool_id_show(struct device *dev, 1990 struct device_attribute *attr, char *buf) 1991 { 1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1993 1994 return sprintf(buf, "%llu\n", 1995 (unsigned long long) rbd_dev->spec->pool_id); 1996 } 1997 1998 static ssize_t rbd_name_show(struct device *dev, 1999 struct device_attribute *attr, char *buf) 2000 { 2001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2002 2003 if (rbd_dev->spec->image_name) 2004 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 2005 2006 return sprintf(buf, "(unknown)\n"); 2007 } 2008 2009 static ssize_t rbd_image_id_show(struct device *dev, 2010 struct device_attribute *attr, char *buf) 2011 { 2012 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2013 2014 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 2015 } 2016 2017 /* 2018 * Shows the name of the currently-mapped snapshot (or 2019 * RBD_SNAP_HEAD_NAME for the base image). 2020 */ 2021 static ssize_t rbd_snap_show(struct device *dev, 2022 struct device_attribute *attr, 2023 char *buf) 2024 { 2025 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2026 2027 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 2028 } 2029 2030 /* 2031 * For an rbd v2 image, shows the pool id, image id, and snapshot id 2032 * for the parent image. If there is no parent, simply shows 2033 * "(no parent image)". 2034 */ 2035 static ssize_t rbd_parent_show(struct device *dev, 2036 struct device_attribute *attr, 2037 char *buf) 2038 { 2039 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2040 struct rbd_spec *spec = rbd_dev->parent_spec; 2041 int count; 2042 char *bufp = buf; 2043 2044 if (!spec) 2045 return sprintf(buf, "(no parent image)\n"); 2046 2047 count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 2048 (unsigned long long) spec->pool_id, spec->pool_name); 2049 if (count < 0) 2050 return count; 2051 bufp += count; 2052 2053 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 2054 spec->image_name ? spec->image_name : "(unknown)"); 2055 if (count < 0) 2056 return count; 2057 bufp += count; 2058 2059 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 2060 (unsigned long long) spec->snap_id, spec->snap_name); 2061 if (count < 0) 2062 return count; 2063 bufp += count; 2064 2065 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 2066 if (count < 0) 2067 return count; 2068 bufp += count; 2069 2070 return (ssize_t) (bufp - buf); 2071 } 2072 2073 static ssize_t rbd_image_refresh(struct device *dev, 2074 struct device_attribute *attr, 2075 const char *buf, 2076 size_t size) 2077 { 2078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2079 int ret; 2080 2081 ret = rbd_dev_refresh(rbd_dev, NULL); 2082 2083 return ret < 0 ? ret : size; 2084 } 2085 2086 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 2087 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 2088 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 2089 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 2090 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 2091 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 2092 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 2093 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2094 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2095 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 2096 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 2097 2098 static struct attribute *rbd_attrs[] = { 2099 &dev_attr_size.attr, 2100 &dev_attr_features.attr, 2101 &dev_attr_major.attr, 2102 &dev_attr_client_id.attr, 2103 &dev_attr_pool.attr, 2104 &dev_attr_pool_id.attr, 2105 &dev_attr_name.attr, 2106 &dev_attr_image_id.attr, 2107 &dev_attr_current_snap.attr, 2108 &dev_attr_parent.attr, 2109 &dev_attr_refresh.attr, 2110 NULL 2111 }; 2112 2113 static struct attribute_group rbd_attr_group = { 2114 .attrs = rbd_attrs, 2115 }; 2116 2117 static const struct attribute_group *rbd_attr_groups[] = { 2118 &rbd_attr_group, 2119 NULL 2120 }; 2121 2122 static void rbd_sysfs_dev_release(struct device *dev) 2123 { 2124 } 2125 2126 static struct device_type rbd_device_type = { 2127 .name = "rbd", 2128 .groups = rbd_attr_groups, 2129 .release = rbd_sysfs_dev_release, 2130 }; 2131 2132 2133 /* 2134 sysfs - snapshots 2135 */ 2136 2137 static ssize_t rbd_snap_size_show(struct device *dev, 2138 struct device_attribute *attr, 2139 char *buf) 2140 { 2141 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2142 2143 return sprintf(buf, "%llu\n", (unsigned long long)snap->size); 2144 } 2145 2146 static ssize_t rbd_snap_id_show(struct device *dev, 2147 struct device_attribute *attr, 2148 char *buf) 2149 { 2150 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2151 2152 return sprintf(buf, "%llu\n", (unsigned long long)snap->id); 2153 } 2154 2155 static ssize_t rbd_snap_features_show(struct device *dev, 2156 struct device_attribute *attr, 2157 char *buf) 2158 { 2159 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2160 2161 return sprintf(buf, "0x%016llx\n", 2162 (unsigned long long) snap->features); 2163 } 2164 2165 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 2166 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 2167 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL); 2168 2169 static struct attribute *rbd_snap_attrs[] = { 2170 &dev_attr_snap_size.attr, 2171 &dev_attr_snap_id.attr, 2172 &dev_attr_snap_features.attr, 2173 NULL, 2174 }; 2175 2176 static struct attribute_group rbd_snap_attr_group = { 2177 .attrs = rbd_snap_attrs, 2178 }; 2179 2180 static void rbd_snap_dev_release(struct device *dev) 2181 { 2182 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2183 kfree(snap->name); 2184 kfree(snap); 2185 } 2186 2187 static const struct attribute_group *rbd_snap_attr_groups[] = { 2188 &rbd_snap_attr_group, 2189 NULL 2190 }; 2191 2192 static struct device_type rbd_snap_device_type = { 2193 .groups = rbd_snap_attr_groups, 2194 .release = rbd_snap_dev_release, 2195 }; 2196 2197 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 2198 { 2199 kref_get(&spec->kref); 2200 2201 return spec; 2202 } 2203 2204 static void rbd_spec_free(struct kref *kref); 2205 static void rbd_spec_put(struct rbd_spec *spec) 2206 { 2207 if (spec) 2208 kref_put(&spec->kref, rbd_spec_free); 2209 } 2210 2211 static struct rbd_spec *rbd_spec_alloc(void) 2212 { 2213 struct rbd_spec *spec; 2214 2215 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 2216 if (!spec) 2217 return NULL; 2218 kref_init(&spec->kref); 2219 2220 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ 2221 2222 return spec; 2223 } 2224 2225 static void rbd_spec_free(struct kref *kref) 2226 { 2227 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 2228 2229 kfree(spec->pool_name); 2230 kfree(spec->image_id); 2231 kfree(spec->image_name); 2232 kfree(spec->snap_name); 2233 kfree(spec); 2234 } 2235 2236 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 2237 struct rbd_spec *spec) 2238 { 2239 struct rbd_device *rbd_dev; 2240 2241 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 2242 if (!rbd_dev) 2243 return NULL; 2244 2245 spin_lock_init(&rbd_dev->lock); 2246 INIT_LIST_HEAD(&rbd_dev->node); 2247 INIT_LIST_HEAD(&rbd_dev->snaps); 2248 init_rwsem(&rbd_dev->header_rwsem); 2249 2250 rbd_dev->spec = spec; 2251 rbd_dev->rbd_client = rbdc; 2252 2253 return rbd_dev; 2254 } 2255 2256 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 2257 { 2258 rbd_spec_put(rbd_dev->parent_spec); 2259 kfree(rbd_dev->header_name); 2260 rbd_put_client(rbd_dev->rbd_client); 2261 rbd_spec_put(rbd_dev->spec); 2262 kfree(rbd_dev); 2263 } 2264 2265 static bool rbd_snap_registered(struct rbd_snap *snap) 2266 { 2267 bool ret = snap->dev.type == &rbd_snap_device_type; 2268 bool reg = device_is_registered(&snap->dev); 2269 2270 rbd_assert(!ret ^ reg); 2271 2272 return ret; 2273 } 2274 2275 static void rbd_remove_snap_dev(struct rbd_snap *snap) 2276 { 2277 list_del(&snap->node); 2278 if (device_is_registered(&snap->dev)) 2279 device_unregister(&snap->dev); 2280 } 2281 2282 static int rbd_register_snap_dev(struct rbd_snap *snap, 2283 struct device *parent) 2284 { 2285 struct device *dev = &snap->dev; 2286 int ret; 2287 2288 dev->type = &rbd_snap_device_type; 2289 dev->parent = parent; 2290 dev->release = rbd_snap_dev_release; 2291 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); 2292 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2293 2294 ret = device_register(dev); 2295 2296 return ret; 2297 } 2298 2299 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev, 2300 const char *snap_name, 2301 u64 snap_id, u64 snap_size, 2302 u64 snap_features) 2303 { 2304 struct rbd_snap *snap; 2305 int ret; 2306 2307 snap = kzalloc(sizeof (*snap), GFP_KERNEL); 2308 if (!snap) 2309 return ERR_PTR(-ENOMEM); 2310 2311 ret = -ENOMEM; 2312 snap->name = kstrdup(snap_name, GFP_KERNEL); 2313 if (!snap->name) 2314 goto err; 2315 2316 snap->id = snap_id; 2317 snap->size = snap_size; 2318 snap->features = snap_features; 2319 2320 return snap; 2321 2322 err: 2323 kfree(snap->name); 2324 kfree(snap); 2325 2326 return ERR_PTR(ret); 2327 } 2328 2329 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which, 2330 u64 *snap_size, u64 *snap_features) 2331 { 2332 char *snap_name; 2333 2334 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 2335 2336 *snap_size = rbd_dev->header.snap_sizes[which]; 2337 *snap_features = 0; /* No features for v1 */ 2338 2339 /* Skip over names until we find the one we are looking for */ 2340 2341 snap_name = rbd_dev->header.snap_names; 2342 while (which--) 2343 snap_name += strlen(snap_name) + 1; 2344 2345 return snap_name; 2346 } 2347 2348 /* 2349 * Get the size and object order for an image snapshot, or if 2350 * snap_id is CEPH_NOSNAP, gets this information for the base 2351 * image. 2352 */ 2353 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 2354 u8 *order, u64 *snap_size) 2355 { 2356 __le64 snapid = cpu_to_le64(snap_id); 2357 int ret; 2358 struct { 2359 u8 order; 2360 __le64 size; 2361 } __attribute__ ((packed)) size_buf = { 0 }; 2362 2363 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2364 "rbd", "get_size", 2365 (char *) &snapid, sizeof (snapid), 2366 (char *) &size_buf, sizeof (size_buf), 2367 CEPH_OSD_FLAG_READ, NULL); 2368 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2369 if (ret < 0) 2370 return ret; 2371 2372 *order = size_buf.order; 2373 *snap_size = le64_to_cpu(size_buf.size); 2374 2375 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 2376 (unsigned long long) snap_id, (unsigned int) *order, 2377 (unsigned long long) *snap_size); 2378 2379 return 0; 2380 } 2381 2382 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 2383 { 2384 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 2385 &rbd_dev->header.obj_order, 2386 &rbd_dev->header.image_size); 2387 } 2388 2389 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 2390 { 2391 void *reply_buf; 2392 int ret; 2393 void *p; 2394 2395 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 2396 if (!reply_buf) 2397 return -ENOMEM; 2398 2399 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2400 "rbd", "get_object_prefix", 2401 NULL, 0, 2402 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, 2403 CEPH_OSD_FLAG_READ, NULL); 2404 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2405 if (ret < 0) 2406 goto out; 2407 ret = 0; /* rbd_req_sync_exec() can return positive */ 2408 2409 p = reply_buf; 2410 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2411 p + RBD_OBJ_PREFIX_LEN_MAX, 2412 NULL, GFP_NOIO); 2413 2414 if (IS_ERR(rbd_dev->header.object_prefix)) { 2415 ret = PTR_ERR(rbd_dev->header.object_prefix); 2416 rbd_dev->header.object_prefix = NULL; 2417 } else { 2418 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 2419 } 2420 2421 out: 2422 kfree(reply_buf); 2423 2424 return ret; 2425 } 2426 2427 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 2428 u64 *snap_features) 2429 { 2430 __le64 snapid = cpu_to_le64(snap_id); 2431 struct { 2432 __le64 features; 2433 __le64 incompat; 2434 } features_buf = { 0 }; 2435 u64 incompat; 2436 int ret; 2437 2438 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2439 "rbd", "get_features", 2440 (char *) &snapid, sizeof (snapid), 2441 (char *) &features_buf, sizeof (features_buf), 2442 CEPH_OSD_FLAG_READ, NULL); 2443 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2444 if (ret < 0) 2445 return ret; 2446 2447 incompat = le64_to_cpu(features_buf.incompat); 2448 if (incompat & ~RBD_FEATURES_ALL) 2449 return -ENXIO; 2450 2451 *snap_features = le64_to_cpu(features_buf.features); 2452 2453 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 2454 (unsigned long long) snap_id, 2455 (unsigned long long) *snap_features, 2456 (unsigned long long) le64_to_cpu(features_buf.incompat)); 2457 2458 return 0; 2459 } 2460 2461 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 2462 { 2463 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 2464 &rbd_dev->header.features); 2465 } 2466 2467 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 2468 { 2469 struct rbd_spec *parent_spec; 2470 size_t size; 2471 void *reply_buf = NULL; 2472 __le64 snapid; 2473 void *p; 2474 void *end; 2475 char *image_id; 2476 u64 overlap; 2477 size_t len = 0; 2478 int ret; 2479 2480 parent_spec = rbd_spec_alloc(); 2481 if (!parent_spec) 2482 return -ENOMEM; 2483 2484 size = sizeof (__le64) + /* pool_id */ 2485 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 2486 sizeof (__le64) + /* snap_id */ 2487 sizeof (__le64); /* overlap */ 2488 reply_buf = kmalloc(size, GFP_KERNEL); 2489 if (!reply_buf) { 2490 ret = -ENOMEM; 2491 goto out_err; 2492 } 2493 2494 snapid = cpu_to_le64(CEPH_NOSNAP); 2495 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2496 "rbd", "get_parent", 2497 (char *) &snapid, sizeof (snapid), 2498 (char *) reply_buf, size, 2499 CEPH_OSD_FLAG_READ, NULL); 2500 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2501 if (ret < 0) 2502 goto out_err; 2503 2504 ret = -ERANGE; 2505 p = reply_buf; 2506 end = (char *) reply_buf + size; 2507 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 2508 if (parent_spec->pool_id == CEPH_NOPOOL) 2509 goto out; /* No parent? No problem. */ 2510 2511 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2512 if (IS_ERR(image_id)) { 2513 ret = PTR_ERR(image_id); 2514 goto out_err; 2515 } 2516 parent_spec->image_id = image_id; 2517 parent_spec->image_id_len = len; 2518 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 2519 ceph_decode_64_safe(&p, end, overlap, out_err); 2520 2521 rbd_dev->parent_overlap = overlap; 2522 rbd_dev->parent_spec = parent_spec; 2523 parent_spec = NULL; /* rbd_dev now owns this */ 2524 out: 2525 ret = 0; 2526 out_err: 2527 kfree(reply_buf); 2528 rbd_spec_put(parent_spec); 2529 2530 return ret; 2531 } 2532 2533 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 2534 { 2535 size_t image_id_size; 2536 char *image_id; 2537 void *p; 2538 void *end; 2539 size_t size; 2540 void *reply_buf = NULL; 2541 size_t len = 0; 2542 char *image_name = NULL; 2543 int ret; 2544 2545 rbd_assert(!rbd_dev->spec->image_name); 2546 2547 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; 2548 image_id = kmalloc(image_id_size, GFP_KERNEL); 2549 if (!image_id) 2550 return NULL; 2551 2552 p = image_id; 2553 end = (char *) image_id + image_id_size; 2554 ceph_encode_string(&p, end, rbd_dev->spec->image_id, 2555 (u32) rbd_dev->spec->image_id_len); 2556 2557 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 2558 reply_buf = kmalloc(size, GFP_KERNEL); 2559 if (!reply_buf) 2560 goto out; 2561 2562 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, 2563 "rbd", "dir_get_name", 2564 image_id, image_id_size, 2565 (char *) reply_buf, size, 2566 CEPH_OSD_FLAG_READ, NULL); 2567 if (ret < 0) 2568 goto out; 2569 p = reply_buf; 2570 end = (char *) reply_buf + size; 2571 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2572 if (IS_ERR(image_name)) 2573 image_name = NULL; 2574 else 2575 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 2576 out: 2577 kfree(reply_buf); 2578 kfree(image_id); 2579 2580 return image_name; 2581 } 2582 2583 /* 2584 * When a parent image gets probed, we only have the pool, image, 2585 * and snapshot ids but not the names of any of them. This call 2586 * is made later to fill in those names. It has to be done after 2587 * rbd_dev_snaps_update() has completed because some of the 2588 * information (in particular, snapshot name) is not available 2589 * until then. 2590 */ 2591 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 2592 { 2593 struct ceph_osd_client *osdc; 2594 const char *name; 2595 void *reply_buf = NULL; 2596 int ret; 2597 2598 if (rbd_dev->spec->pool_name) 2599 return 0; /* Already have the names */ 2600 2601 /* Look up the pool name */ 2602 2603 osdc = &rbd_dev->rbd_client->client->osdc; 2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 2605 if (!name) 2606 return -EIO; /* pool id too large (>= 2^31) */ 2607 2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 2609 if (!rbd_dev->spec->pool_name) 2610 return -ENOMEM; 2611 2612 /* Fetch the image name; tolerate failure here */ 2613 2614 name = rbd_dev_image_name(rbd_dev); 2615 if (name) { 2616 rbd_dev->spec->image_name_len = strlen(name); 2617 rbd_dev->spec->image_name = (char *) name; 2618 } else { 2619 pr_warning(RBD_DRV_NAME "%d " 2620 "unable to get image name for image id %s\n", 2621 rbd_dev->major, rbd_dev->spec->image_id); 2622 } 2623 2624 /* Look up the snapshot name. */ 2625 2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 2627 if (!name) { 2628 ret = -EIO; 2629 goto out_err; 2630 } 2631 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 2632 if(!rbd_dev->spec->snap_name) 2633 goto out_err; 2634 2635 return 0; 2636 out_err: 2637 kfree(reply_buf); 2638 kfree(rbd_dev->spec->pool_name); 2639 rbd_dev->spec->pool_name = NULL; 2640 2641 return ret; 2642 } 2643 2644 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 2645 { 2646 size_t size; 2647 int ret; 2648 void *reply_buf; 2649 void *p; 2650 void *end; 2651 u64 seq; 2652 u32 snap_count; 2653 struct ceph_snap_context *snapc; 2654 u32 i; 2655 2656 /* 2657 * We'll need room for the seq value (maximum snapshot id), 2658 * snapshot count, and array of that many snapshot ids. 2659 * For now we have a fixed upper limit on the number we're 2660 * prepared to receive. 2661 */ 2662 size = sizeof (__le64) + sizeof (__le32) + 2663 RBD_MAX_SNAP_COUNT * sizeof (__le64); 2664 reply_buf = kzalloc(size, GFP_KERNEL); 2665 if (!reply_buf) 2666 return -ENOMEM; 2667 2668 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2669 "rbd", "get_snapcontext", 2670 NULL, 0, 2671 reply_buf, size, 2672 CEPH_OSD_FLAG_READ, ver); 2673 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2674 if (ret < 0) 2675 goto out; 2676 2677 ret = -ERANGE; 2678 p = reply_buf; 2679 end = (char *) reply_buf + size; 2680 ceph_decode_64_safe(&p, end, seq, out); 2681 ceph_decode_32_safe(&p, end, snap_count, out); 2682 2683 /* 2684 * Make sure the reported number of snapshot ids wouldn't go 2685 * beyond the end of our buffer. But before checking that, 2686 * make sure the computed size of the snapshot context we 2687 * allocate is representable in a size_t. 2688 */ 2689 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 2690 / sizeof (u64)) { 2691 ret = -EINVAL; 2692 goto out; 2693 } 2694 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 2695 goto out; 2696 2697 size = sizeof (struct ceph_snap_context) + 2698 snap_count * sizeof (snapc->snaps[0]); 2699 snapc = kmalloc(size, GFP_KERNEL); 2700 if (!snapc) { 2701 ret = -ENOMEM; 2702 goto out; 2703 } 2704 2705 atomic_set(&snapc->nref, 1); 2706 snapc->seq = seq; 2707 snapc->num_snaps = snap_count; 2708 for (i = 0; i < snap_count; i++) 2709 snapc->snaps[i] = ceph_decode_64(&p); 2710 2711 rbd_dev->header.snapc = snapc; 2712 2713 dout(" snap context seq = %llu, snap_count = %u\n", 2714 (unsigned long long) seq, (unsigned int) snap_count); 2715 2716 out: 2717 kfree(reply_buf); 2718 2719 return 0; 2720 } 2721 2722 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) 2723 { 2724 size_t size; 2725 void *reply_buf; 2726 __le64 snap_id; 2727 int ret; 2728 void *p; 2729 void *end; 2730 char *snap_name; 2731 2732 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 2733 reply_buf = kmalloc(size, GFP_KERNEL); 2734 if (!reply_buf) 2735 return ERR_PTR(-ENOMEM); 2736 2737 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 2738 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2739 "rbd", "get_snapshot_name", 2740 (char *) &snap_id, sizeof (snap_id), 2741 reply_buf, size, 2742 CEPH_OSD_FLAG_READ, NULL); 2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2744 if (ret < 0) 2745 goto out; 2746 2747 p = reply_buf; 2748 end = (char *) reply_buf + size; 2749 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 2750 if (IS_ERR(snap_name)) { 2751 ret = PTR_ERR(snap_name); 2752 goto out; 2753 } else { 2754 dout(" snap_id 0x%016llx snap_name = %s\n", 2755 (unsigned long long) le64_to_cpu(snap_id), snap_name); 2756 } 2757 kfree(reply_buf); 2758 2759 return snap_name; 2760 out: 2761 kfree(reply_buf); 2762 2763 return ERR_PTR(ret); 2764 } 2765 2766 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 2767 u64 *snap_size, u64 *snap_features) 2768 { 2769 __le64 snap_id; 2770 u8 order; 2771 int ret; 2772 2773 snap_id = rbd_dev->header.snapc->snaps[which]; 2774 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size); 2775 if (ret) 2776 return ERR_PTR(ret); 2777 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features); 2778 if (ret) 2779 return ERR_PTR(ret); 2780 2781 return rbd_dev_v2_snap_name(rbd_dev, which); 2782 } 2783 2784 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, 2785 u64 *snap_size, u64 *snap_features) 2786 { 2787 if (rbd_dev->image_format == 1) 2788 return rbd_dev_v1_snap_info(rbd_dev, which, 2789 snap_size, snap_features); 2790 if (rbd_dev->image_format == 2) 2791 return rbd_dev_v2_snap_info(rbd_dev, which, 2792 snap_size, snap_features); 2793 return ERR_PTR(-EINVAL); 2794 } 2795 2796 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 2797 { 2798 int ret; 2799 __u8 obj_order; 2800 2801 down_write(&rbd_dev->header_rwsem); 2802 2803 /* Grab old order first, to see if it changes */ 2804 2805 obj_order = rbd_dev->header.obj_order, 2806 ret = rbd_dev_v2_image_size(rbd_dev); 2807 if (ret) 2808 goto out; 2809 if (rbd_dev->header.obj_order != obj_order) { 2810 ret = -EIO; 2811 goto out; 2812 } 2813 rbd_update_mapping_size(rbd_dev); 2814 2815 ret = rbd_dev_v2_snap_context(rbd_dev, hver); 2816 dout("rbd_dev_v2_snap_context returned %d\n", ret); 2817 if (ret) 2818 goto out; 2819 ret = rbd_dev_snaps_update(rbd_dev); 2820 dout("rbd_dev_snaps_update returned %d\n", ret); 2821 if (ret) 2822 goto out; 2823 ret = rbd_dev_snaps_register(rbd_dev); 2824 dout("rbd_dev_snaps_register returned %d\n", ret); 2825 out: 2826 up_write(&rbd_dev->header_rwsem); 2827 2828 return ret; 2829 } 2830 2831 /* 2832 * Scan the rbd device's current snapshot list and compare it to the 2833 * newly-received snapshot context. Remove any existing snapshots 2834 * not present in the new snapshot context. Add a new snapshot for 2835 * any snaphots in the snapshot context not in the current list. 2836 * And verify there are no changes to snapshots we already know 2837 * about. 2838 * 2839 * Assumes the snapshots in the snapshot context are sorted by 2840 * snapshot id, highest id first. (Snapshots in the rbd_dev's list 2841 * are also maintained in that order.) 2842 */ 2843 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) 2844 { 2845 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 2846 const u32 snap_count = snapc->num_snaps; 2847 struct list_head *head = &rbd_dev->snaps; 2848 struct list_head *links = head->next; 2849 u32 index = 0; 2850 2851 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count); 2852 while (index < snap_count || links != head) { 2853 u64 snap_id; 2854 struct rbd_snap *snap; 2855 char *snap_name; 2856 u64 snap_size = 0; 2857 u64 snap_features = 0; 2858 2859 snap_id = index < snap_count ? snapc->snaps[index] 2860 : CEPH_NOSNAP; 2861 snap = links != head ? list_entry(links, struct rbd_snap, node) 2862 : NULL; 2863 rbd_assert(!snap || snap->id != CEPH_NOSNAP); 2864 2865 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { 2866 struct list_head *next = links->next; 2867 2868 /* Existing snapshot not in the new snap context */ 2869 2870 if (rbd_dev->spec->snap_id == snap->id) 2871 rbd_dev->exists = false; 2872 rbd_remove_snap_dev(snap); 2873 dout("%ssnap id %llu has been removed\n", 2874 rbd_dev->spec->snap_id == snap->id ? 2875 "mapped " : "", 2876 (unsigned long long) snap->id); 2877 2878 /* Done with this list entry; advance */ 2879 2880 links = next; 2881 continue; 2882 } 2883 2884 snap_name = rbd_dev_snap_info(rbd_dev, index, 2885 &snap_size, &snap_features); 2886 if (IS_ERR(snap_name)) 2887 return PTR_ERR(snap_name); 2888 2889 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count, 2890 (unsigned long long) snap_id); 2891 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) { 2892 struct rbd_snap *new_snap; 2893 2894 /* We haven't seen this snapshot before */ 2895 2896 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name, 2897 snap_id, snap_size, snap_features); 2898 if (IS_ERR(new_snap)) { 2899 int err = PTR_ERR(new_snap); 2900 2901 dout(" failed to add dev, error %d\n", err); 2902 2903 return err; 2904 } 2905 2906 /* New goes before existing, or at end of list */ 2907 2908 dout(" added dev%s\n", snap ? "" : " at end\n"); 2909 if (snap) 2910 list_add_tail(&new_snap->node, &snap->node); 2911 else 2912 list_add_tail(&new_snap->node, head); 2913 } else { 2914 /* Already have this one */ 2915 2916 dout(" already present\n"); 2917 2918 rbd_assert(snap->size == snap_size); 2919 rbd_assert(!strcmp(snap->name, snap_name)); 2920 rbd_assert(snap->features == snap_features); 2921 2922 /* Done with this list entry; advance */ 2923 2924 links = links->next; 2925 } 2926 2927 /* Advance to the next entry in the snapshot context */ 2928 2929 index++; 2930 } 2931 dout("%s: done\n", __func__); 2932 2933 return 0; 2934 } 2935 2936 /* 2937 * Scan the list of snapshots and register the devices for any that 2938 * have not already been registered. 2939 */ 2940 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev) 2941 { 2942 struct rbd_snap *snap; 2943 int ret = 0; 2944 2945 dout("%s called\n", __func__); 2946 if (WARN_ON(!device_is_registered(&rbd_dev->dev))) 2947 return -EIO; 2948 2949 list_for_each_entry(snap, &rbd_dev->snaps, node) { 2950 if (!rbd_snap_registered(snap)) { 2951 ret = rbd_register_snap_dev(snap, &rbd_dev->dev); 2952 if (ret < 0) 2953 break; 2954 } 2955 } 2956 dout("%s: returning %d\n", __func__, ret); 2957 2958 return ret; 2959 } 2960 2961 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 2962 { 2963 struct device *dev; 2964 int ret; 2965 2966 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2967 2968 dev = &rbd_dev->dev; 2969 dev->bus = &rbd_bus_type; 2970 dev->type = &rbd_device_type; 2971 dev->parent = &rbd_root_dev; 2972 dev->release = rbd_dev_release; 2973 dev_set_name(dev, "%d", rbd_dev->dev_id); 2974 ret = device_register(dev); 2975 2976 mutex_unlock(&ctl_mutex); 2977 2978 return ret; 2979 } 2980 2981 static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 2982 { 2983 device_unregister(&rbd_dev->dev); 2984 } 2985 2986 static int rbd_init_watch_dev(struct rbd_device *rbd_dev) 2987 { 2988 int ret, rc; 2989 2990 do { 2991 ret = rbd_req_sync_watch(rbd_dev); 2992 if (ret == -ERANGE) { 2993 rc = rbd_dev_refresh(rbd_dev, NULL); 2994 if (rc < 0) 2995 return rc; 2996 } 2997 } while (ret == -ERANGE); 2998 2999 return ret; 3000 } 3001 3002 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 3003 3004 /* 3005 * Get a unique rbd identifier for the given new rbd_dev, and add 3006 * the rbd_dev to the global list. The minimum rbd id is 1. 3007 */ 3008 static void rbd_dev_id_get(struct rbd_device *rbd_dev) 3009 { 3010 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); 3011 3012 spin_lock(&rbd_dev_list_lock); 3013 list_add_tail(&rbd_dev->node, &rbd_dev_list); 3014 spin_unlock(&rbd_dev_list_lock); 3015 dout("rbd_dev %p given dev id %llu\n", rbd_dev, 3016 (unsigned long long) rbd_dev->dev_id); 3017 } 3018 3019 /* 3020 * Remove an rbd_dev from the global list, and record that its 3021 * identifier is no longer in use. 3022 */ 3023 static void rbd_dev_id_put(struct rbd_device *rbd_dev) 3024 { 3025 struct list_head *tmp; 3026 int rbd_id = rbd_dev->dev_id; 3027 int max_id; 3028 3029 rbd_assert(rbd_id > 0); 3030 3031 dout("rbd_dev %p released dev id %llu\n", rbd_dev, 3032 (unsigned long long) rbd_dev->dev_id); 3033 spin_lock(&rbd_dev_list_lock); 3034 list_del_init(&rbd_dev->node); 3035 3036 /* 3037 * If the id being "put" is not the current maximum, there 3038 * is nothing special we need to do. 3039 */ 3040 if (rbd_id != atomic64_read(&rbd_dev_id_max)) { 3041 spin_unlock(&rbd_dev_list_lock); 3042 return; 3043 } 3044 3045 /* 3046 * We need to update the current maximum id. Search the 3047 * list to find out what it is. We're more likely to find 3048 * the maximum at the end, so search the list backward. 3049 */ 3050 max_id = 0; 3051 list_for_each_prev(tmp, &rbd_dev_list) { 3052 struct rbd_device *rbd_dev; 3053 3054 rbd_dev = list_entry(tmp, struct rbd_device, node); 3055 if (rbd_dev->dev_id > max_id) 3056 max_id = rbd_dev->dev_id; 3057 } 3058 spin_unlock(&rbd_dev_list_lock); 3059 3060 /* 3061 * The max id could have been updated by rbd_dev_id_get(), in 3062 * which case it now accurately reflects the new maximum. 3063 * Be careful not to overwrite the maximum value in that 3064 * case. 3065 */ 3066 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); 3067 dout(" max dev id has been reset\n"); 3068 } 3069 3070 /* 3071 * Skips over white space at *buf, and updates *buf to point to the 3072 * first found non-space character (if any). Returns the length of 3073 * the token (string of non-white space characters) found. Note 3074 * that *buf must be terminated with '\0'. 3075 */ 3076 static inline size_t next_token(const char **buf) 3077 { 3078 /* 3079 * These are the characters that produce nonzero for 3080 * isspace() in the "C" and "POSIX" locales. 3081 */ 3082 const char *spaces = " \f\n\r\t\v"; 3083 3084 *buf += strspn(*buf, spaces); /* Find start of token */ 3085 3086 return strcspn(*buf, spaces); /* Return token length */ 3087 } 3088 3089 /* 3090 * Finds the next token in *buf, and if the provided token buffer is 3091 * big enough, copies the found token into it. The result, if 3092 * copied, is guaranteed to be terminated with '\0'. Note that *buf 3093 * must be terminated with '\0' on entry. 3094 * 3095 * Returns the length of the token found (not including the '\0'). 3096 * Return value will be 0 if no token is found, and it will be >= 3097 * token_size if the token would not fit. 3098 * 3099 * The *buf pointer will be updated to point beyond the end of the 3100 * found token. Note that this occurs even if the token buffer is 3101 * too small to hold it. 3102 */ 3103 static inline size_t copy_token(const char **buf, 3104 char *token, 3105 size_t token_size) 3106 { 3107 size_t len; 3108 3109 len = next_token(buf); 3110 if (len < token_size) { 3111 memcpy(token, *buf, len); 3112 *(token + len) = '\0'; 3113 } 3114 *buf += len; 3115 3116 return len; 3117 } 3118 3119 /* 3120 * Finds the next token in *buf, dynamically allocates a buffer big 3121 * enough to hold a copy of it, and copies the token into the new 3122 * buffer. The copy is guaranteed to be terminated with '\0'. Note 3123 * that a duplicate buffer is created even for a zero-length token. 3124 * 3125 * Returns a pointer to the newly-allocated duplicate, or a null 3126 * pointer if memory for the duplicate was not available. If 3127 * the lenp argument is a non-null pointer, the length of the token 3128 * (not including the '\0') is returned in *lenp. 3129 * 3130 * If successful, the *buf pointer will be updated to point beyond 3131 * the end of the found token. 3132 * 3133 * Note: uses GFP_KERNEL for allocation. 3134 */ 3135 static inline char *dup_token(const char **buf, size_t *lenp) 3136 { 3137 char *dup; 3138 size_t len; 3139 3140 len = next_token(buf); 3141 dup = kmalloc(len + 1, GFP_KERNEL); 3142 if (!dup) 3143 return NULL; 3144 3145 memcpy(dup, *buf, len); 3146 *(dup + len) = '\0'; 3147 *buf += len; 3148 3149 if (lenp) 3150 *lenp = len; 3151 3152 return dup; 3153 } 3154 3155 /* 3156 * Parse the options provided for an "rbd add" (i.e., rbd image 3157 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 3158 * and the data written is passed here via a NUL-terminated buffer. 3159 * Returns 0 if successful or an error code otherwise. 3160 * 3161 * The information extracted from these options is recorded in 3162 * the other parameters which return dynamically-allocated 3163 * structures: 3164 * ceph_opts 3165 * The address of a pointer that will refer to a ceph options 3166 * structure. Caller must release the returned pointer using 3167 * ceph_destroy_options() when it is no longer needed. 3168 * rbd_opts 3169 * Address of an rbd options pointer. Fully initialized by 3170 * this function; caller must release with kfree(). 3171 * spec 3172 * Address of an rbd image specification pointer. Fully 3173 * initialized by this function based on parsed options. 3174 * Caller must release with rbd_spec_put(). 3175 * 3176 * The options passed take this form: 3177 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 3178 * where: 3179 * <mon_addrs> 3180 * A comma-separated list of one or more monitor addresses. 3181 * A monitor address is an ip address, optionally followed 3182 * by a port number (separated by a colon). 3183 * I.e.: ip1[:port1][,ip2[:port2]...] 3184 * <options> 3185 * A comma-separated list of ceph and/or rbd options. 3186 * <pool_name> 3187 * The name of the rados pool containing the rbd image. 3188 * <image_name> 3189 * The name of the image in that pool to map. 3190 * <snap_id> 3191 * An optional snapshot id. If provided, the mapping will 3192 * present data from the image at the time that snapshot was 3193 * created. The image head is used if no snapshot id is 3194 * provided. Snapshot mappings are always read-only. 3195 */ 3196 static int rbd_add_parse_args(const char *buf, 3197 struct ceph_options **ceph_opts, 3198 struct rbd_options **opts, 3199 struct rbd_spec **rbd_spec) 3200 { 3201 size_t len; 3202 char *options; 3203 const char *mon_addrs; 3204 size_t mon_addrs_size; 3205 struct rbd_spec *spec = NULL; 3206 struct rbd_options *rbd_opts = NULL; 3207 struct ceph_options *copts; 3208 int ret; 3209 3210 /* The first four tokens are required */ 3211 3212 len = next_token(&buf); 3213 if (!len) 3214 return -EINVAL; /* Missing monitor address(es) */ 3215 mon_addrs = buf; 3216 mon_addrs_size = len + 1; 3217 buf += len; 3218 3219 ret = -EINVAL; 3220 options = dup_token(&buf, NULL); 3221 if (!options) 3222 return -ENOMEM; 3223 if (!*options) 3224 goto out_err; /* Missing options */ 3225 3226 spec = rbd_spec_alloc(); 3227 if (!spec) 3228 goto out_mem; 3229 3230 spec->pool_name = dup_token(&buf, NULL); 3231 if (!spec->pool_name) 3232 goto out_mem; 3233 if (!*spec->pool_name) 3234 goto out_err; /* Missing pool name */ 3235 3236 spec->image_name = dup_token(&buf, &spec->image_name_len); 3237 if (!spec->image_name) 3238 goto out_mem; 3239 if (!*spec->image_name) 3240 goto out_err; /* Missing image name */ 3241 3242 /* 3243 * Snapshot name is optional; default is to use "-" 3244 * (indicating the head/no snapshot). 3245 */ 3246 len = next_token(&buf); 3247 if (!len) { 3248 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 3249 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 3250 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 3251 ret = -ENAMETOOLONG; 3252 goto out_err; 3253 } 3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL); 3255 if (!spec->snap_name) 3256 goto out_mem; 3257 memcpy(spec->snap_name, buf, len); 3258 *(spec->snap_name + len) = '\0'; 3259 3260 /* Initialize all rbd options to the defaults */ 3261 3262 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 3263 if (!rbd_opts) 3264 goto out_mem; 3265 3266 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 3267 3268 copts = ceph_parse_options(options, mon_addrs, 3269 mon_addrs + mon_addrs_size - 1, 3270 parse_rbd_opts_token, rbd_opts); 3271 if (IS_ERR(copts)) { 3272 ret = PTR_ERR(copts); 3273 goto out_err; 3274 } 3275 kfree(options); 3276 3277 *ceph_opts = copts; 3278 *opts = rbd_opts; 3279 *rbd_spec = spec; 3280 3281 return 0; 3282 out_mem: 3283 ret = -ENOMEM; 3284 out_err: 3285 kfree(rbd_opts); 3286 rbd_spec_put(spec); 3287 kfree(options); 3288 3289 return ret; 3290 } 3291 3292 /* 3293 * An rbd format 2 image has a unique identifier, distinct from the 3294 * name given to it by the user. Internally, that identifier is 3295 * what's used to specify the names of objects related to the image. 3296 * 3297 * A special "rbd id" object is used to map an rbd image name to its 3298 * id. If that object doesn't exist, then there is no v2 rbd image 3299 * with the supplied name. 3300 * 3301 * This function will record the given rbd_dev's image_id field if 3302 * it can be determined, and in that case will return 0. If any 3303 * errors occur a negative errno will be returned and the rbd_dev's 3304 * image_id field will be unchanged (and should be NULL). 3305 */ 3306 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 3307 { 3308 int ret; 3309 size_t size; 3310 char *object_name; 3311 void *response; 3312 void *p; 3313 3314 /* 3315 * When probing a parent image, the image id is already 3316 * known (and the image name likely is not). There's no 3317 * need to fetch the image id again in this case. 3318 */ 3319 if (rbd_dev->spec->image_id) 3320 return 0; 3321 3322 /* 3323 * First, see if the format 2 image id file exists, and if 3324 * so, get the image's persistent id from it. 3325 */ 3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; 3327 object_name = kmalloc(size, GFP_NOIO); 3328 if (!object_name) 3329 return -ENOMEM; 3330 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); 3331 dout("rbd id object name is %s\n", object_name); 3332 3333 /* Response will be an encoded string, which includes a length */ 3334 3335 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 3336 response = kzalloc(size, GFP_NOIO); 3337 if (!response) { 3338 ret = -ENOMEM; 3339 goto out; 3340 } 3341 3342 ret = rbd_req_sync_exec(rbd_dev, object_name, 3343 "rbd", "get_id", 3344 NULL, 0, 3345 response, RBD_IMAGE_ID_LEN_MAX, 3346 CEPH_OSD_FLAG_READ, NULL); 3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 3348 if (ret < 0) 3349 goto out; 3350 ret = 0; /* rbd_req_sync_exec() can return positive */ 3351 3352 p = response; 3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 3354 p + RBD_IMAGE_ID_LEN_MAX, 3355 &rbd_dev->spec->image_id_len, 3356 GFP_NOIO); 3357 if (IS_ERR(rbd_dev->spec->image_id)) { 3358 ret = PTR_ERR(rbd_dev->spec->image_id); 3359 rbd_dev->spec->image_id = NULL; 3360 } else { 3361 dout("image_id is %s\n", rbd_dev->spec->image_id); 3362 } 3363 out: 3364 kfree(response); 3365 kfree(object_name); 3366 3367 return ret; 3368 } 3369 3370 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 3371 { 3372 int ret; 3373 size_t size; 3374 3375 /* Version 1 images have no id; empty string is used */ 3376 3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 3378 if (!rbd_dev->spec->image_id) 3379 return -ENOMEM; 3380 rbd_dev->spec->image_id_len = 0; 3381 3382 /* Record the header object name for this rbd image. */ 3383 3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); 3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3386 if (!rbd_dev->header_name) { 3387 ret = -ENOMEM; 3388 goto out_err; 3389 } 3390 sprintf(rbd_dev->header_name, "%s%s", 3391 rbd_dev->spec->image_name, RBD_SUFFIX); 3392 3393 /* Populate rbd image metadata */ 3394 3395 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 3396 if (ret < 0) 3397 goto out_err; 3398 3399 /* Version 1 images have no parent (no layering) */ 3400 3401 rbd_dev->parent_spec = NULL; 3402 rbd_dev->parent_overlap = 0; 3403 3404 rbd_dev->image_format = 1; 3405 3406 dout("discovered version 1 image, header name is %s\n", 3407 rbd_dev->header_name); 3408 3409 return 0; 3410 3411 out_err: 3412 kfree(rbd_dev->header_name); 3413 rbd_dev->header_name = NULL; 3414 kfree(rbd_dev->spec->image_id); 3415 rbd_dev->spec->image_id = NULL; 3416 3417 return ret; 3418 } 3419 3420 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) 3421 { 3422 size_t size; 3423 int ret; 3424 u64 ver = 0; 3425 3426 /* 3427 * Image id was filled in by the caller. Record the header 3428 * object name for this rbd image. 3429 */ 3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; 3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3432 if (!rbd_dev->header_name) 3433 return -ENOMEM; 3434 sprintf(rbd_dev->header_name, "%s%s", 3435 RBD_HEADER_PREFIX, rbd_dev->spec->image_id); 3436 3437 /* Get the size and object order for the image */ 3438 3439 ret = rbd_dev_v2_image_size(rbd_dev); 3440 if (ret < 0) 3441 goto out_err; 3442 3443 /* Get the object prefix (a.k.a. block_name) for the image */ 3444 3445 ret = rbd_dev_v2_object_prefix(rbd_dev); 3446 if (ret < 0) 3447 goto out_err; 3448 3449 /* Get the and check features for the image */ 3450 3451 ret = rbd_dev_v2_features(rbd_dev); 3452 if (ret < 0) 3453 goto out_err; 3454 3455 /* If the image supports layering, get the parent info */ 3456 3457 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 3458 ret = rbd_dev_v2_parent_info(rbd_dev); 3459 if (ret < 0) 3460 goto out_err; 3461 } 3462 3463 /* crypto and compression type aren't (yet) supported for v2 images */ 3464 3465 rbd_dev->header.crypt_type = 0; 3466 rbd_dev->header.comp_type = 0; 3467 3468 /* Get the snapshot context, plus the header version */ 3469 3470 ret = rbd_dev_v2_snap_context(rbd_dev, &ver); 3471 if (ret) 3472 goto out_err; 3473 rbd_dev->header.obj_version = ver; 3474 3475 rbd_dev->image_format = 2; 3476 3477 dout("discovered version 2 image, header name is %s\n", 3478 rbd_dev->header_name); 3479 3480 return 0; 3481 out_err: 3482 rbd_dev->parent_overlap = 0; 3483 rbd_spec_put(rbd_dev->parent_spec); 3484 rbd_dev->parent_spec = NULL; 3485 kfree(rbd_dev->header_name); 3486 rbd_dev->header_name = NULL; 3487 kfree(rbd_dev->header.object_prefix); 3488 rbd_dev->header.object_prefix = NULL; 3489 3490 return ret; 3491 } 3492 3493 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) 3494 { 3495 int ret; 3496 3497 /* no need to lock here, as rbd_dev is not registered yet */ 3498 ret = rbd_dev_snaps_update(rbd_dev); 3499 if (ret) 3500 return ret; 3501 3502 ret = rbd_dev_probe_update_spec(rbd_dev); 3503 if (ret) 3504 goto err_out_snaps; 3505 3506 ret = rbd_dev_set_mapping(rbd_dev); 3507 if (ret) 3508 goto err_out_snaps; 3509 3510 /* generate unique id: find highest unique id, add one */ 3511 rbd_dev_id_get(rbd_dev); 3512 3513 /* Fill in the device name, now that we have its id. */ 3514 BUILD_BUG_ON(DEV_NAME_LEN 3515 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 3516 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 3517 3518 /* Get our block major device number. */ 3519 3520 ret = register_blkdev(0, rbd_dev->name); 3521 if (ret < 0) 3522 goto err_out_id; 3523 rbd_dev->major = ret; 3524 3525 /* Set up the blkdev mapping. */ 3526 3527 ret = rbd_init_disk(rbd_dev); 3528 if (ret) 3529 goto err_out_blkdev; 3530 3531 ret = rbd_bus_add_dev(rbd_dev); 3532 if (ret) 3533 goto err_out_disk; 3534 3535 /* 3536 * At this point cleanup in the event of an error is the job 3537 * of the sysfs code (initiated by rbd_bus_del_dev()). 3538 */ 3539 down_write(&rbd_dev->header_rwsem); 3540 ret = rbd_dev_snaps_register(rbd_dev); 3541 up_write(&rbd_dev->header_rwsem); 3542 if (ret) 3543 goto err_out_bus; 3544 3545 ret = rbd_init_watch_dev(rbd_dev); 3546 if (ret) 3547 goto err_out_bus; 3548 3549 /* Everything's ready. Announce the disk to the world. */ 3550 3551 add_disk(rbd_dev->disk); 3552 3553 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 3554 (unsigned long long) rbd_dev->mapping.size); 3555 3556 return ret; 3557 err_out_bus: 3558 /* this will also clean up rest of rbd_dev stuff */ 3559 3560 rbd_bus_del_dev(rbd_dev); 3561 3562 return ret; 3563 err_out_disk: 3564 rbd_free_disk(rbd_dev); 3565 err_out_blkdev: 3566 unregister_blkdev(rbd_dev->major, rbd_dev->name); 3567 err_out_id: 3568 rbd_dev_id_put(rbd_dev); 3569 err_out_snaps: 3570 rbd_remove_all_snaps(rbd_dev); 3571 3572 return ret; 3573 } 3574 3575 /* 3576 * Probe for the existence of the header object for the given rbd 3577 * device. For format 2 images this includes determining the image 3578 * id. 3579 */ 3580 static int rbd_dev_probe(struct rbd_device *rbd_dev) 3581 { 3582 int ret; 3583 3584 /* 3585 * Get the id from the image id object. If it's not a 3586 * format 2 image, we'll get ENOENT back, and we'll assume 3587 * it's a format 1 image. 3588 */ 3589 ret = rbd_dev_image_id(rbd_dev); 3590 if (ret) 3591 ret = rbd_dev_v1_probe(rbd_dev); 3592 else 3593 ret = rbd_dev_v2_probe(rbd_dev); 3594 if (ret) { 3595 dout("probe failed, returning %d\n", ret); 3596 3597 return ret; 3598 } 3599 3600 ret = rbd_dev_probe_finish(rbd_dev); 3601 if (ret) 3602 rbd_header_free(&rbd_dev->header); 3603 3604 return ret; 3605 } 3606 3607 static ssize_t rbd_add(struct bus_type *bus, 3608 const char *buf, 3609 size_t count) 3610 { 3611 struct rbd_device *rbd_dev = NULL; 3612 struct ceph_options *ceph_opts = NULL; 3613 struct rbd_options *rbd_opts = NULL; 3614 struct rbd_spec *spec = NULL; 3615 struct rbd_client *rbdc; 3616 struct ceph_osd_client *osdc; 3617 int rc = -ENOMEM; 3618 3619 if (!try_module_get(THIS_MODULE)) 3620 return -ENODEV; 3621 3622 /* parse add command */ 3623 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 3624 if (rc < 0) 3625 goto err_out_module; 3626 3627 rbdc = rbd_get_client(ceph_opts); 3628 if (IS_ERR(rbdc)) { 3629 rc = PTR_ERR(rbdc); 3630 goto err_out_args; 3631 } 3632 ceph_opts = NULL; /* rbd_dev client now owns this */ 3633 3634 /* pick the pool */ 3635 osdc = &rbdc->client->osdc; 3636 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 3637 if (rc < 0) 3638 goto err_out_client; 3639 spec->pool_id = (u64) rc; 3640 3641 rbd_dev = rbd_dev_create(rbdc, spec); 3642 if (!rbd_dev) 3643 goto err_out_client; 3644 rbdc = NULL; /* rbd_dev now owns this */ 3645 spec = NULL; /* rbd_dev now owns this */ 3646 3647 rbd_dev->mapping.read_only = rbd_opts->read_only; 3648 kfree(rbd_opts); 3649 rbd_opts = NULL; /* done with this */ 3650 3651 rc = rbd_dev_probe(rbd_dev); 3652 if (rc < 0) 3653 goto err_out_rbd_dev; 3654 3655 return count; 3656 err_out_rbd_dev: 3657 rbd_dev_destroy(rbd_dev); 3658 err_out_client: 3659 rbd_put_client(rbdc); 3660 err_out_args: 3661 if (ceph_opts) 3662 ceph_destroy_options(ceph_opts); 3663 kfree(rbd_opts); 3664 rbd_spec_put(spec); 3665 err_out_module: 3666 module_put(THIS_MODULE); 3667 3668 dout("Error adding device %s\n", buf); 3669 3670 return (ssize_t) rc; 3671 } 3672 3673 static struct rbd_device *__rbd_get_dev(unsigned long dev_id) 3674 { 3675 struct list_head *tmp; 3676 struct rbd_device *rbd_dev; 3677 3678 spin_lock(&rbd_dev_list_lock); 3679 list_for_each(tmp, &rbd_dev_list) { 3680 rbd_dev = list_entry(tmp, struct rbd_device, node); 3681 if (rbd_dev->dev_id == dev_id) { 3682 spin_unlock(&rbd_dev_list_lock); 3683 return rbd_dev; 3684 } 3685 } 3686 spin_unlock(&rbd_dev_list_lock); 3687 return NULL; 3688 } 3689 3690 static void rbd_dev_release(struct device *dev) 3691 { 3692 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3693 3694 if (rbd_dev->watch_request) { 3695 struct ceph_client *client = rbd_dev->rbd_client->client; 3696 3697 ceph_osdc_unregister_linger_request(&client->osdc, 3698 rbd_dev->watch_request); 3699 } 3700 if (rbd_dev->watch_event) 3701 rbd_req_sync_unwatch(rbd_dev); 3702 3703 3704 /* clean up and free blkdev */ 3705 rbd_free_disk(rbd_dev); 3706 unregister_blkdev(rbd_dev->major, rbd_dev->name); 3707 3708 /* release allocated disk header fields */ 3709 rbd_header_free(&rbd_dev->header); 3710 3711 /* done with the id, and with the rbd_dev */ 3712 rbd_dev_id_put(rbd_dev); 3713 rbd_assert(rbd_dev->rbd_client != NULL); 3714 rbd_dev_destroy(rbd_dev); 3715 3716 /* release module ref */ 3717 module_put(THIS_MODULE); 3718 } 3719 3720 static ssize_t rbd_remove(struct bus_type *bus, 3721 const char *buf, 3722 size_t count) 3723 { 3724 struct rbd_device *rbd_dev = NULL; 3725 int target_id, rc; 3726 unsigned long ul; 3727 int ret = count; 3728 3729 rc = strict_strtoul(buf, 10, &ul); 3730 if (rc) 3731 return rc; 3732 3733 /* convert to int; abort if we lost anything in the conversion */ 3734 target_id = (int) ul; 3735 if (target_id != ul) 3736 return -EINVAL; 3737 3738 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 3739 3740 rbd_dev = __rbd_get_dev(target_id); 3741 if (!rbd_dev) { 3742 ret = -ENOENT; 3743 goto done; 3744 } 3745 3746 if (rbd_dev->open_count) { 3747 ret = -EBUSY; 3748 goto done; 3749 } 3750 3751 rbd_remove_all_snaps(rbd_dev); 3752 rbd_bus_del_dev(rbd_dev); 3753 3754 done: 3755 mutex_unlock(&ctl_mutex); 3756 3757 return ret; 3758 } 3759 3760 /* 3761 * create control files in sysfs 3762 * /sys/bus/rbd/... 3763 */ 3764 static int rbd_sysfs_init(void) 3765 { 3766 int ret; 3767 3768 ret = device_register(&rbd_root_dev); 3769 if (ret < 0) 3770 return ret; 3771 3772 ret = bus_register(&rbd_bus_type); 3773 if (ret < 0) 3774 device_unregister(&rbd_root_dev); 3775 3776 return ret; 3777 } 3778 3779 static void rbd_sysfs_cleanup(void) 3780 { 3781 bus_unregister(&rbd_bus_type); 3782 device_unregister(&rbd_root_dev); 3783 } 3784 3785 int __init rbd_init(void) 3786 { 3787 int rc; 3788 3789 rc = rbd_sysfs_init(); 3790 if (rc) 3791 return rc; 3792 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 3793 return 0; 3794 } 3795 3796 void __exit rbd_exit(void) 3797 { 3798 rbd_sysfs_cleanup(); 3799 } 3800 3801 module_init(rbd_init); 3802 module_exit(rbd_exit); 3803 3804 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 3805 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 3806 MODULE_DESCRIPTION("rados block device"); 3807 3808 /* following authorship retained from original osdblk.c */ 3809 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 3810 3811 MODULE_LICENSE("GPL"); 3812