1 2 /* 3 rbd.c -- Export ceph rados objects as a Linux block device 4 5 6 based on drivers/block/osdblk.c: 7 8 Copyright 2009 Red Hat, Inc. 9 10 This program is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; see the file COPYING. If not, write to 21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 22 23 24 25 For usage instructions, please refer to: 26 27 Documentation/ABI/testing/sysfs-bus-rbd 28 29 */ 30 31 #include <linux/ceph/libceph.h> 32 #include <linux/ceph/osd_client.h> 33 #include <linux/ceph/mon_client.h> 34 #include <linux/ceph/cls_lock_client.h> 35 #include <linux/ceph/striper.h> 36 #include <linux/ceph/decode.h> 37 #include <linux/parser.h> 38 #include <linux/bsearch.h> 39 40 #include <linux/kernel.h> 41 #include <linux/device.h> 42 #include <linux/module.h> 43 #include <linux/blk-mq.h> 44 #include <linux/fs.h> 45 #include <linux/blkdev.h> 46 #include <linux/slab.h> 47 #include <linux/idr.h> 48 #include <linux/workqueue.h> 49 50 #include "rbd_types.h" 51 52 #define RBD_DEBUG /* Activate rbd_assert() calls */ 53 54 /* 55 * Increment the given counter and return its updated value. 56 * If the counter is already 0 it will not be incremented. 57 * If the counter is already at its maximum value returns 58 * -EINVAL without updating it. 59 */ 60 static int atomic_inc_return_safe(atomic_t *v) 61 { 62 unsigned int counter; 63 64 counter = (unsigned int)__atomic_add_unless(v, 1, 0); 65 if (counter <= (unsigned int)INT_MAX) 66 return (int)counter; 67 68 atomic_dec(v); 69 70 return -EINVAL; 71 } 72 73 /* Decrement the counter. 
Return the resulting value, or -EINVAL */ 74 static int atomic_dec_return_safe(atomic_t *v) 75 { 76 int counter; 77 78 counter = atomic_dec_return(v); 79 if (counter >= 0) 80 return counter; 81 82 atomic_inc(v); 83 84 return -EINVAL; 85 } 86 87 #define RBD_DRV_NAME "rbd" 88 89 #define RBD_MINORS_PER_MAJOR 256 90 #define RBD_SINGLE_MAJOR_PART_SHIFT 4 91 92 #define RBD_MAX_PARENT_CHAIN_LEN 16 93 94 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 95 #define RBD_MAX_SNAP_NAME_LEN \ 96 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 97 98 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 99 100 #define RBD_SNAP_HEAD_NAME "-" 101 102 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ 103 104 /* This allows a single page to hold an image name sent by OSD */ 105 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 106 #define RBD_IMAGE_ID_LEN_MAX 64 107 108 #define RBD_OBJ_PREFIX_LEN_MAX 64 109 110 #define RBD_NOTIFY_TIMEOUT 5 /* seconds */ 111 #define RBD_RETRY_DELAY msecs_to_jiffies(1000) 112 113 /* Feature bits */ 114 115 #define RBD_FEATURE_LAYERING (1ULL<<0) 116 #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) 117 #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) 118 #define RBD_FEATURE_DATA_POOL (1ULL<<7) 119 #define RBD_FEATURE_OPERATIONS (1ULL<<8) 120 121 #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ 122 RBD_FEATURE_STRIPINGV2 | \ 123 RBD_FEATURE_EXCLUSIVE_LOCK | \ 124 RBD_FEATURE_DATA_POOL | \ 125 RBD_FEATURE_OPERATIONS) 126 127 /* Features supported by this (client software) implementation. */ 128 129 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) 130 131 /* 132 * An RBD device name will be "rbd#", where the "rbd" comes from 133 * RBD_DRV_NAME above, and # is a unique integer identifier. 134 */ 135 #define DEV_NAME_LEN 32 136 137 /* 138 * block device image metadata (in-memory version) 139 */ 140 struct rbd_image_header { 141 /* These six fields never change for a given rbd image */ 142 char *object_prefix; 143 __u8 obj_order; 144 u64 stripe_unit; 145 u64 stripe_count; 146 s64 data_pool_id; 147 u64 features; /* Might be changeable someday? */ 148 149 /* The remaining fields need to be updated occasionally */ 150 u64 image_size; 151 struct ceph_snap_context *snapc; 152 char *snap_names; /* format 1 only */ 153 u64 *snap_sizes; /* format 1 only */ 154 }; 155 156 /* 157 * An rbd image specification. 158 * 159 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 160 * identify an image. Each rbd_dev structure includes a pointer to 161 * an rbd_spec structure that encapsulates this identity. 162 * 163 * Each of the id's in an rbd_spec has an associated name. For a 164 * user-mapped image, the names are supplied and the id's associated 165 * with them are looked up. For a layered image, a parent image is 166 * defined by the tuple, and the names are looked up. 167 * 168 * An rbd_dev structure contains a parent_spec pointer which is 169 * non-null if the image it represents is a child in a layered 170 * image. This pointer will refer to the rbd_spec structure used 171 * by the parent rbd_dev for its own identity (i.e., the structure 172 * is shared between the parent and child). 173 * 174 * Since these structures are populated once, during the discovery 175 * phase of image construction, they are effectively immutable so 176 * we make no effort to synchronize access to them. 177 * 178 * Note that code herein does not assume the image name is known (it 179 * could be a null pointer). 
180 */ 181 struct rbd_spec { 182 u64 pool_id; 183 const char *pool_name; 184 185 const char *image_id; 186 const char *image_name; 187 188 u64 snap_id; 189 const char *snap_name; 190 191 struct kref kref; 192 }; 193 194 /* 195 * an instance of the client. multiple devices may share an rbd client. 196 */ 197 struct rbd_client { 198 struct ceph_client *client; 199 struct kref kref; 200 struct list_head node; 201 }; 202 203 struct rbd_img_request; 204 205 enum obj_request_type { 206 OBJ_REQUEST_NODATA = 1, 207 OBJ_REQUEST_BIO, /* pointer into provided bio (list) */ 208 OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */ 209 OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */ 210 }; 211 212 enum obj_operation_type { 213 OBJ_OP_READ = 1, 214 OBJ_OP_WRITE, 215 OBJ_OP_DISCARD, 216 }; 217 218 /* 219 * Writes go through the following state machine to deal with 220 * layering: 221 * 222 * need copyup 223 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP 224 * | ^ | 225 * v \------------------------------/ 226 * done 227 * ^ 228 * | 229 * RBD_OBJ_WRITE_FLAT 230 * 231 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether 232 * there is a parent or not. 233 */ 234 enum rbd_obj_write_state { 235 RBD_OBJ_WRITE_FLAT = 1, 236 RBD_OBJ_WRITE_GUARD, 237 RBD_OBJ_WRITE_COPYUP, 238 }; 239 240 struct rbd_obj_request { 241 struct ceph_object_extent ex; 242 union { 243 bool tried_parent; /* for reads */ 244 enum rbd_obj_write_state write_state; /* for writes */ 245 }; 246 247 struct rbd_img_request *img_request; 248 struct ceph_file_extent *img_extents; 249 u32 num_img_extents; 250 251 union { 252 struct ceph_bio_iter bio_pos; 253 struct { 254 struct ceph_bvec_iter bvec_pos; 255 u32 bvec_count; 256 u32 bvec_idx; 257 }; 258 }; 259 struct bio_vec *copyup_bvecs; 260 u32 copyup_bvec_count; 261 262 struct ceph_osd_request *osd_req; 263 264 u64 xferred; /* bytes transferred */ 265 int result; 266 267 struct kref kref; 268 }; 269 270 enum img_req_flags { 271 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ 272 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ 273 }; 274 275 struct rbd_img_request { 276 struct rbd_device *rbd_dev; 277 enum obj_operation_type op_type; 278 enum obj_request_type data_type; 279 unsigned long flags; 280 union { 281 u64 snap_id; /* for reads */ 282 struct ceph_snap_context *snapc; /* for writes */ 283 }; 284 union { 285 struct request *rq; /* block request */ 286 struct rbd_obj_request *obj_request; /* obj req initiator */ 287 }; 288 spinlock_t completion_lock; 289 u64 xferred;/* aggregate bytes transferred */ 290 int result; /* first nonzero obj_request result */ 291 292 struct list_head object_extents; /* obj_req.ex structs */ 293 u32 obj_request_count; 294 u32 pending_count; 295 296 struct kref kref; 297 }; 298 299 #define for_each_obj_request(ireq, oreq) \ 300 list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item) 301 #define for_each_obj_request_safe(ireq, oreq, n) \ 302 list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item) 303 304 enum rbd_watch_state { 305 RBD_WATCH_STATE_UNREGISTERED, 306 RBD_WATCH_STATE_REGISTERED, 307 RBD_WATCH_STATE_ERROR, 308 }; 309 310 enum rbd_lock_state { 311 RBD_LOCK_STATE_UNLOCKED, 312 RBD_LOCK_STATE_LOCKED, 313 RBD_LOCK_STATE_RELEASING, 314 }; 315 316 /* WatchNotify::ClientId */ 317 struct rbd_client_id { 318 u64 gid; 319 u64 handle; 320 }; 321 322 struct rbd_mapping { 323 u64 size; 324 u64 features; 325 }; 326 327 /* 328 * a single device 329 */ 330 struct 
rbd_device { 331 int dev_id; /* blkdev unique id */ 332 333 int major; /* blkdev assigned major */ 334 int minor; 335 struct gendisk *disk; /* blkdev's gendisk and rq */ 336 337 u32 image_format; /* Either 1 or 2 */ 338 struct rbd_client *rbd_client; 339 340 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 341 342 spinlock_t lock; /* queue, flags, open_count */ 343 344 struct rbd_image_header header; 345 unsigned long flags; /* possibly lock protected */ 346 struct rbd_spec *spec; 347 struct rbd_options *opts; 348 char *config_info; /* add{,_single_major} string */ 349 350 struct ceph_object_id header_oid; 351 struct ceph_object_locator header_oloc; 352 353 struct ceph_file_layout layout; /* used for all rbd requests */ 354 355 struct mutex watch_mutex; 356 enum rbd_watch_state watch_state; 357 struct ceph_osd_linger_request *watch_handle; 358 u64 watch_cookie; 359 struct delayed_work watch_dwork; 360 361 struct rw_semaphore lock_rwsem; 362 enum rbd_lock_state lock_state; 363 char lock_cookie[32]; 364 struct rbd_client_id owner_cid; 365 struct work_struct acquired_lock_work; 366 struct work_struct released_lock_work; 367 struct delayed_work lock_dwork; 368 struct work_struct unlock_work; 369 wait_queue_head_t lock_waitq; 370 371 struct workqueue_struct *task_wq; 372 373 struct rbd_spec *parent_spec; 374 u64 parent_overlap; 375 atomic_t parent_ref; 376 struct rbd_device *parent; 377 378 /* Block layer tags. */ 379 struct blk_mq_tag_set tag_set; 380 381 /* protects updating the header */ 382 struct rw_semaphore header_rwsem; 383 384 struct rbd_mapping mapping; 385 386 struct list_head node; 387 388 /* sysfs related */ 389 struct device dev; 390 unsigned long open_count; /* protected by lock */ 391 }; 392 393 /* 394 * Flag bits for rbd_dev->flags: 395 * - REMOVING (which is coupled with rbd_dev->open_count) is protected 396 * by rbd_dev->lock 397 * - BLACKLISTED is protected by rbd_dev->lock_rwsem 398 */ 399 enum rbd_dev_flags { 400 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 401 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 402 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ 403 }; 404 405 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ 406 407 static LIST_HEAD(rbd_dev_list); /* devices */ 408 static DEFINE_SPINLOCK(rbd_dev_list_lock); 409 410 static LIST_HEAD(rbd_client_list); /* clients */ 411 static DEFINE_SPINLOCK(rbd_client_list_lock); 412 413 /* Slab caches for frequently-allocated structures */ 414 415 static struct kmem_cache *rbd_img_request_cache; 416 static struct kmem_cache *rbd_obj_request_cache; 417 418 static int rbd_major; 419 static DEFINE_IDA(rbd_dev_id_ida); 420 421 static struct workqueue_struct *rbd_wq; 422 423 /* 424 * single-major requires >= 0.75 version of userspace rbd utility. 
425 */ 426 static bool single_major = true; 427 module_param(single_major, bool, S_IRUGO); 428 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)"); 429 430 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 431 size_t count); 432 static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 433 size_t count); 434 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, 435 size_t count); 436 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, 437 size_t count); 438 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); 439 440 static int rbd_dev_id_to_minor(int dev_id) 441 { 442 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT; 443 } 444 445 static int minor_to_rbd_dev_id(int minor) 446 { 447 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; 448 } 449 450 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) 451 { 452 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || 453 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; 454 } 455 456 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) 457 { 458 bool is_lock_owner; 459 460 down_read(&rbd_dev->lock_rwsem); 461 is_lock_owner = __rbd_is_lock_owner(rbd_dev); 462 up_read(&rbd_dev->lock_rwsem); 463 return is_lock_owner; 464 } 465 466 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf) 467 { 468 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED); 469 } 470 471 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); 472 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); 473 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); 474 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major); 475 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL); 476 477 static struct attribute *rbd_bus_attrs[] = { 478 &bus_attr_add.attr, 479 &bus_attr_remove.attr, 480 &bus_attr_add_single_major.attr, 481 &bus_attr_remove_single_major.attr, 482 &bus_attr_supported_features.attr, 483 NULL, 484 }; 485 486 static umode_t rbd_bus_is_visible(struct kobject *kobj, 487 struct attribute *attr, int index) 488 { 489 if (!single_major && 490 (attr == &bus_attr_add_single_major.attr || 491 attr == &bus_attr_remove_single_major.attr)) 492 return 0; 493 494 return attr->mode; 495 } 496 497 static const struct attribute_group rbd_bus_group = { 498 .attrs = rbd_bus_attrs, 499 .is_visible = rbd_bus_is_visible, 500 }; 501 __ATTRIBUTE_GROUPS(rbd_bus); 502 503 static struct bus_type rbd_bus_type = { 504 .name = "rbd", 505 .bus_groups = rbd_bus_groups, 506 }; 507 508 static void rbd_root_dev_release(struct device *dev) 509 { 510 } 511 512 static struct device rbd_root_dev = { 513 .init_name = "rbd", 514 .release = rbd_root_dev_release, 515 }; 516 517 static __printf(2, 3) 518 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 
519 { 520 struct va_format vaf; 521 va_list args; 522 523 va_start(args, fmt); 524 vaf.fmt = fmt; 525 vaf.va = &args; 526 527 if (!rbd_dev) 528 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 529 else if (rbd_dev->disk) 530 printk(KERN_WARNING "%s: %s: %pV\n", 531 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 532 else if (rbd_dev->spec && rbd_dev->spec->image_name) 533 printk(KERN_WARNING "%s: image %s: %pV\n", 534 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 535 else if (rbd_dev->spec && rbd_dev->spec->image_id) 536 printk(KERN_WARNING "%s: id %s: %pV\n", 537 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 538 else /* punt */ 539 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 540 RBD_DRV_NAME, rbd_dev, &vaf); 541 va_end(args); 542 } 543 544 #ifdef RBD_DEBUG 545 #define rbd_assert(expr) \ 546 if (unlikely(!(expr))) { \ 547 printk(KERN_ERR "\nAssertion failure in %s() " \ 548 "at line %d:\n\n" \ 549 "\trbd_assert(%s);\n\n", \ 550 __func__, __LINE__, #expr); \ 551 BUG(); \ 552 } 553 #else /* !RBD_DEBUG */ 554 # define rbd_assert(expr) ((void) 0) 555 #endif /* !RBD_DEBUG */ 556 557 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); 558 559 static int rbd_dev_refresh(struct rbd_device *rbd_dev); 560 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); 561 static int rbd_dev_header_info(struct rbd_device *rbd_dev); 562 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); 563 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 564 u64 snap_id); 565 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 566 u8 *order, u64 *snap_size); 567 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 568 u64 *snap_features); 569 570 static int rbd_open(struct block_device *bdev, fmode_t mode) 571 { 572 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 573 bool removing = false; 574 575 spin_lock_irq(&rbd_dev->lock); 576 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 577 removing = true; 578 else 579 rbd_dev->open_count++; 580 spin_unlock_irq(&rbd_dev->lock); 581 if (removing) 582 return -ENOENT; 583 584 (void) get_device(&rbd_dev->dev); 585 586 return 0; 587 } 588 589 static void rbd_release(struct gendisk *disk, fmode_t mode) 590 { 591 struct rbd_device *rbd_dev = disk->private_data; 592 unsigned long open_count_before; 593 594 spin_lock_irq(&rbd_dev->lock); 595 open_count_before = rbd_dev->open_count--; 596 spin_unlock_irq(&rbd_dev->lock); 597 rbd_assert(open_count_before > 0); 598 599 put_device(&rbd_dev->dev); 600 } 601 602 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) 603 { 604 int ro; 605 606 if (get_user(ro, (int __user *)arg)) 607 return -EFAULT; 608 609 /* Snapshots can't be marked read-write */ 610 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) 611 return -EROFS; 612 613 /* Let blkdev_roset() handle it */ 614 return -ENOTTY; 615 } 616 617 static int rbd_ioctl(struct block_device *bdev, fmode_t mode, 618 unsigned int cmd, unsigned long arg) 619 { 620 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 621 int ret; 622 623 switch (cmd) { 624 case BLKROSET: 625 ret = rbd_ioctl_set_ro(rbd_dev, arg); 626 break; 627 default: 628 ret = -ENOTTY; 629 } 630 631 return ret; 632 } 633 634 #ifdef CONFIG_COMPAT 635 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, 636 unsigned int cmd, unsigned long arg) 637 { 638 return rbd_ioctl(bdev, mode, cmd, arg); 639 } 640 #endif /* CONFIG_COMPAT */ 641 642 static const struct block_device_operations 
rbd_bd_ops = { 643 .owner = THIS_MODULE, 644 .open = rbd_open, 645 .release = rbd_release, 646 .ioctl = rbd_ioctl, 647 #ifdef CONFIG_COMPAT 648 .compat_ioctl = rbd_compat_ioctl, 649 #endif 650 }; 651 652 /* 653 * Initialize an rbd client instance. Success or not, this function 654 * consumes ceph_opts. Caller holds client_mutex. 655 */ 656 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 657 { 658 struct rbd_client *rbdc; 659 int ret = -ENOMEM; 660 661 dout("%s:\n", __func__); 662 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 663 if (!rbdc) 664 goto out_opt; 665 666 kref_init(&rbdc->kref); 667 INIT_LIST_HEAD(&rbdc->node); 668 669 rbdc->client = ceph_create_client(ceph_opts, rbdc); 670 if (IS_ERR(rbdc->client)) 671 goto out_rbdc; 672 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 673 674 ret = ceph_open_session(rbdc->client); 675 if (ret < 0) 676 goto out_client; 677 678 spin_lock(&rbd_client_list_lock); 679 list_add_tail(&rbdc->node, &rbd_client_list); 680 spin_unlock(&rbd_client_list_lock); 681 682 dout("%s: rbdc %p\n", __func__, rbdc); 683 684 return rbdc; 685 out_client: 686 ceph_destroy_client(rbdc->client); 687 out_rbdc: 688 kfree(rbdc); 689 out_opt: 690 if (ceph_opts) 691 ceph_destroy_options(ceph_opts); 692 dout("%s: error %d\n", __func__, ret); 693 694 return ERR_PTR(ret); 695 } 696 697 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) 698 { 699 kref_get(&rbdc->kref); 700 701 return rbdc; 702 } 703 704 /* 705 * Find a ceph client with specific addr and configuration. If 706 * found, bump its reference count. 707 */ 708 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 709 { 710 struct rbd_client *client_node; 711 bool found = false; 712 713 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 714 return NULL; 715 716 spin_lock(&rbd_client_list_lock); 717 list_for_each_entry(client_node, &rbd_client_list, node) { 718 if (!ceph_compare_options(ceph_opts, client_node->client)) { 719 __rbd_get_client(client_node); 720 721 found = true; 722 break; 723 } 724 } 725 spin_unlock(&rbd_client_list_lock); 726 727 return found ? 
client_node : NULL; 728 } 729 730 /* 731 * (Per device) rbd map options 732 */ 733 enum { 734 Opt_queue_depth, 735 Opt_last_int, 736 /* int args above */ 737 Opt_last_string, 738 /* string args above */ 739 Opt_read_only, 740 Opt_read_write, 741 Opt_lock_on_read, 742 Opt_exclusive, 743 Opt_err 744 }; 745 746 static match_table_t rbd_opts_tokens = { 747 {Opt_queue_depth, "queue_depth=%d"}, 748 /* int args above */ 749 /* string args above */ 750 {Opt_read_only, "read_only"}, 751 {Opt_read_only, "ro"}, /* Alternate spelling */ 752 {Opt_read_write, "read_write"}, 753 {Opt_read_write, "rw"}, /* Alternate spelling */ 754 {Opt_lock_on_read, "lock_on_read"}, 755 {Opt_exclusive, "exclusive"}, 756 {Opt_err, NULL} 757 }; 758 759 struct rbd_options { 760 int queue_depth; 761 bool read_only; 762 bool lock_on_read; 763 bool exclusive; 764 }; 765 766 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ 767 #define RBD_READ_ONLY_DEFAULT false 768 #define RBD_LOCK_ON_READ_DEFAULT false 769 #define RBD_EXCLUSIVE_DEFAULT false 770 771 static int parse_rbd_opts_token(char *c, void *private) 772 { 773 struct rbd_options *rbd_opts = private; 774 substring_t argstr[MAX_OPT_ARGS]; 775 int token, intval, ret; 776 777 token = match_token(c, rbd_opts_tokens, argstr); 778 if (token < Opt_last_int) { 779 ret = match_int(&argstr[0], &intval); 780 if (ret < 0) { 781 pr_err("bad mount option arg (not int) at '%s'\n", c); 782 return ret; 783 } 784 dout("got int token %d val %d\n", token, intval); 785 } else if (token > Opt_last_int && token < Opt_last_string) { 786 dout("got string token %d val %s\n", token, argstr[0].from); 787 } else { 788 dout("got token %d\n", token); 789 } 790 791 switch (token) { 792 case Opt_queue_depth: 793 if (intval < 1) { 794 pr_err("queue_depth out of range\n"); 795 return -EINVAL; 796 } 797 rbd_opts->queue_depth = intval; 798 break; 799 case Opt_read_only: 800 rbd_opts->read_only = true; 801 break; 802 case Opt_read_write: 803 rbd_opts->read_only = false; 804 break; 805 case Opt_lock_on_read: 806 rbd_opts->lock_on_read = true; 807 break; 808 case Opt_exclusive: 809 rbd_opts->exclusive = true; 810 break; 811 default: 812 /* libceph prints "bad option" msg */ 813 return -EINVAL; 814 } 815 816 return 0; 817 } 818 819 static char* obj_op_name(enum obj_operation_type op_type) 820 { 821 switch (op_type) { 822 case OBJ_OP_READ: 823 return "read"; 824 case OBJ_OP_WRITE: 825 return "write"; 826 case OBJ_OP_DISCARD: 827 return "discard"; 828 default: 829 return "???"; 830 } 831 } 832 833 /* 834 * Destroy ceph client 835 * 836 * Caller must hold rbd_client_list_lock. 837 */ 838 static void rbd_client_release(struct kref *kref) 839 { 840 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 841 842 dout("%s: rbdc %p\n", __func__, rbdc); 843 spin_lock(&rbd_client_list_lock); 844 list_del(&rbdc->node); 845 spin_unlock(&rbd_client_list_lock); 846 847 ceph_destroy_client(rbdc->client); 848 kfree(rbdc); 849 } 850 851 /* 852 * Drop reference to ceph client node. If it's not referenced anymore, release 853 * it. 
854 */ 855 static void rbd_put_client(struct rbd_client *rbdc) 856 { 857 if (rbdc) 858 kref_put(&rbdc->kref, rbd_client_release); 859 } 860 861 static int wait_for_latest_osdmap(struct ceph_client *client) 862 { 863 u64 newest_epoch; 864 int ret; 865 866 ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch); 867 if (ret) 868 return ret; 869 870 if (client->osdc.osdmap->epoch >= newest_epoch) 871 return 0; 872 873 ceph_osdc_maybe_request_map(&client->osdc); 874 return ceph_monc_wait_osdmap(&client->monc, newest_epoch, 875 client->options->mount_timeout); 876 } 877 878 /* 879 * Get a ceph client with specific addr and configuration, if one does 880 * not exist create it. Either way, ceph_opts is consumed by this 881 * function. 882 */ 883 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 884 { 885 struct rbd_client *rbdc; 886 int ret; 887 888 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); 889 rbdc = rbd_client_find(ceph_opts); 890 if (rbdc) { 891 ceph_destroy_options(ceph_opts); 892 893 /* 894 * Using an existing client. Make sure ->pg_pools is up to 895 * date before we look up the pool id in do_rbd_add(). 896 */ 897 ret = wait_for_latest_osdmap(rbdc->client); 898 if (ret) { 899 rbd_warn(NULL, "failed to get latest osdmap: %d", ret); 900 rbd_put_client(rbdc); 901 rbdc = ERR_PTR(ret); 902 } 903 } else { 904 rbdc = rbd_client_create(ceph_opts); 905 } 906 mutex_unlock(&client_mutex); 907 908 return rbdc; 909 } 910 911 static bool rbd_image_format_valid(u32 image_format) 912 { 913 return image_format == 1 || image_format == 2; 914 } 915 916 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 917 { 918 size_t size; 919 u32 snap_count; 920 921 /* The header has to start with the magic rbd header text */ 922 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 923 return false; 924 925 /* The bio layer requires at least sector-sized I/O */ 926 927 if (ondisk->options.order < SECTOR_SHIFT) 928 return false; 929 930 /* If we use u64 in a few spots we may be able to loosen this */ 931 932 if (ondisk->options.order > 8 * sizeof (int) - 1) 933 return false; 934 935 /* 936 * The size of a snapshot header has to fit in a size_t, and 937 * that limits the number of snapshots. 938 */ 939 snap_count = le32_to_cpu(ondisk->snap_count); 940 size = SIZE_MAX - sizeof (struct ceph_snap_context); 941 if (snap_count > size / sizeof (__le64)) 942 return false; 943 944 /* 945 * Not only that, but the size of the entire the snapshot 946 * header must also be representable in a size_t. 947 */ 948 size -= snap_count * sizeof (__le64); 949 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 950 return false; 951 952 return true; 953 } 954 955 /* 956 * returns the size of an object in the image 957 */ 958 static u32 rbd_obj_bytes(struct rbd_image_header *header) 959 { 960 return 1U << header->obj_order; 961 } 962 963 static void rbd_init_layout(struct rbd_device *rbd_dev) 964 { 965 if (rbd_dev->header.stripe_unit == 0 || 966 rbd_dev->header.stripe_count == 0) { 967 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); 968 rbd_dev->header.stripe_count = 1; 969 } 970 971 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; 972 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; 973 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); 974 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? 
975 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; 976 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 977 } 978 979 /* 980 * Fill an rbd image header with information from the given format 1 981 * on-disk header. 982 */ 983 static int rbd_header_from_disk(struct rbd_device *rbd_dev, 984 struct rbd_image_header_ondisk *ondisk) 985 { 986 struct rbd_image_header *header = &rbd_dev->header; 987 bool first_time = header->object_prefix == NULL; 988 struct ceph_snap_context *snapc; 989 char *object_prefix = NULL; 990 char *snap_names = NULL; 991 u64 *snap_sizes = NULL; 992 u32 snap_count; 993 int ret = -ENOMEM; 994 u32 i; 995 996 /* Allocate this now to avoid having to handle failure below */ 997 998 if (first_time) { 999 object_prefix = kstrndup(ondisk->object_prefix, 1000 sizeof(ondisk->object_prefix), 1001 GFP_KERNEL); 1002 if (!object_prefix) 1003 return -ENOMEM; 1004 } 1005 1006 /* Allocate the snapshot context and fill it in */ 1007 1008 snap_count = le32_to_cpu(ondisk->snap_count); 1009 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 1010 if (!snapc) 1011 goto out_err; 1012 snapc->seq = le64_to_cpu(ondisk->snap_seq); 1013 if (snap_count) { 1014 struct rbd_image_snap_ondisk *snaps; 1015 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 1016 1017 /* We'll keep a copy of the snapshot names... */ 1018 1019 if (snap_names_len > (u64)SIZE_MAX) 1020 goto out_2big; 1021 snap_names = kmalloc(snap_names_len, GFP_KERNEL); 1022 if (!snap_names) 1023 goto out_err; 1024 1025 /* ...as well as the array of their sizes. */ 1026 snap_sizes = kmalloc_array(snap_count, 1027 sizeof(*header->snap_sizes), 1028 GFP_KERNEL); 1029 if (!snap_sizes) 1030 goto out_err; 1031 1032 /* 1033 * Copy the names, and fill in each snapshot's id 1034 * and size. 1035 * 1036 * Note that rbd_dev_v1_header_info() guarantees the 1037 * ondisk buffer we're working with has 1038 * snap_names_len bytes beyond the end of the 1039 * snapshot id array, this memcpy() is safe. 1040 */ 1041 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len); 1042 snaps = ondisk->snaps; 1043 for (i = 0; i < snap_count; i++) { 1044 snapc->snaps[i] = le64_to_cpu(snaps[i].id); 1045 snap_sizes[i] = le64_to_cpu(snaps[i].image_size); 1046 } 1047 } 1048 1049 /* We won't fail any more, fill in the header */ 1050 1051 if (first_time) { 1052 header->object_prefix = object_prefix; 1053 header->obj_order = ondisk->options.order; 1054 rbd_init_layout(rbd_dev); 1055 } else { 1056 ceph_put_snap_context(header->snapc); 1057 kfree(header->snap_names); 1058 kfree(header->snap_sizes); 1059 } 1060 1061 /* The remaining fields always get updated (when we refresh) */ 1062 1063 header->image_size = le64_to_cpu(ondisk->image_size); 1064 header->snapc = snapc; 1065 header->snap_names = snap_names; 1066 header->snap_sizes = snap_sizes; 1067 1068 return 0; 1069 out_2big: 1070 ret = -EIO; 1071 out_err: 1072 kfree(snap_sizes); 1073 kfree(snap_names); 1074 ceph_put_snap_context(snapc); 1075 kfree(object_prefix); 1076 1077 return ret; 1078 } 1079 1080 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) 1081 { 1082 const char *snap_name; 1083 1084 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 1085 1086 /* Skip over names until we find the one we are looking for */ 1087 1088 snap_name = rbd_dev->header.snap_names; 1089 while (which--) 1090 snap_name += strlen(snap_name) + 1; 1091 1092 return kstrdup(snap_name, GFP_KERNEL); 1093 } 1094 1095 /* 1096 * Snapshot id comparison function for use with qsort()/bsearch(). 
1097 * Note that result is for snapshots in *descending* order. 1098 */ 1099 static int snapid_compare_reverse(const void *s1, const void *s2) 1100 { 1101 u64 snap_id1 = *(u64 *)s1; 1102 u64 snap_id2 = *(u64 *)s2; 1103 1104 if (snap_id1 < snap_id2) 1105 return 1; 1106 return snap_id1 == snap_id2 ? 0 : -1; 1107 } 1108 1109 /* 1110 * Search a snapshot context to see if the given snapshot id is 1111 * present. 1112 * 1113 * Returns the position of the snapshot id in the array if it's found, 1114 * or BAD_SNAP_INDEX otherwise. 1115 * 1116 * Note: The snapshot array is in kept sorted (by the osd) in 1117 * reverse order, highest snapshot id first. 1118 */ 1119 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) 1120 { 1121 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 1122 u64 *found; 1123 1124 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, 1125 sizeof (snap_id), snapid_compare_reverse); 1126 1127 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; 1128 } 1129 1130 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, 1131 u64 snap_id) 1132 { 1133 u32 which; 1134 const char *snap_name; 1135 1136 which = rbd_dev_snap_index(rbd_dev, snap_id); 1137 if (which == BAD_SNAP_INDEX) 1138 return ERR_PTR(-ENOENT); 1139 1140 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which); 1141 return snap_name ? snap_name : ERR_PTR(-ENOMEM); 1142 } 1143 1144 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 1145 { 1146 if (snap_id == CEPH_NOSNAP) 1147 return RBD_SNAP_HEAD_NAME; 1148 1149 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1150 if (rbd_dev->image_format == 1) 1151 return rbd_dev_v1_snap_name(rbd_dev, snap_id); 1152 1153 return rbd_dev_v2_snap_name(rbd_dev, snap_id); 1154 } 1155 1156 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 1157 u64 *snap_size) 1158 { 1159 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1160 if (snap_id == CEPH_NOSNAP) { 1161 *snap_size = rbd_dev->header.image_size; 1162 } else if (rbd_dev->image_format == 1) { 1163 u32 which; 1164 1165 which = rbd_dev_snap_index(rbd_dev, snap_id); 1166 if (which == BAD_SNAP_INDEX) 1167 return -ENOENT; 1168 1169 *snap_size = rbd_dev->header.snap_sizes[which]; 1170 } else { 1171 u64 size = 0; 1172 int ret; 1173 1174 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 1175 if (ret) 1176 return ret; 1177 1178 *snap_size = size; 1179 } 1180 return 0; 1181 } 1182 1183 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 1184 u64 *snap_features) 1185 { 1186 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1187 if (snap_id == CEPH_NOSNAP) { 1188 *snap_features = rbd_dev->header.features; 1189 } else if (rbd_dev->image_format == 1) { 1190 *snap_features = 0; /* No features for format 1 */ 1191 } else { 1192 u64 features = 0; 1193 int ret; 1194 1195 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); 1196 if (ret) 1197 return ret; 1198 1199 *snap_features = features; 1200 } 1201 return 0; 1202 } 1203 1204 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) 1205 { 1206 u64 snap_id = rbd_dev->spec->snap_id; 1207 u64 size = 0; 1208 u64 features = 0; 1209 int ret; 1210 1211 ret = rbd_snap_size(rbd_dev, snap_id, &size); 1212 if (ret) 1213 return ret; 1214 ret = rbd_snap_features(rbd_dev, snap_id, &features); 1215 if (ret) 1216 return ret; 1217 1218 rbd_dev->mapping.size = size; 1219 rbd_dev->mapping.features = features; 1220 1221 return 0; 1222 } 1223 1224 static void 
rbd_dev_mapping_clear(struct rbd_device *rbd_dev) 1225 { 1226 rbd_dev->mapping.size = 0; 1227 rbd_dev->mapping.features = 0; 1228 } 1229 1230 static void zero_bvec(struct bio_vec *bv) 1231 { 1232 void *buf; 1233 unsigned long flags; 1234 1235 buf = bvec_kmap_irq(bv, &flags); 1236 memset(buf, 0, bv->bv_len); 1237 flush_dcache_page(bv->bv_page); 1238 bvec_kunmap_irq(buf, &flags); 1239 } 1240 1241 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) 1242 { 1243 struct ceph_bio_iter it = *bio_pos; 1244 1245 ceph_bio_iter_advance(&it, off); 1246 ceph_bio_iter_advance_step(&it, bytes, ({ 1247 zero_bvec(&bv); 1248 })); 1249 } 1250 1251 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) 1252 { 1253 struct ceph_bvec_iter it = *bvec_pos; 1254 1255 ceph_bvec_iter_advance(&it, off); 1256 ceph_bvec_iter_advance_step(&it, bytes, ({ 1257 zero_bvec(&bv); 1258 })); 1259 } 1260 1261 /* 1262 * Zero a range in @obj_req data buffer defined by a bio (list) or 1263 * (private) bio_vec array. 1264 * 1265 * @off is relative to the start of the data buffer. 1266 */ 1267 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, 1268 u32 bytes) 1269 { 1270 switch (obj_req->img_request->data_type) { 1271 case OBJ_REQUEST_BIO: 1272 zero_bios(&obj_req->bio_pos, off, bytes); 1273 break; 1274 case OBJ_REQUEST_BVECS: 1275 case OBJ_REQUEST_OWN_BVECS: 1276 zero_bvecs(&obj_req->bvec_pos, off, bytes); 1277 break; 1278 default: 1279 rbd_assert(0); 1280 } 1281 } 1282 1283 static void rbd_obj_request_destroy(struct kref *kref); 1284 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1285 { 1286 rbd_assert(obj_request != NULL); 1287 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1288 kref_read(&obj_request->kref)); 1289 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1290 } 1291 1292 static void rbd_img_request_get(struct rbd_img_request *img_request) 1293 { 1294 dout("%s: img %p (was %d)\n", __func__, img_request, 1295 kref_read(&img_request->kref)); 1296 kref_get(&img_request->kref); 1297 } 1298 1299 static void rbd_img_request_destroy(struct kref *kref); 1300 static void rbd_img_request_put(struct rbd_img_request *img_request) 1301 { 1302 rbd_assert(img_request != NULL); 1303 dout("%s: img %p (was %d)\n", __func__, img_request, 1304 kref_read(&img_request->kref)); 1305 kref_put(&img_request->kref, rbd_img_request_destroy); 1306 } 1307 1308 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1309 struct rbd_obj_request *obj_request) 1310 { 1311 rbd_assert(obj_request->img_request == NULL); 1312 1313 /* Image request now owns object's original reference */ 1314 obj_request->img_request = img_request; 1315 img_request->obj_request_count++; 1316 img_request->pending_count++; 1317 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1318 } 1319 1320 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1321 struct rbd_obj_request *obj_request) 1322 { 1323 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1324 list_del(&obj_request->ex.oe_item); 1325 rbd_assert(img_request->obj_request_count > 0); 1326 img_request->obj_request_count--; 1327 rbd_assert(obj_request->img_request == img_request); 1328 rbd_obj_request_put(obj_request); 1329 } 1330 1331 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) 1332 { 1333 struct ceph_osd_request *osd_req = obj_request->osd_req; 1334 1335 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, 1336 
obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, 1337 obj_request->ex.oe_len, osd_req); 1338 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); 1339 } 1340 1341 /* 1342 * The default/initial value for all image request flags is 0. Each 1343 * is conditionally set to 1 at image request initialization time 1344 * and currently never change thereafter. 1345 */ 1346 static void img_request_layered_set(struct rbd_img_request *img_request) 1347 { 1348 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1349 smp_mb(); 1350 } 1351 1352 static void img_request_layered_clear(struct rbd_img_request *img_request) 1353 { 1354 clear_bit(IMG_REQ_LAYERED, &img_request->flags); 1355 smp_mb(); 1356 } 1357 1358 static bool img_request_layered_test(struct rbd_img_request *img_request) 1359 { 1360 smp_mb(); 1361 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1362 } 1363 1364 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) 1365 { 1366 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1367 1368 return !obj_req->ex.oe_off && 1369 obj_req->ex.oe_len == rbd_dev->layout.object_size; 1370 } 1371 1372 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) 1373 { 1374 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1375 1376 return obj_req->ex.oe_off + obj_req->ex.oe_len == 1377 rbd_dev->layout.object_size; 1378 } 1379 1380 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) 1381 { 1382 return ceph_file_extents_bytes(obj_req->img_extents, 1383 obj_req->num_img_extents); 1384 } 1385 1386 static bool rbd_img_is_write(struct rbd_img_request *img_req) 1387 { 1388 switch (img_req->op_type) { 1389 case OBJ_OP_READ: 1390 return false; 1391 case OBJ_OP_WRITE: 1392 case OBJ_OP_DISCARD: 1393 return true; 1394 default: 1395 rbd_assert(0); 1396 } 1397 } 1398 1399 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); 1400 1401 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) 1402 { 1403 struct rbd_obj_request *obj_req = osd_req->r_priv; 1404 1405 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, 1406 osd_req->r_result, obj_req); 1407 rbd_assert(osd_req == obj_req->osd_req); 1408 1409 obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; 1410 if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) 1411 obj_req->xferred = osd_req->r_result; 1412 else 1413 /* 1414 * Writes aren't allowed to return a data payload. In some 1415 * guarded write cases (e.g. stat + zero on an empty object) 1416 * a stat response makes it through, but we don't care. 
1417 */ 1418 obj_req->xferred = 0; 1419 1420 rbd_obj_handle_request(obj_req); 1421 } 1422 1423 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1424 { 1425 struct ceph_osd_request *osd_req = obj_request->osd_req; 1426 1427 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1428 osd_req->r_snapid = obj_request->img_request->snap_id; 1429 } 1430 1431 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1432 { 1433 struct ceph_osd_request *osd_req = obj_request->osd_req; 1434 1435 osd_req->r_flags = CEPH_OSD_FLAG_WRITE; 1436 ktime_get_real_ts(&osd_req->r_mtime); 1437 osd_req->r_data_offset = obj_request->ex.oe_off; 1438 } 1439 1440 static struct ceph_osd_request * 1441 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) 1442 { 1443 struct rbd_img_request *img_req = obj_req->img_request; 1444 struct rbd_device *rbd_dev = img_req->rbd_dev; 1445 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1446 struct ceph_osd_request *req; 1447 const char *name_format = rbd_dev->image_format == 1 ? 1448 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; 1449 1450 req = ceph_osdc_alloc_request(osdc, 1451 (rbd_img_is_write(img_req) ? img_req->snapc : NULL), 1452 num_ops, false, GFP_NOIO); 1453 if (!req) 1454 return NULL; 1455 1456 req->r_callback = rbd_osd_req_callback; 1457 req->r_priv = obj_req; 1458 1459 req->r_base_oloc.pool = rbd_dev->layout.pool_id; 1460 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, 1461 rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) 1462 goto err_req; 1463 1464 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) 1465 goto err_req; 1466 1467 return req; 1468 1469 err_req: 1470 ceph_osdc_put_request(req); 1471 return NULL; 1472 } 1473 1474 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1475 { 1476 ceph_osdc_put_request(osd_req); 1477 } 1478 1479 static struct rbd_obj_request *rbd_obj_request_create(void) 1480 { 1481 struct rbd_obj_request *obj_request; 1482 1483 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); 1484 if (!obj_request) 1485 return NULL; 1486 1487 ceph_object_extent_init(&obj_request->ex); 1488 kref_init(&obj_request->kref); 1489 1490 dout("%s %p\n", __func__, obj_request); 1491 return obj_request; 1492 } 1493 1494 static void rbd_obj_request_destroy(struct kref *kref) 1495 { 1496 struct rbd_obj_request *obj_request; 1497 u32 i; 1498 1499 obj_request = container_of(kref, struct rbd_obj_request, kref); 1500 1501 dout("%s: obj %p\n", __func__, obj_request); 1502 1503 if (obj_request->osd_req) 1504 rbd_osd_req_destroy(obj_request->osd_req); 1505 1506 switch (obj_request->img_request->data_type) { 1507 case OBJ_REQUEST_NODATA: 1508 case OBJ_REQUEST_BIO: 1509 case OBJ_REQUEST_BVECS: 1510 break; /* Nothing to do */ 1511 case OBJ_REQUEST_OWN_BVECS: 1512 kfree(obj_request->bvec_pos.bvecs); 1513 break; 1514 default: 1515 rbd_assert(0); 1516 } 1517 1518 kfree(obj_request->img_extents); 1519 if (obj_request->copyup_bvecs) { 1520 for (i = 0; i < obj_request->copyup_bvec_count; i++) { 1521 if (obj_request->copyup_bvecs[i].bv_page) 1522 __free_page(obj_request->copyup_bvecs[i].bv_page); 1523 } 1524 kfree(obj_request->copyup_bvecs); 1525 } 1526 1527 kmem_cache_free(rbd_obj_request_cache, obj_request); 1528 } 1529 1530 /* It's OK to call this for a device with no parent */ 1531 1532 static void rbd_spec_put(struct rbd_spec *spec); 1533 static void rbd_dev_unparent(struct rbd_device *rbd_dev) 1534 { 1535 rbd_dev_remove_parent(rbd_dev); 1536 rbd_spec_put(rbd_dev->parent_spec); 
1537 rbd_dev->parent_spec = NULL; 1538 rbd_dev->parent_overlap = 0; 1539 } 1540 1541 /* 1542 * Parent image reference counting is used to determine when an 1543 * image's parent fields can be safely torn down--after there are no 1544 * more in-flight requests to the parent image. When the last 1545 * reference is dropped, cleaning them up is safe. 1546 */ 1547 static void rbd_dev_parent_put(struct rbd_device *rbd_dev) 1548 { 1549 int counter; 1550 1551 if (!rbd_dev->parent_spec) 1552 return; 1553 1554 counter = atomic_dec_return_safe(&rbd_dev->parent_ref); 1555 if (counter > 0) 1556 return; 1557 1558 /* Last reference; clean up parent data structures */ 1559 1560 if (!counter) 1561 rbd_dev_unparent(rbd_dev); 1562 else 1563 rbd_warn(rbd_dev, "parent reference underflow"); 1564 } 1565 1566 /* 1567 * If an image has a non-zero parent overlap, get a reference to its 1568 * parent. 1569 * 1570 * Returns true if the rbd device has a parent with a non-zero 1571 * overlap and a reference for it was successfully taken, or 1572 * false otherwise. 1573 */ 1574 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) 1575 { 1576 int counter = 0; 1577 1578 if (!rbd_dev->parent_spec) 1579 return false; 1580 1581 down_read(&rbd_dev->header_rwsem); 1582 if (rbd_dev->parent_overlap) 1583 counter = atomic_inc_return_safe(&rbd_dev->parent_ref); 1584 up_read(&rbd_dev->header_rwsem); 1585 1586 if (counter < 0) 1587 rbd_warn(rbd_dev, "parent reference overflow"); 1588 1589 return counter > 0; 1590 } 1591 1592 /* 1593 * Caller is responsible for filling in the list of object requests 1594 * that comprises the image request, and the Linux request pointer 1595 * (if there is one). 1596 */ 1597 static struct rbd_img_request *rbd_img_request_create( 1598 struct rbd_device *rbd_dev, 1599 enum obj_operation_type op_type, 1600 struct ceph_snap_context *snapc) 1601 { 1602 struct rbd_img_request *img_request; 1603 1604 img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO); 1605 if (!img_request) 1606 return NULL; 1607 1608 img_request->rbd_dev = rbd_dev; 1609 img_request->op_type = op_type; 1610 if (!rbd_img_is_write(img_request)) 1611 img_request->snap_id = rbd_dev->spec->snap_id; 1612 else 1613 img_request->snapc = snapc; 1614 1615 if (rbd_dev_parent_get(rbd_dev)) 1616 img_request_layered_set(img_request); 1617 1618 spin_lock_init(&img_request->completion_lock); 1619 INIT_LIST_HEAD(&img_request->object_extents); 1620 kref_init(&img_request->kref); 1621 1622 dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, 1623 obj_op_name(op_type), img_request); 1624 return img_request; 1625 } 1626 1627 static void rbd_img_request_destroy(struct kref *kref) 1628 { 1629 struct rbd_img_request *img_request; 1630 struct rbd_obj_request *obj_request; 1631 struct rbd_obj_request *next_obj_request; 1632 1633 img_request = container_of(kref, struct rbd_img_request, kref); 1634 1635 dout("%s: img %p\n", __func__, img_request); 1636 1637 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1638 rbd_img_obj_request_del(img_request, obj_request); 1639 rbd_assert(img_request->obj_request_count == 0); 1640 1641 if (img_request_layered_test(img_request)) { 1642 img_request_layered_clear(img_request); 1643 rbd_dev_parent_put(img_request->rbd_dev); 1644 } 1645 1646 if (rbd_img_is_write(img_request)) 1647 ceph_put_snap_context(img_request->snapc); 1648 1649 kmem_cache_free(rbd_img_request_cache, img_request); 1650 } 1651 1652 static void prune_extents(struct ceph_file_extent *img_extents, 1653 u32 *num_img_extents, 
u64 overlap) 1654 { 1655 u32 cnt = *num_img_extents; 1656 1657 /* drop extents completely beyond the overlap */ 1658 while (cnt && img_extents[cnt - 1].fe_off >= overlap) 1659 cnt--; 1660 1661 if (cnt) { 1662 struct ceph_file_extent *ex = &img_extents[cnt - 1]; 1663 1664 /* trim final overlapping extent */ 1665 if (ex->fe_off + ex->fe_len > overlap) 1666 ex->fe_len = overlap - ex->fe_off; 1667 } 1668 1669 *num_img_extents = cnt; 1670 } 1671 1672 /* 1673 * Determine the byte range(s) covered by either just the object extent 1674 * or the entire object in the parent image. 1675 */ 1676 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, 1677 bool entire) 1678 { 1679 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1680 int ret; 1681 1682 if (!rbd_dev->parent_overlap) 1683 return 0; 1684 1685 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, 1686 entire ? 0 : obj_req->ex.oe_off, 1687 entire ? rbd_dev->layout.object_size : 1688 obj_req->ex.oe_len, 1689 &obj_req->img_extents, 1690 &obj_req->num_img_extents); 1691 if (ret) 1692 return ret; 1693 1694 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 1695 rbd_dev->parent_overlap); 1696 return 0; 1697 } 1698 1699 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) 1700 { 1701 switch (obj_req->img_request->data_type) { 1702 case OBJ_REQUEST_BIO: 1703 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, 1704 &obj_req->bio_pos, 1705 obj_req->ex.oe_len); 1706 break; 1707 case OBJ_REQUEST_BVECS: 1708 case OBJ_REQUEST_OWN_BVECS: 1709 rbd_assert(obj_req->bvec_pos.iter.bi_size == 1710 obj_req->ex.oe_len); 1711 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); 1712 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, 1713 &obj_req->bvec_pos); 1714 break; 1715 default: 1716 rbd_assert(0); 1717 } 1718 } 1719 1720 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) 1721 { 1722 obj_req->osd_req = rbd_osd_req_create(obj_req, 1); 1723 if (!obj_req->osd_req) 1724 return -ENOMEM; 1725 1726 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, 1727 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 1728 rbd_osd_req_setup_data(obj_req, 0); 1729 1730 rbd_osd_req_format_read(obj_req); 1731 return 0; 1732 } 1733 1734 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, 1735 unsigned int which) 1736 { 1737 struct page **pages; 1738 1739 /* 1740 * The response data for a STAT call consists of: 1741 * le64 length; 1742 * struct { 1743 * le32 tv_sec; 1744 * le32 tv_nsec; 1745 * } mtime; 1746 */ 1747 pages = ceph_alloc_page_vector(1, GFP_NOIO); 1748 if (IS_ERR(pages)) 1749 return PTR_ERR(pages); 1750 1751 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); 1752 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, 1753 8 + sizeof(struct ceph_timespec), 1754 0, false, true); 1755 return 0; 1756 } 1757 1758 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, 1759 unsigned int which) 1760 { 1761 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1762 u16 opcode; 1763 1764 osd_req_op_alloc_hint_init(obj_req->osd_req, which++, 1765 rbd_dev->layout.object_size, 1766 rbd_dev->layout.object_size); 1767 1768 if (rbd_obj_is_entire(obj_req)) 1769 opcode = CEPH_OSD_OP_WRITEFULL; 1770 else 1771 opcode = CEPH_OSD_OP_WRITE; 1772 1773 osd_req_op_extent_init(obj_req->osd_req, which, opcode, 1774 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 1775 rbd_osd_req_setup_data(obj_req, which++); 1776 1777 rbd_assert(which == 
obj_req->osd_req->r_num_ops); 1778 rbd_osd_req_format_write(obj_req); 1779 } 1780 1781 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) 1782 { 1783 unsigned int num_osd_ops, which = 0; 1784 int ret; 1785 1786 /* reverse map the entire object onto the parent */ 1787 ret = rbd_obj_calc_img_extents(obj_req, true); 1788 if (ret) 1789 return ret; 1790 1791 if (obj_req->num_img_extents) { 1792 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 1793 num_osd_ops = 3; /* stat + setallochint + write/writefull */ 1794 } else { 1795 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 1796 num_osd_ops = 2; /* setallochint + write/writefull */ 1797 } 1798 1799 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 1800 if (!obj_req->osd_req) 1801 return -ENOMEM; 1802 1803 if (obj_req->num_img_extents) { 1804 ret = __rbd_obj_setup_stat(obj_req, which++); 1805 if (ret) 1806 return ret; 1807 } 1808 1809 __rbd_obj_setup_write(obj_req, which); 1810 return 0; 1811 } 1812 1813 static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, 1814 unsigned int which) 1815 { 1816 u16 opcode; 1817 1818 if (rbd_obj_is_entire(obj_req)) { 1819 if (obj_req->num_img_extents) { 1820 osd_req_op_init(obj_req->osd_req, which++, 1821 CEPH_OSD_OP_CREATE, 0); 1822 opcode = CEPH_OSD_OP_TRUNCATE; 1823 } else { 1824 osd_req_op_init(obj_req->osd_req, which++, 1825 CEPH_OSD_OP_DELETE, 0); 1826 opcode = 0; 1827 } 1828 } else if (rbd_obj_is_tail(obj_req)) { 1829 opcode = CEPH_OSD_OP_TRUNCATE; 1830 } else { 1831 opcode = CEPH_OSD_OP_ZERO; 1832 } 1833 1834 if (opcode) 1835 osd_req_op_extent_init(obj_req->osd_req, which++, opcode, 1836 obj_req->ex.oe_off, obj_req->ex.oe_len, 1837 0, 0); 1838 1839 rbd_assert(which == obj_req->osd_req->r_num_ops); 1840 rbd_osd_req_format_write(obj_req); 1841 } 1842 1843 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) 1844 { 1845 unsigned int num_osd_ops, which = 0; 1846 int ret; 1847 1848 /* reverse map the entire object onto the parent */ 1849 ret = rbd_obj_calc_img_extents(obj_req, true); 1850 if (ret) 1851 return ret; 1852 1853 if (rbd_obj_is_entire(obj_req)) { 1854 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 1855 if (obj_req->num_img_extents) 1856 num_osd_ops = 2; /* create + truncate */ 1857 else 1858 num_osd_ops = 1; /* delete */ 1859 } else { 1860 if (obj_req->num_img_extents) { 1861 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 1862 num_osd_ops = 2; /* stat + truncate/zero */ 1863 } else { 1864 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 1865 num_osd_ops = 1; /* truncate/zero */ 1866 } 1867 } 1868 1869 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 1870 if (!obj_req->osd_req) 1871 return -ENOMEM; 1872 1873 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) { 1874 ret = __rbd_obj_setup_stat(obj_req, which++); 1875 if (ret) 1876 return ret; 1877 } 1878 1879 __rbd_obj_setup_discard(obj_req, which); 1880 return 0; 1881 } 1882 1883 /* 1884 * For each object request in @img_req, allocate an OSD request, add 1885 * individual OSD ops and prepare them for submission. The number of 1886 * OSD ops depends on op_type and the overlap point (if any). 
1887 */ 1888 static int __rbd_img_fill_request(struct rbd_img_request *img_req) 1889 { 1890 struct rbd_obj_request *obj_req; 1891 int ret; 1892 1893 for_each_obj_request(img_req, obj_req) { 1894 switch (img_req->op_type) { 1895 case OBJ_OP_READ: 1896 ret = rbd_obj_setup_read(obj_req); 1897 break; 1898 case OBJ_OP_WRITE: 1899 ret = rbd_obj_setup_write(obj_req); 1900 break; 1901 case OBJ_OP_DISCARD: 1902 ret = rbd_obj_setup_discard(obj_req); 1903 break; 1904 default: 1905 rbd_assert(0); 1906 } 1907 if (ret) 1908 return ret; 1909 } 1910 1911 return 0; 1912 } 1913 1914 union rbd_img_fill_iter { 1915 struct ceph_bio_iter bio_iter; 1916 struct ceph_bvec_iter bvec_iter; 1917 }; 1918 1919 struct rbd_img_fill_ctx { 1920 enum obj_request_type pos_type; 1921 union rbd_img_fill_iter *pos; 1922 union rbd_img_fill_iter iter; 1923 ceph_object_extent_fn_t set_pos_fn; 1924 ceph_object_extent_fn_t count_fn; 1925 ceph_object_extent_fn_t copy_fn; 1926 }; 1927 1928 static struct ceph_object_extent *alloc_object_extent(void *arg) 1929 { 1930 struct rbd_img_request *img_req = arg; 1931 struct rbd_obj_request *obj_req; 1932 1933 obj_req = rbd_obj_request_create(); 1934 if (!obj_req) 1935 return NULL; 1936 1937 rbd_img_obj_request_add(img_req, obj_req); 1938 return &obj_req->ex; 1939 } 1940 1941 /* 1942 * While su != os && sc == 1 is technically not fancy (it's the same 1943 * layout as su == os && sc == 1), we can't use the nocopy path for it 1944 * because ->set_pos_fn() should be called only once per object. 1945 * ceph_file_to_extents() invokes action_fn once per stripe unit, so 1946 * treat su != os && sc == 1 as fancy. 1947 */ 1948 static bool rbd_layout_is_fancy(struct ceph_file_layout *l) 1949 { 1950 return l->stripe_unit != l->object_size; 1951 } 1952 1953 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, 1954 struct ceph_file_extent *img_extents, 1955 u32 num_img_extents, 1956 struct rbd_img_fill_ctx *fctx) 1957 { 1958 u32 i; 1959 int ret; 1960 1961 img_req->data_type = fctx->pos_type; 1962 1963 /* 1964 * Create object requests and set each object request's starting 1965 * position in the provided bio (list) or bio_vec array. 1966 */ 1967 fctx->iter = *fctx->pos; 1968 for (i = 0; i < num_img_extents; i++) { 1969 ret = ceph_file_to_extents(&img_req->rbd_dev->layout, 1970 img_extents[i].fe_off, 1971 img_extents[i].fe_len, 1972 &img_req->object_extents, 1973 alloc_object_extent, img_req, 1974 fctx->set_pos_fn, &fctx->iter); 1975 if (ret) 1976 return ret; 1977 } 1978 1979 return __rbd_img_fill_request(img_req); 1980 } 1981 1982 /* 1983 * Map a list of image extents to a list of object extents, create the 1984 * corresponding object requests (normally each to a different object, 1985 * but not always) and add them to @img_req. For each object request, 1986 * set up its data descriptor to point to the corresponding chunk(s) of 1987 * @fctx->pos data buffer. 1988 * 1989 * Because ceph_file_to_extents() will merge adjacent object extents 1990 * together, each object request's data descriptor may point to multiple 1991 * different chunks of @fctx->pos data buffer. 1992 * 1993 * @fctx->pos data buffer is assumed to be large enough. 
1994 */ 1995 static int rbd_img_fill_request(struct rbd_img_request *img_req, 1996 struct ceph_file_extent *img_extents, 1997 u32 num_img_extents, 1998 struct rbd_img_fill_ctx *fctx) 1999 { 2000 struct rbd_device *rbd_dev = img_req->rbd_dev; 2001 struct rbd_obj_request *obj_req; 2002 u32 i; 2003 int ret; 2004 2005 if (fctx->pos_type == OBJ_REQUEST_NODATA || 2006 !rbd_layout_is_fancy(&rbd_dev->layout)) 2007 return rbd_img_fill_request_nocopy(img_req, img_extents, 2008 num_img_extents, fctx); 2009 2010 img_req->data_type = OBJ_REQUEST_OWN_BVECS; 2011 2012 /* 2013 * Create object requests and determine ->bvec_count for each object 2014 * request. Note that ->bvec_count sum over all object requests may 2015 * be greater than the number of bio_vecs in the provided bio (list) 2016 * or bio_vec array because when mapped, those bio_vecs can straddle 2017 * stripe unit boundaries. 2018 */ 2019 fctx->iter = *fctx->pos; 2020 for (i = 0; i < num_img_extents; i++) { 2021 ret = ceph_file_to_extents(&rbd_dev->layout, 2022 img_extents[i].fe_off, 2023 img_extents[i].fe_len, 2024 &img_req->object_extents, 2025 alloc_object_extent, img_req, 2026 fctx->count_fn, &fctx->iter); 2027 if (ret) 2028 return ret; 2029 } 2030 2031 for_each_obj_request(img_req, obj_req) { 2032 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, 2033 sizeof(*obj_req->bvec_pos.bvecs), 2034 GFP_NOIO); 2035 if (!obj_req->bvec_pos.bvecs) 2036 return -ENOMEM; 2037 } 2038 2039 /* 2040 * Fill in each object request's private bio_vec array, splitting and 2041 * rearranging the provided bio_vecs in stripe unit chunks as needed. 2042 */ 2043 fctx->iter = *fctx->pos; 2044 for (i = 0; i < num_img_extents; i++) { 2045 ret = ceph_iterate_extents(&rbd_dev->layout, 2046 img_extents[i].fe_off, 2047 img_extents[i].fe_len, 2048 &img_req->object_extents, 2049 fctx->copy_fn, &fctx->iter); 2050 if (ret) 2051 return ret; 2052 } 2053 2054 return __rbd_img_fill_request(img_req); 2055 } 2056 2057 static int rbd_img_fill_nodata(struct rbd_img_request *img_req, 2058 u64 off, u64 len) 2059 { 2060 struct ceph_file_extent ex = { off, len }; 2061 union rbd_img_fill_iter dummy; 2062 struct rbd_img_fill_ctx fctx = { 2063 .pos_type = OBJ_REQUEST_NODATA, 2064 .pos = &dummy, 2065 }; 2066 2067 return rbd_img_fill_request(img_req, &ex, 1, &fctx); 2068 } 2069 2070 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2071 { 2072 struct rbd_obj_request *obj_req = 2073 container_of(ex, struct rbd_obj_request, ex); 2074 struct ceph_bio_iter *it = arg; 2075 2076 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2077 obj_req->bio_pos = *it; 2078 ceph_bio_iter_advance(it, bytes); 2079 } 2080 2081 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2082 { 2083 struct rbd_obj_request *obj_req = 2084 container_of(ex, struct rbd_obj_request, ex); 2085 struct ceph_bio_iter *it = arg; 2086 2087 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2088 ceph_bio_iter_advance_step(it, bytes, ({ 2089 obj_req->bvec_count++; 2090 })); 2091 2092 } 2093 2094 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2095 { 2096 struct rbd_obj_request *obj_req = 2097 container_of(ex, struct rbd_obj_request, ex); 2098 struct ceph_bio_iter *it = arg; 2099 2100 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2101 ceph_bio_iter_advance_step(it, bytes, ({ 2102 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2103 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2104 })); 2105 } 
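/*
 * How the callbacks above are driven -- a summary of what
 * rbd_img_fill_request() does, not a separate code path:
 *
 * - nocopy: a single ceph_file_to_extents() pass calls ->set_pos_fn() to
 *   record each object request's starting position in the caller's bio
 *   (list) or bio_vec array;
 * - fancy (OWN_BVECS): a first ceph_file_to_extents() pass calls
 *   ->count_fn() to size each object request's private bio_vec array,
 *   then a ceph_iterate_extents() pass calls ->copy_fn() to fill it in,
 *   splitting the caller's bio_vecs on stripe unit boundaries as needed.
 */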
2106 2107 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2108 struct ceph_file_extent *img_extents, 2109 u32 num_img_extents, 2110 struct ceph_bio_iter *bio_pos) 2111 { 2112 struct rbd_img_fill_ctx fctx = { 2113 .pos_type = OBJ_REQUEST_BIO, 2114 .pos = (union rbd_img_fill_iter *)bio_pos, 2115 .set_pos_fn = set_bio_pos, 2116 .count_fn = count_bio_bvecs, 2117 .copy_fn = copy_bio_bvecs, 2118 }; 2119 2120 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2121 &fctx); 2122 } 2123 2124 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2125 u64 off, u64 len, struct bio *bio) 2126 { 2127 struct ceph_file_extent ex = { off, len }; 2128 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; 2129 2130 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); 2131 } 2132 2133 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2134 { 2135 struct rbd_obj_request *obj_req = 2136 container_of(ex, struct rbd_obj_request, ex); 2137 struct ceph_bvec_iter *it = arg; 2138 2139 obj_req->bvec_pos = *it; 2140 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); 2141 ceph_bvec_iter_advance(it, bytes); 2142 } 2143 2144 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2145 { 2146 struct rbd_obj_request *obj_req = 2147 container_of(ex, struct rbd_obj_request, ex); 2148 struct ceph_bvec_iter *it = arg; 2149 2150 ceph_bvec_iter_advance_step(it, bytes, ({ 2151 obj_req->bvec_count++; 2152 })); 2153 } 2154 2155 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2156 { 2157 struct rbd_obj_request *obj_req = 2158 container_of(ex, struct rbd_obj_request, ex); 2159 struct ceph_bvec_iter *it = arg; 2160 2161 ceph_bvec_iter_advance_step(it, bytes, ({ 2162 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2163 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2164 })); 2165 } 2166 2167 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2168 struct ceph_file_extent *img_extents, 2169 u32 num_img_extents, 2170 struct ceph_bvec_iter *bvec_pos) 2171 { 2172 struct rbd_img_fill_ctx fctx = { 2173 .pos_type = OBJ_REQUEST_BVECS, 2174 .pos = (union rbd_img_fill_iter *)bvec_pos, 2175 .set_pos_fn = set_bvec_pos, 2176 .count_fn = count_bvecs, 2177 .copy_fn = copy_bvecs, 2178 }; 2179 2180 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2181 &fctx); 2182 } 2183 2184 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2185 struct ceph_file_extent *img_extents, 2186 u32 num_img_extents, 2187 struct bio_vec *bvecs) 2188 { 2189 struct ceph_bvec_iter it = { 2190 .bvecs = bvecs, 2191 .iter = { .bi_size = ceph_file_extents_bytes(img_extents, 2192 num_img_extents) }, 2193 }; 2194 2195 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, 2196 &it); 2197 } 2198 2199 static void rbd_img_request_submit(struct rbd_img_request *img_request) 2200 { 2201 struct rbd_obj_request *obj_request; 2202 2203 dout("%s: img %p\n", __func__, img_request); 2204 2205 rbd_img_request_get(img_request); 2206 for_each_obj_request(img_request, obj_request) 2207 rbd_obj_request_submit(obj_request); 2208 2209 rbd_img_request_put(img_request); 2210 } 2211 2212 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) 2213 { 2214 struct rbd_img_request *img_req = obj_req->img_request; 2215 struct rbd_img_request *child_img_req; 2216 int ret; 2217 2218 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, 2219 OBJ_OP_READ, NULL); 2220 if (!child_img_req) 
2221 return -ENOMEM; 2222 2223 __set_bit(IMG_REQ_CHILD, &child_img_req->flags); 2224 child_img_req->obj_request = obj_req; 2225 2226 if (!rbd_img_is_write(img_req)) { 2227 switch (img_req->data_type) { 2228 case OBJ_REQUEST_BIO: 2229 ret = __rbd_img_fill_from_bio(child_img_req, 2230 obj_req->img_extents, 2231 obj_req->num_img_extents, 2232 &obj_req->bio_pos); 2233 break; 2234 case OBJ_REQUEST_BVECS: 2235 case OBJ_REQUEST_OWN_BVECS: 2236 ret = __rbd_img_fill_from_bvecs(child_img_req, 2237 obj_req->img_extents, 2238 obj_req->num_img_extents, 2239 &obj_req->bvec_pos); 2240 break; 2241 default: 2242 rbd_assert(0); 2243 } 2244 } else { 2245 ret = rbd_img_fill_from_bvecs(child_img_req, 2246 obj_req->img_extents, 2247 obj_req->num_img_extents, 2248 obj_req->copyup_bvecs); 2249 } 2250 if (ret) { 2251 rbd_img_request_put(child_img_req); 2252 return ret; 2253 } 2254 2255 rbd_img_request_submit(child_img_req); 2256 return 0; 2257 } 2258 2259 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) 2260 { 2261 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2262 int ret; 2263 2264 if (obj_req->result == -ENOENT && 2265 rbd_dev->parent_overlap && !obj_req->tried_parent) { 2266 /* reverse map this object extent onto the parent */ 2267 ret = rbd_obj_calc_img_extents(obj_req, false); 2268 if (ret) { 2269 obj_req->result = ret; 2270 return true; 2271 } 2272 2273 if (obj_req->num_img_extents) { 2274 obj_req->tried_parent = true; 2275 ret = rbd_obj_read_from_parent(obj_req); 2276 if (ret) { 2277 obj_req->result = ret; 2278 return true; 2279 } 2280 return false; 2281 } 2282 } 2283 2284 /* 2285 * -ENOENT means a hole in the image -- zero-fill the entire 2286 * length of the request. A short read also implies zero-fill 2287 * to the end of the request. In both cases we update xferred 2288 * count to indicate the whole request was satisfied. 2289 */ 2290 if (obj_req->result == -ENOENT || 2291 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { 2292 rbd_assert(!obj_req->xferred || !obj_req->result); 2293 rbd_obj_zero_range(obj_req, obj_req->xferred, 2294 obj_req->ex.oe_len - obj_req->xferred); 2295 obj_req->result = 0; 2296 obj_req->xferred = obj_req->ex.oe_len; 2297 } 2298 2299 return true; 2300 } 2301 2302 /* 2303 * copyup_bvecs pages are never highmem pages 2304 */ 2305 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) 2306 { 2307 struct ceph_bvec_iter it = { 2308 .bvecs = bvecs, 2309 .iter = { .bi_size = bytes }, 2310 }; 2311 2312 ceph_bvec_iter_advance_step(&it, bytes, ({ 2313 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, 2314 bv.bv_len)) 2315 return false; 2316 })); 2317 return true; 2318 } 2319 2320 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) 2321 { 2322 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; 2323 2324 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2325 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); 2326 rbd_osd_req_destroy(obj_req->osd_req); 2327 2328 /* 2329 * Create a copyup request with the same number of OSD ops as 2330 * the original request. The original request was stat + op(s), 2331 * the new copyup request will be copyup + the same op(s). 2332 */ 2333 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 2334 if (!obj_req->osd_req) 2335 return -ENOMEM; 2336 2337 /* 2338 * Only send non-zero copyup data to save some I/O and network 2339 * bandwidth -- zero copyup data is equivalent to the object not 2340 * existing. 
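 * The copyup op is still issued in that case -- only its data payload is
 * empty -- so the op sequence (copyup + original op(s)) stays the same.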
2341 */ 2342 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { 2343 dout("%s obj_req %p detected zeroes\n", __func__, obj_req); 2344 bytes = 0; 2345 } 2346 2347 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", 2348 "copyup"); 2349 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, 2350 obj_req->copyup_bvecs, bytes); 2351 2352 switch (obj_req->img_request->op_type) { 2353 case OBJ_OP_WRITE: 2354 __rbd_obj_setup_write(obj_req, 1); 2355 break; 2356 case OBJ_OP_DISCARD: 2357 rbd_assert(!rbd_obj_is_entire(obj_req)); 2358 __rbd_obj_setup_discard(obj_req, 1); 2359 break; 2360 default: 2361 rbd_assert(0); 2362 } 2363 2364 rbd_obj_request_submit(obj_req); 2365 return 0; 2366 } 2367 2368 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) 2369 { 2370 u32 i; 2371 2372 rbd_assert(!obj_req->copyup_bvecs); 2373 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); 2374 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, 2375 sizeof(*obj_req->copyup_bvecs), 2376 GFP_NOIO); 2377 if (!obj_req->copyup_bvecs) 2378 return -ENOMEM; 2379 2380 for (i = 0; i < obj_req->copyup_bvec_count; i++) { 2381 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); 2382 2383 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); 2384 if (!obj_req->copyup_bvecs[i].bv_page) 2385 return -ENOMEM; 2386 2387 obj_req->copyup_bvecs[i].bv_offset = 0; 2388 obj_req->copyup_bvecs[i].bv_len = len; 2389 obj_overlap -= len; 2390 } 2391 2392 rbd_assert(!obj_overlap); 2393 return 0; 2394 } 2395 2396 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) 2397 { 2398 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2399 int ret; 2400 2401 rbd_assert(obj_req->num_img_extents); 2402 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 2403 rbd_dev->parent_overlap); 2404 if (!obj_req->num_img_extents) { 2405 /* 2406 * The overlap has become 0 (most likely because the 2407 * image has been flattened). Use rbd_obj_issue_copyup() 2408 * to re-submit the original write request -- the copyup 2409 * operation itself will be a no-op, since someone must 2410 * have populated the child object while we weren't 2411 * looking. Move to WRITE_FLAT state as we'll be done 2412 * with the operation once the null copyup completes. 2413 */ 2414 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 2415 return rbd_obj_issue_copyup(obj_req, 0); 2416 } 2417 2418 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); 2419 if (ret) 2420 return ret; 2421 2422 obj_req->write_state = RBD_OBJ_WRITE_COPYUP; 2423 return rbd_obj_read_from_parent(obj_req); 2424 } 2425 2426 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) 2427 { 2428 int ret; 2429 2430 again: 2431 switch (obj_req->write_state) { 2432 case RBD_OBJ_WRITE_GUARD: 2433 rbd_assert(!obj_req->xferred); 2434 if (obj_req->result == -ENOENT) { 2435 /* 2436 * The target object doesn't exist. Read the data for 2437 * the entire target object up to the overlap point (if 2438 * any) from the parent, so we can use it for a copyup. 2439 */ 2440 ret = rbd_obj_handle_write_guard(obj_req); 2441 if (ret) { 2442 obj_req->result = ret; 2443 return true; 2444 } 2445 return false; 2446 } 2447 /* fall through */ 2448 case RBD_OBJ_WRITE_FLAT: 2449 if (!obj_req->result) 2450 /* 2451 * There is no such thing as a successful short 2452 * write -- indicate the whole request was satisfied. 
2453 */ 2454 obj_req->xferred = obj_req->ex.oe_len; 2455 return true; 2456 case RBD_OBJ_WRITE_COPYUP: 2457 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 2458 if (obj_req->result) 2459 goto again; 2460 2461 rbd_assert(obj_req->xferred); 2462 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); 2463 if (ret) { 2464 obj_req->result = ret; 2465 return true; 2466 } 2467 return false; 2468 default: 2469 rbd_assert(0); 2470 } 2471 } 2472 2473 /* 2474 * Returns true if @obj_req is completed, or false otherwise. 2475 */ 2476 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) 2477 { 2478 switch (obj_req->img_request->op_type) { 2479 case OBJ_OP_READ: 2480 return rbd_obj_handle_read(obj_req); 2481 case OBJ_OP_WRITE: 2482 return rbd_obj_handle_write(obj_req); 2483 case OBJ_OP_DISCARD: 2484 if (rbd_obj_handle_write(obj_req)) { 2485 /* 2486 * Hide -ENOENT from delete/truncate/zero -- discarding 2487 * a non-existent object is not a problem. 2488 */ 2489 if (obj_req->result == -ENOENT) { 2490 obj_req->result = 0; 2491 obj_req->xferred = obj_req->ex.oe_len; 2492 } 2493 return true; 2494 } 2495 return false; 2496 default: 2497 rbd_assert(0); 2498 } 2499 } 2500 2501 static void rbd_obj_end_request(struct rbd_obj_request *obj_req) 2502 { 2503 struct rbd_img_request *img_req = obj_req->img_request; 2504 2505 rbd_assert((!obj_req->result && 2506 obj_req->xferred == obj_req->ex.oe_len) || 2507 (obj_req->result < 0 && !obj_req->xferred)); 2508 if (!obj_req->result) { 2509 img_req->xferred += obj_req->xferred; 2510 return; 2511 } 2512 2513 rbd_warn(img_req->rbd_dev, 2514 "%s at objno %llu %llu~%llu result %d xferred %llu", 2515 obj_op_name(img_req->op_type), obj_req->ex.oe_objno, 2516 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, 2517 obj_req->xferred); 2518 if (!img_req->result) { 2519 img_req->result = obj_req->result; 2520 img_req->xferred = 0; 2521 } 2522 } 2523 2524 static void rbd_img_end_child_request(struct rbd_img_request *img_req) 2525 { 2526 struct rbd_obj_request *obj_req = img_req->obj_request; 2527 2528 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); 2529 rbd_assert((!img_req->result && 2530 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || 2531 (img_req->result < 0 && !img_req->xferred)); 2532 2533 obj_req->result = img_req->result; 2534 obj_req->xferred = img_req->xferred; 2535 rbd_img_request_put(img_req); 2536 } 2537 2538 static void rbd_img_end_request(struct rbd_img_request *img_req) 2539 { 2540 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); 2541 rbd_assert((!img_req->result && 2542 img_req->xferred == blk_rq_bytes(img_req->rq)) || 2543 (img_req->result < 0 && !img_req->xferred)); 2544 2545 blk_mq_end_request(img_req->rq, 2546 errno_to_blk_status(img_req->result)); 2547 rbd_img_request_put(img_req); 2548 } 2549 2550 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) 2551 { 2552 struct rbd_img_request *img_req; 2553 2554 again: 2555 if (!__rbd_obj_handle_request(obj_req)) 2556 return; 2557 2558 img_req = obj_req->img_request; 2559 spin_lock(&img_req->completion_lock); 2560 rbd_obj_end_request(obj_req); 2561 rbd_assert(img_req->pending_count); 2562 if (--img_req->pending_count) { 2563 spin_unlock(&img_req->completion_lock); 2564 return; 2565 } 2566 2567 spin_unlock(&img_req->completion_lock); 2568 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { 2569 obj_req = img_req->obj_request; 2570 rbd_img_end_child_request(img_req); 2571 goto again; 2572 } 2573 rbd_img_end_request(img_req); 2574 } 2575 2576 static const struct 
rbd_client_id rbd_empty_cid; 2577 2578 static bool rbd_cid_equal(const struct rbd_client_id *lhs, 2579 const struct rbd_client_id *rhs) 2580 { 2581 return lhs->gid == rhs->gid && lhs->handle == rhs->handle; 2582 } 2583 2584 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) 2585 { 2586 struct rbd_client_id cid; 2587 2588 mutex_lock(&rbd_dev->watch_mutex); 2589 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); 2590 cid.handle = rbd_dev->watch_cookie; 2591 mutex_unlock(&rbd_dev->watch_mutex); 2592 return cid; 2593 } 2594 2595 /* 2596 * lock_rwsem must be held for write 2597 */ 2598 static void rbd_set_owner_cid(struct rbd_device *rbd_dev, 2599 const struct rbd_client_id *cid) 2600 { 2601 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, 2602 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, 2603 cid->gid, cid->handle); 2604 rbd_dev->owner_cid = *cid; /* struct */ 2605 } 2606 2607 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) 2608 { 2609 mutex_lock(&rbd_dev->watch_mutex); 2610 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); 2611 mutex_unlock(&rbd_dev->watch_mutex); 2612 } 2613 2614 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) 2615 { 2616 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2617 2618 strcpy(rbd_dev->lock_cookie, cookie); 2619 rbd_set_owner_cid(rbd_dev, &cid); 2620 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); 2621 } 2622 2623 /* 2624 * lock_rwsem must be held for write 2625 */ 2626 static int rbd_lock(struct rbd_device *rbd_dev) 2627 { 2628 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2629 char cookie[32]; 2630 int ret; 2631 2632 WARN_ON(__rbd_is_lock_owner(rbd_dev) || 2633 rbd_dev->lock_cookie[0] != '\0'); 2634 2635 format_lock_cookie(rbd_dev, cookie); 2636 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2637 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, 2638 RBD_LOCK_TAG, "", 0); 2639 if (ret) 2640 return ret; 2641 2642 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; 2643 __rbd_lock(rbd_dev, cookie); 2644 return 0; 2645 } 2646 2647 /* 2648 * lock_rwsem must be held for write 2649 */ 2650 static void rbd_unlock(struct rbd_device *rbd_dev) 2651 { 2652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2653 int ret; 2654 2655 WARN_ON(!__rbd_is_lock_owner(rbd_dev) || 2656 rbd_dev->lock_cookie[0] == '\0'); 2657 2658 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2659 RBD_LOCK_NAME, rbd_dev->lock_cookie); 2660 if (ret && ret != -ENOENT) 2661 rbd_warn(rbd_dev, "failed to unlock: %d", ret); 2662 2663 /* treat errors as the image is unlocked */ 2664 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 2665 rbd_dev->lock_cookie[0] = '\0'; 2666 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 2667 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); 2668 } 2669 2670 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, 2671 enum rbd_notify_op notify_op, 2672 struct page ***preply_pages, 2673 size_t *preply_len) 2674 { 2675 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2676 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2677 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; 2678 int buf_size = sizeof(buf); 2679 void *p = buf; 2680 2681 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 2682 2683 /* encode *LockPayload NotifyMessage (op + ClientId) */ 2684 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); 2685 
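/*
 * Payload after the encoding header: notify_op (u32) followed by the
 * ClientId (gid and handle, u64 each) -- the 4 + 8 + 8 in the buf[]
 * size above.
 */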
ceph_encode_32(&p, notify_op); 2686 ceph_encode_64(&p, cid.gid); 2687 ceph_encode_64(&p, cid.handle); 2688 2689 return ceph_osdc_notify(osdc, &rbd_dev->header_oid, 2690 &rbd_dev->header_oloc, buf, buf_size, 2691 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); 2692 } 2693 2694 static void rbd_notify_op_lock(struct rbd_device *rbd_dev, 2695 enum rbd_notify_op notify_op) 2696 { 2697 struct page **reply_pages; 2698 size_t reply_len; 2699 2700 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); 2701 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 2702 } 2703 2704 static void rbd_notify_acquired_lock(struct work_struct *work) 2705 { 2706 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2707 acquired_lock_work); 2708 2709 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); 2710 } 2711 2712 static void rbd_notify_released_lock(struct work_struct *work) 2713 { 2714 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2715 released_lock_work); 2716 2717 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); 2718 } 2719 2720 static int rbd_request_lock(struct rbd_device *rbd_dev) 2721 { 2722 struct page **reply_pages; 2723 size_t reply_len; 2724 bool lock_owner_responded = false; 2725 int ret; 2726 2727 dout("%s rbd_dev %p\n", __func__, rbd_dev); 2728 2729 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, 2730 &reply_pages, &reply_len); 2731 if (ret && ret != -ETIMEDOUT) { 2732 rbd_warn(rbd_dev, "failed to request lock: %d", ret); 2733 goto out; 2734 } 2735 2736 if (reply_len > 0 && reply_len <= PAGE_SIZE) { 2737 void *p = page_address(reply_pages[0]); 2738 void *const end = p + reply_len; 2739 u32 n; 2740 2741 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ 2742 while (n--) { 2743 u8 struct_v; 2744 u32 len; 2745 2746 ceph_decode_need(&p, end, 8 + 8, e_inval); 2747 p += 8 + 8; /* skip gid and cookie */ 2748 2749 ceph_decode_32_safe(&p, end, len, e_inval); 2750 if (!len) 2751 continue; 2752 2753 if (lock_owner_responded) { 2754 rbd_warn(rbd_dev, 2755 "duplicate lock owners detected"); 2756 ret = -EIO; 2757 goto out; 2758 } 2759 2760 lock_owner_responded = true; 2761 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", 2762 &struct_v, &len); 2763 if (ret) { 2764 rbd_warn(rbd_dev, 2765 "failed to decode ResponseMessage: %d", 2766 ret); 2767 goto e_inval; 2768 } 2769 2770 ret = ceph_decode_32(&p); 2771 } 2772 } 2773 2774 if (!lock_owner_responded) { 2775 rbd_warn(rbd_dev, "no lock owners detected"); 2776 ret = -ETIMEDOUT; 2777 } 2778 2779 out: 2780 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 2781 return ret; 2782 2783 e_inval: 2784 ret = -EINVAL; 2785 goto out; 2786 } 2787 2788 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) 2789 { 2790 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); 2791 2792 cancel_delayed_work(&rbd_dev->lock_dwork); 2793 if (wake_all) 2794 wake_up_all(&rbd_dev->lock_waitq); 2795 else 2796 wake_up(&rbd_dev->lock_waitq); 2797 } 2798 2799 static int get_lock_owner_info(struct rbd_device *rbd_dev, 2800 struct ceph_locker **lockers, u32 *num_lockers) 2801 { 2802 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2803 u8 lock_type; 2804 char *lock_tag; 2805 int ret; 2806 2807 dout("%s rbd_dev %p\n", __func__, rbd_dev); 2808 2809 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, 2810 &rbd_dev->header_oloc, RBD_LOCK_NAME, 2811 &lock_type, &lock_tag, lockers, num_lockers); 2812 if (ret) 2813 return ret; 2814 
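/*
 * Sanity-check the result: the lock must be rbd's own exclusive lock
 * (matching tag and cookie prefix); anything else is reported as held
 * by an external mechanism and treated as -EBUSY.
 */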
2815 if (*num_lockers == 0) { 2816 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); 2817 goto out; 2818 } 2819 2820 if (strcmp(lock_tag, RBD_LOCK_TAG)) { 2821 rbd_warn(rbd_dev, "locked by external mechanism, tag %s", 2822 lock_tag); 2823 ret = -EBUSY; 2824 goto out; 2825 } 2826 2827 if (lock_type == CEPH_CLS_LOCK_SHARED) { 2828 rbd_warn(rbd_dev, "shared lock type detected"); 2829 ret = -EBUSY; 2830 goto out; 2831 } 2832 2833 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, 2834 strlen(RBD_LOCK_COOKIE_PREFIX))) { 2835 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", 2836 (*lockers)[0].id.cookie); 2837 ret = -EBUSY; 2838 goto out; 2839 } 2840 2841 out: 2842 kfree(lock_tag); 2843 return ret; 2844 } 2845 2846 static int find_watcher(struct rbd_device *rbd_dev, 2847 const struct ceph_locker *locker) 2848 { 2849 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2850 struct ceph_watch_item *watchers; 2851 u32 num_watchers; 2852 u64 cookie; 2853 int i; 2854 int ret; 2855 2856 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, 2857 &rbd_dev->header_oloc, &watchers, 2858 &num_watchers); 2859 if (ret) 2860 return ret; 2861 2862 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); 2863 for (i = 0; i < num_watchers; i++) { 2864 if (!memcmp(&watchers[i].addr, &locker->info.addr, 2865 sizeof(locker->info.addr)) && 2866 watchers[i].cookie == cookie) { 2867 struct rbd_client_id cid = { 2868 .gid = le64_to_cpu(watchers[i].name.num), 2869 .handle = cookie, 2870 }; 2871 2872 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, 2873 rbd_dev, cid.gid, cid.handle); 2874 rbd_set_owner_cid(rbd_dev, &cid); 2875 ret = 1; 2876 goto out; 2877 } 2878 } 2879 2880 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); 2881 ret = 0; 2882 out: 2883 kfree(watchers); 2884 return ret; 2885 } 2886 2887 /* 2888 * lock_rwsem must be held for write 2889 */ 2890 static int rbd_try_lock(struct rbd_device *rbd_dev) 2891 { 2892 struct ceph_client *client = rbd_dev->rbd_client->client; 2893 struct ceph_locker *lockers; 2894 u32 num_lockers; 2895 int ret; 2896 2897 for (;;) { 2898 ret = rbd_lock(rbd_dev); 2899 if (ret != -EBUSY) 2900 return ret; 2901 2902 /* determine if the current lock holder is still alive */ 2903 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); 2904 if (ret) 2905 return ret; 2906 2907 if (num_lockers == 0) 2908 goto again; 2909 2910 ret = find_watcher(rbd_dev, lockers); 2911 if (ret) { 2912 if (ret > 0) 2913 ret = 0; /* have to request lock */ 2914 goto out; 2915 } 2916 2917 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", 2918 ENTITY_NAME(lockers[0].id.name)); 2919 2920 ret = ceph_monc_blacklist_add(&client->monc, 2921 &lockers[0].info.addr); 2922 if (ret) { 2923 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", 2924 ENTITY_NAME(lockers[0].id.name), ret); 2925 goto out; 2926 } 2927 2928 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, 2929 &rbd_dev->header_oloc, RBD_LOCK_NAME, 2930 lockers[0].id.cookie, 2931 &lockers[0].id.name); 2932 if (ret && ret != -ENOENT) 2933 goto out; 2934 2935 again: 2936 ceph_free_lockers(lockers, num_lockers); 2937 } 2938 2939 out: 2940 ceph_free_lockers(lockers, num_lockers); 2941 return ret; 2942 } 2943 2944 /* 2945 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED 2946 */ 2947 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, 2948 int *pret) 2949 { 2950 enum rbd_lock_state lock_state; 2951 2952 down_read(&rbd_dev->lock_rwsem); 2953 
dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 2954 rbd_dev->lock_state); 2955 if (__rbd_is_lock_owner(rbd_dev)) { 2956 lock_state = rbd_dev->lock_state; 2957 up_read(&rbd_dev->lock_rwsem); 2958 return lock_state; 2959 } 2960 2961 up_read(&rbd_dev->lock_rwsem); 2962 down_write(&rbd_dev->lock_rwsem); 2963 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 2964 rbd_dev->lock_state); 2965 if (!__rbd_is_lock_owner(rbd_dev)) { 2966 *pret = rbd_try_lock(rbd_dev); 2967 if (*pret) 2968 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); 2969 } 2970 2971 lock_state = rbd_dev->lock_state; 2972 up_write(&rbd_dev->lock_rwsem); 2973 return lock_state; 2974 } 2975 2976 static void rbd_acquire_lock(struct work_struct *work) 2977 { 2978 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 2979 struct rbd_device, lock_dwork); 2980 enum rbd_lock_state lock_state; 2981 int ret = 0; 2982 2983 dout("%s rbd_dev %p\n", __func__, rbd_dev); 2984 again: 2985 lock_state = rbd_try_acquire_lock(rbd_dev, &ret); 2986 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { 2987 if (lock_state == RBD_LOCK_STATE_LOCKED) 2988 wake_requests(rbd_dev, true); 2989 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, 2990 rbd_dev, lock_state, ret); 2991 return; 2992 } 2993 2994 ret = rbd_request_lock(rbd_dev); 2995 if (ret == -ETIMEDOUT) { 2996 goto again; /* treat this as a dead client */ 2997 } else if (ret == -EROFS) { 2998 rbd_warn(rbd_dev, "peer will not release lock"); 2999 /* 3000 * If this is rbd_add_acquire_lock(), we want to fail 3001 * immediately -- reuse BLACKLISTED flag. Otherwise we 3002 * want to block. 3003 */ 3004 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { 3005 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3006 /* wake "rbd map --exclusive" process */ 3007 wake_requests(rbd_dev, false); 3008 } 3009 } else if (ret < 0) { 3010 rbd_warn(rbd_dev, "error requesting lock: %d", ret); 3011 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3012 RBD_RETRY_DELAY); 3013 } else { 3014 /* 3015 * lock owner acked, but resend if we don't see them 3016 * release the lock 3017 */ 3018 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, 3019 rbd_dev); 3020 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3021 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); 3022 } 3023 } 3024 3025 /* 3026 * lock_rwsem must be held for write 3027 */ 3028 static bool rbd_release_lock(struct rbd_device *rbd_dev) 3029 { 3030 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3031 rbd_dev->lock_state); 3032 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) 3033 return false; 3034 3035 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; 3036 downgrade_write(&rbd_dev->lock_rwsem); 3037 /* 3038 * Ensure that all in-flight IO is flushed. 3039 * 3040 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which 3041 * may be shared with other devices. 3042 */ 3043 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); 3044 up_read(&rbd_dev->lock_rwsem); 3045 3046 down_write(&rbd_dev->lock_rwsem); 3047 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3048 rbd_dev->lock_state); 3049 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) 3050 return false; 3051 3052 rbd_unlock(rbd_dev); 3053 /* 3054 * Give others a chance to grab the lock - we would re-acquire 3055 * almost immediately if we got new IO during ceph_osdc_sync() 3056 * otherwise. 
We need to ack our own notifications, so this 3057 * lock_dwork will be requeued from rbd_wait_state_locked() 3058 * after wake_requests() in rbd_handle_released_lock(). 3059 */ 3060 cancel_delayed_work(&rbd_dev->lock_dwork); 3061 return true; 3062 } 3063 3064 static void rbd_release_lock_work(struct work_struct *work) 3065 { 3066 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3067 unlock_work); 3068 3069 down_write(&rbd_dev->lock_rwsem); 3070 rbd_release_lock(rbd_dev); 3071 up_write(&rbd_dev->lock_rwsem); 3072 } 3073 3074 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, 3075 void **p) 3076 { 3077 struct rbd_client_id cid = { 0 }; 3078 3079 if (struct_v >= 2) { 3080 cid.gid = ceph_decode_64(p); 3081 cid.handle = ceph_decode_64(p); 3082 } 3083 3084 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3085 cid.handle); 3086 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3087 down_write(&rbd_dev->lock_rwsem); 3088 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3089 /* 3090 * we already know that the remote client is 3091 * the owner 3092 */ 3093 up_write(&rbd_dev->lock_rwsem); 3094 return; 3095 } 3096 3097 rbd_set_owner_cid(rbd_dev, &cid); 3098 downgrade_write(&rbd_dev->lock_rwsem); 3099 } else { 3100 down_read(&rbd_dev->lock_rwsem); 3101 } 3102 3103 if (!__rbd_is_lock_owner(rbd_dev)) 3104 wake_requests(rbd_dev, false); 3105 up_read(&rbd_dev->lock_rwsem); 3106 } 3107 3108 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, 3109 void **p) 3110 { 3111 struct rbd_client_id cid = { 0 }; 3112 3113 if (struct_v >= 2) { 3114 cid.gid = ceph_decode_64(p); 3115 cid.handle = ceph_decode_64(p); 3116 } 3117 3118 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3119 cid.handle); 3120 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3121 down_write(&rbd_dev->lock_rwsem); 3122 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3123 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", 3124 __func__, rbd_dev, cid.gid, cid.handle, 3125 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); 3126 up_write(&rbd_dev->lock_rwsem); 3127 return; 3128 } 3129 3130 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3131 downgrade_write(&rbd_dev->lock_rwsem); 3132 } else { 3133 down_read(&rbd_dev->lock_rwsem); 3134 } 3135 3136 if (!__rbd_is_lock_owner(rbd_dev)) 3137 wake_requests(rbd_dev, false); 3138 up_read(&rbd_dev->lock_rwsem); 3139 } 3140 3141 /* 3142 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no 3143 * ResponseMessage is needed. 
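 * Roughly, as implemented below: 1 when the request came from ourselves
 * or we do not own the lock, 0 when we own it and will (or already are
 * in the process of) releasing it, and -EROFS when the image was mapped
 * with the exclusive option and we refuse to release it.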
3144 */ 3145 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, 3146 void **p) 3147 { 3148 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); 3149 struct rbd_client_id cid = { 0 }; 3150 int result = 1; 3151 3152 if (struct_v >= 2) { 3153 cid.gid = ceph_decode_64(p); 3154 cid.handle = ceph_decode_64(p); 3155 } 3156 3157 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3158 cid.handle); 3159 if (rbd_cid_equal(&cid, &my_cid)) 3160 return result; 3161 3162 down_read(&rbd_dev->lock_rwsem); 3163 if (__rbd_is_lock_owner(rbd_dev)) { 3164 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && 3165 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) 3166 goto out_unlock; 3167 3168 /* 3169 * encode ResponseMessage(0) so the peer can detect 3170 * a missing owner 3171 */ 3172 result = 0; 3173 3174 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { 3175 if (!rbd_dev->opts->exclusive) { 3176 dout("%s rbd_dev %p queueing unlock_work\n", 3177 __func__, rbd_dev); 3178 queue_work(rbd_dev->task_wq, 3179 &rbd_dev->unlock_work); 3180 } else { 3181 /* refuse to release the lock */ 3182 result = -EROFS; 3183 } 3184 } 3185 } 3186 3187 out_unlock: 3188 up_read(&rbd_dev->lock_rwsem); 3189 return result; 3190 } 3191 3192 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, 3193 u64 notify_id, u64 cookie, s32 *result) 3194 { 3195 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3196 char buf[4 + CEPH_ENCODING_START_BLK_LEN]; 3197 int buf_size = sizeof(buf); 3198 int ret; 3199 3200 if (result) { 3201 void *p = buf; 3202 3203 /* encode ResponseMessage */ 3204 ceph_start_encoding(&p, 1, 1, 3205 buf_size - CEPH_ENCODING_START_BLK_LEN); 3206 ceph_encode_32(&p, *result); 3207 } else { 3208 buf_size = 0; 3209 } 3210 3211 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 3212 &rbd_dev->header_oloc, notify_id, cookie, 3213 buf, buf_size); 3214 if (ret) 3215 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); 3216 } 3217 3218 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, 3219 u64 cookie) 3220 { 3221 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3222 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); 3223 } 3224 3225 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, 3226 u64 notify_id, u64 cookie, s32 result) 3227 { 3228 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 3229 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); 3230 } 3231 3232 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 3233 u64 notifier_id, void *data, size_t data_len) 3234 { 3235 struct rbd_device *rbd_dev = arg; 3236 void *p = data; 3237 void *const end = p + data_len; 3238 u8 struct_v = 0; 3239 u32 len; 3240 u32 notify_op; 3241 int ret; 3242 3243 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", 3244 __func__, rbd_dev, cookie, notify_id, data_len); 3245 if (data_len) { 3246 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", 3247 &struct_v, &len); 3248 if (ret) { 3249 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", 3250 ret); 3251 return; 3252 } 3253 3254 notify_op = ceph_decode_32(&p); 3255 } else { 3256 /* legacy notification for header updates */ 3257 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; 3258 len = 0; 3259 } 3260 3261 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); 3262 switch (notify_op) { 3263 case RBD_NOTIFY_OP_ACQUIRED_LOCK: 3264 rbd_handle_acquired_lock(rbd_dev, struct_v, &p); 3265 rbd_acknowledge_notify(rbd_dev, notify_id, 
cookie); 3266 break; 3267 case RBD_NOTIFY_OP_RELEASED_LOCK: 3268 rbd_handle_released_lock(rbd_dev, struct_v, &p); 3269 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3270 break; 3271 case RBD_NOTIFY_OP_REQUEST_LOCK: 3272 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); 3273 if (ret <= 0) 3274 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3275 cookie, ret); 3276 else 3277 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3278 break; 3279 case RBD_NOTIFY_OP_HEADER_UPDATE: 3280 ret = rbd_dev_refresh(rbd_dev); 3281 if (ret) 3282 rbd_warn(rbd_dev, "refresh failed: %d", ret); 3283 3284 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3285 break; 3286 default: 3287 if (rbd_is_lock_owner(rbd_dev)) 3288 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3289 cookie, -EOPNOTSUPP); 3290 else 3291 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3292 break; 3293 } 3294 } 3295 3296 static void __rbd_unregister_watch(struct rbd_device *rbd_dev); 3297 3298 static void rbd_watch_errcb(void *arg, u64 cookie, int err) 3299 { 3300 struct rbd_device *rbd_dev = arg; 3301 3302 rbd_warn(rbd_dev, "encountered watch error: %d", err); 3303 3304 down_write(&rbd_dev->lock_rwsem); 3305 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3306 up_write(&rbd_dev->lock_rwsem); 3307 3308 mutex_lock(&rbd_dev->watch_mutex); 3309 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { 3310 __rbd_unregister_watch(rbd_dev); 3311 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; 3312 3313 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); 3314 } 3315 mutex_unlock(&rbd_dev->watch_mutex); 3316 } 3317 3318 /* 3319 * watch_mutex must be locked 3320 */ 3321 static int __rbd_register_watch(struct rbd_device *rbd_dev) 3322 { 3323 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3324 struct ceph_osd_linger_request *handle; 3325 3326 rbd_assert(!rbd_dev->watch_handle); 3327 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3328 3329 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 3330 &rbd_dev->header_oloc, rbd_watch_cb, 3331 rbd_watch_errcb, rbd_dev); 3332 if (IS_ERR(handle)) 3333 return PTR_ERR(handle); 3334 3335 rbd_dev->watch_handle = handle; 3336 return 0; 3337 } 3338 3339 /* 3340 * watch_mutex must be locked 3341 */ 3342 static void __rbd_unregister_watch(struct rbd_device *rbd_dev) 3343 { 3344 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3345 int ret; 3346 3347 rbd_assert(rbd_dev->watch_handle); 3348 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3349 3350 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 3351 if (ret) 3352 rbd_warn(rbd_dev, "failed to unwatch: %d", ret); 3353 3354 rbd_dev->watch_handle = NULL; 3355 } 3356 3357 static int rbd_register_watch(struct rbd_device *rbd_dev) 3358 { 3359 int ret; 3360 3361 mutex_lock(&rbd_dev->watch_mutex); 3362 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); 3363 ret = __rbd_register_watch(rbd_dev); 3364 if (ret) 3365 goto out; 3366 3367 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3368 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3369 3370 out: 3371 mutex_unlock(&rbd_dev->watch_mutex); 3372 return ret; 3373 } 3374 3375 static void cancel_tasks_sync(struct rbd_device *rbd_dev) 3376 { 3377 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3378 3379 cancel_delayed_work_sync(&rbd_dev->watch_dwork); 3380 cancel_work_sync(&rbd_dev->acquired_lock_work); 3381 cancel_work_sync(&rbd_dev->released_lock_work); 3382 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 3383 
cancel_work_sync(&rbd_dev->unlock_work); 3384 } 3385 3386 static void rbd_unregister_watch(struct rbd_device *rbd_dev) 3387 { 3388 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); 3389 cancel_tasks_sync(rbd_dev); 3390 3391 mutex_lock(&rbd_dev->watch_mutex); 3392 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) 3393 __rbd_unregister_watch(rbd_dev); 3394 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 3395 mutex_unlock(&rbd_dev->watch_mutex); 3396 3397 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 3398 }
3399 3400 /* 3401 * lock_rwsem must be held for write 3402 */ 3403 static void rbd_reacquire_lock(struct rbd_device *rbd_dev) 3404 { 3405 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3406 char cookie[32]; 3407 int ret; 3408 3409 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); 3410 3411 format_lock_cookie(rbd_dev, cookie); 3412 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, 3413 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3414 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie, 3415 RBD_LOCK_TAG, cookie); 3416 if (ret) { 3417 if (ret != -EOPNOTSUPP) 3418 rbd_warn(rbd_dev, "failed to update lock cookie: %d", 3419 ret); 3420 3421 /* 3422 * Lock cookie cannot be updated on older OSDs, so do 3423 * a manual release and queue an acquire. 3424 */ 3425 if (rbd_release_lock(rbd_dev)) 3426 queue_delayed_work(rbd_dev->task_wq, 3427 &rbd_dev->lock_dwork, 0); 3428 } else { 3429 __rbd_lock(rbd_dev, cookie); 3430 } 3431 }
3432 3433 static void rbd_reregister_watch(struct work_struct *work) 3434 { 3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3436 struct rbd_device, watch_dwork); 3437 int ret; 3438 3439 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3440 3441 mutex_lock(&rbd_dev->watch_mutex); 3442 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { 3443 mutex_unlock(&rbd_dev->watch_mutex); 3444 return; 3445 } 3446 3447 ret = __rbd_register_watch(rbd_dev); 3448 if (ret) { 3449 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); 3450 if (ret == -EBLACKLISTED || ret == -ENOENT) { 3451 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3452 wake_requests(rbd_dev, true); 3453 } else { 3454 queue_delayed_work(rbd_dev->task_wq, 3455 &rbd_dev->watch_dwork, 3456 RBD_RETRY_DELAY); 3457 } 3458 mutex_unlock(&rbd_dev->watch_mutex); 3459 return; 3460 } 3461 3462 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3463 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3464 mutex_unlock(&rbd_dev->watch_mutex); 3465 3466 down_write(&rbd_dev->lock_rwsem); 3467 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) 3468 rbd_reacquire_lock(rbd_dev); 3469 up_write(&rbd_dev->lock_rwsem); 3470 3471 ret = rbd_dev_refresh(rbd_dev); 3472 if (ret) 3473 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret); 3474 }
3475 3476 /* 3477 * Synchronous osd object method call. Returns the number of bytes 3478 * returned in the inbound (reply) buffer, or a negative error code. 3479 */ 3480 static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 3481 struct ceph_object_id *oid, 3482 struct ceph_object_locator *oloc, 3483 const char *method_name, 3484 const void *outbound, 3485 size_t outbound_size, 3486 void *inbound, 3487 size_t inbound_size) 3488 { 3489 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3490 struct page *req_page = NULL; 3491 struct page *reply_page; 3492 int ret; 3493 3494 /* 3495 * Method calls are ultimately read operations. The result 3496 * should be placed into the inbound buffer provided.
They 3497 * also supply outbound data--parameters for the object 3498 * method. Currently if this is present it will be a 3499 * snapshot id. 3500 */ 3501 if (outbound) { 3502 if (outbound_size > PAGE_SIZE) 3503 return -E2BIG; 3504 3505 req_page = alloc_page(GFP_KERNEL); 3506 if (!req_page) 3507 return -ENOMEM; 3508 3509 memcpy(page_address(req_page), outbound, outbound_size); 3510 } 3511 3512 reply_page = alloc_page(GFP_KERNEL); 3513 if (!reply_page) { 3514 if (req_page) 3515 __free_page(req_page); 3516 return -ENOMEM; 3517 } 3518 3519 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, 3520 CEPH_OSD_FLAG_READ, req_page, outbound_size, 3521 reply_page, &inbound_size); 3522 if (!ret) { 3523 memcpy(inbound, page_address(reply_page), inbound_size); 3524 ret = inbound_size; 3525 } 3526 3527 if (req_page) 3528 __free_page(req_page); 3529 __free_page(reply_page); 3530 return ret; 3531 } 3532 3533 /* 3534 * lock_rwsem must be held for read 3535 */ 3536 static void rbd_wait_state_locked(struct rbd_device *rbd_dev) 3537 { 3538 DEFINE_WAIT(wait); 3539 3540 do { 3541 /* 3542 * Note the use of mod_delayed_work() in rbd_acquire_lock() 3543 * and cancel_delayed_work() in wake_requests(). 3544 */ 3545 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); 3546 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 3547 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, 3548 TASK_UNINTERRUPTIBLE); 3549 up_read(&rbd_dev->lock_rwsem); 3550 schedule(); 3551 down_read(&rbd_dev->lock_rwsem); 3552 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && 3553 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); 3554 3555 finish_wait(&rbd_dev->lock_waitq, &wait); 3556 } 3557 3558 static void rbd_queue_workfn(struct work_struct *work) 3559 { 3560 struct request *rq = blk_mq_rq_from_pdu(work); 3561 struct rbd_device *rbd_dev = rq->q->queuedata; 3562 struct rbd_img_request *img_request; 3563 struct ceph_snap_context *snapc = NULL; 3564 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; 3565 u64 length = blk_rq_bytes(rq); 3566 enum obj_operation_type op_type; 3567 u64 mapping_size; 3568 bool must_be_locked; 3569 int result; 3570 3571 switch (req_op(rq)) { 3572 case REQ_OP_DISCARD: 3573 case REQ_OP_WRITE_ZEROES: 3574 op_type = OBJ_OP_DISCARD; 3575 break; 3576 case REQ_OP_WRITE: 3577 op_type = OBJ_OP_WRITE; 3578 break; 3579 case REQ_OP_READ: 3580 op_type = OBJ_OP_READ; 3581 break; 3582 default: 3583 dout("%s: non-fs request type %d\n", __func__, req_op(rq)); 3584 result = -EIO; 3585 goto err; 3586 } 3587 3588 /* Ignore/skip any zero-length requests */ 3589 3590 if (!length) { 3591 dout("%s: zero-length request\n", __func__); 3592 result = 0; 3593 goto err_rq; 3594 } 3595 3596 rbd_assert(op_type == OBJ_OP_READ || 3597 rbd_dev->spec->snap_id == CEPH_NOSNAP); 3598 3599 /* 3600 * Quit early if the mapped snapshot no longer exists. It's 3601 * still possible the snapshot will have disappeared by the 3602 * time our request arrives at the osd, but there's no sense in 3603 * sending it if we already know. 
3604 */ 3605 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 3606 dout("request for non-existent snapshot"); 3607 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 3608 result = -ENXIO; 3609 goto err_rq; 3610 } 3611 3612 if (offset && length > U64_MAX - offset + 1) { 3613 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, 3614 length); 3615 result = -EINVAL; 3616 goto err_rq; /* Shouldn't happen */ 3617 } 3618 3619 blk_mq_start_request(rq); 3620 3621 down_read(&rbd_dev->header_rwsem); 3622 mapping_size = rbd_dev->mapping.size; 3623 if (op_type != OBJ_OP_READ) { 3624 snapc = rbd_dev->header.snapc; 3625 ceph_get_snap_context(snapc); 3626 } 3627 up_read(&rbd_dev->header_rwsem); 3628 3629 if (offset + length > mapping_size) { 3630 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, 3631 length, mapping_size); 3632 result = -EIO; 3633 goto err_rq; 3634 } 3635 3636 must_be_locked = 3637 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && 3638 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); 3639 if (must_be_locked) { 3640 down_read(&rbd_dev->lock_rwsem); 3641 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && 3642 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 3643 if (rbd_dev->opts->exclusive) { 3644 rbd_warn(rbd_dev, "exclusive lock required"); 3645 result = -EROFS; 3646 goto err_unlock; 3647 } 3648 rbd_wait_state_locked(rbd_dev); 3649 } 3650 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 3651 result = -EBLACKLISTED; 3652 goto err_unlock; 3653 } 3654 } 3655 3656 img_request = rbd_img_request_create(rbd_dev, op_type, snapc); 3657 if (!img_request) { 3658 result = -ENOMEM; 3659 goto err_unlock; 3660 } 3661 img_request->rq = rq; 3662 snapc = NULL; /* img_request consumes a ref */ 3663 3664 if (op_type == OBJ_OP_DISCARD) 3665 result = rbd_img_fill_nodata(img_request, offset, length); 3666 else 3667 result = rbd_img_fill_from_bio(img_request, offset, length, 3668 rq->bio); 3669 if (result) 3670 goto err_img_request; 3671 3672 rbd_img_request_submit(img_request); 3673 if (must_be_locked) 3674 up_read(&rbd_dev->lock_rwsem); 3675 return; 3676 3677 err_img_request: 3678 rbd_img_request_put(img_request); 3679 err_unlock: 3680 if (must_be_locked) 3681 up_read(&rbd_dev->lock_rwsem); 3682 err_rq: 3683 if (result) 3684 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 3685 obj_op_name(op_type), length, offset, result); 3686 ceph_put_snap_context(snapc); 3687 err: 3688 blk_mq_end_request(rq, errno_to_blk_status(result)); 3689 } 3690 3691 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, 3692 const struct blk_mq_queue_data *bd) 3693 { 3694 struct request *rq = bd->rq; 3695 struct work_struct *work = blk_mq_rq_to_pdu(rq); 3696 3697 queue_work(rbd_wq, work); 3698 return BLK_STS_OK; 3699 } 3700 3701 static void rbd_free_disk(struct rbd_device *rbd_dev) 3702 { 3703 blk_cleanup_queue(rbd_dev->disk->queue); 3704 blk_mq_free_tag_set(&rbd_dev->tag_set); 3705 put_disk(rbd_dev->disk); 3706 rbd_dev->disk = NULL; 3707 } 3708 3709 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 3710 struct ceph_object_id *oid, 3711 struct ceph_object_locator *oloc, 3712 void *buf, int buf_len) 3713 3714 { 3715 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3716 struct ceph_osd_request *req; 3717 struct page **pages; 3718 int num_pages = calc_pages_for(0, buf_len); 3719 int ret; 3720 3721 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 3722 if (!req) 3723 return -ENOMEM; 3724 3725 ceph_oid_copy(&req->r_base_oid, oid); 3726 
ceph_oloc_copy(&req->r_base_oloc, oloc); 3727 req->r_flags = CEPH_OSD_FLAG_READ; 3728 3729 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 3730 if (ret) 3731 goto out_req; 3732 3733 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 3734 if (IS_ERR(pages)) { 3735 ret = PTR_ERR(pages); 3736 goto out_req; 3737 } 3738 3739 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); 3740 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, 3741 true); 3742 3743 ceph_osdc_start_request(osdc, req, false); 3744 ret = ceph_osdc_wait_request(osdc, req); 3745 if (ret >= 0) 3746 ceph_copy_from_page_vector(pages, buf, 0, ret); 3747 3748 out_req: 3749 ceph_osdc_put_request(req); 3750 return ret; 3751 } 3752 3753 /* 3754 * Read the complete header for the given rbd device. On successful 3755 * return, the rbd_dev->header field will contain up-to-date 3756 * information about the image. 3757 */ 3758 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) 3759 { 3760 struct rbd_image_header_ondisk *ondisk = NULL; 3761 u32 snap_count = 0; 3762 u64 names_size = 0; 3763 u32 want_count; 3764 int ret; 3765 3766 /* 3767 * The complete header will include an array of its 64-bit 3768 * snapshot ids, followed by the names of those snapshots as 3769 * a contiguous block of NUL-terminated strings. Note that 3770 * the number of snapshots could change by the time we read 3771 * it in, in which case we re-read it. 3772 */ 3773 do { 3774 size_t size; 3775 3776 kfree(ondisk); 3777 3778 size = sizeof (*ondisk); 3779 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 3780 size += names_size; 3781 ondisk = kmalloc(size, GFP_KERNEL); 3782 if (!ondisk) 3783 return -ENOMEM; 3784 3785 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, 3786 &rbd_dev->header_oloc, ondisk, size); 3787 if (ret < 0) 3788 goto out; 3789 if ((size_t)ret < size) { 3790 ret = -ENXIO; 3791 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 3792 size, ret); 3793 goto out; 3794 } 3795 if (!rbd_dev_ondisk_valid(ondisk)) { 3796 ret = -ENXIO; 3797 rbd_warn(rbd_dev, "invalid header"); 3798 goto out; 3799 } 3800 3801 names_size = le64_to_cpu(ondisk->snap_names_len); 3802 want_count = snap_count; 3803 snap_count = le32_to_cpu(ondisk->snap_count); 3804 } while (snap_count != want_count); 3805 3806 ret = rbd_header_from_disk(rbd_dev, ondisk); 3807 out: 3808 kfree(ondisk); 3809 3810 return ret; 3811 } 3812 3813 /* 3814 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to 3815 * has disappeared from the (just updated) snapshot context. 3816 */ 3817 static void rbd_exists_validate(struct rbd_device *rbd_dev) 3818 { 3819 u64 snap_id; 3820 3821 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) 3822 return; 3823 3824 snap_id = rbd_dev->spec->snap_id; 3825 if (snap_id == CEPH_NOSNAP) 3826 return; 3827 3828 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) 3829 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 3830 } 3831 3832 static void rbd_dev_update_size(struct rbd_device *rbd_dev) 3833 { 3834 sector_t size; 3835 3836 /* 3837 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't 3838 * try to update its size. If REMOVING is set, updating size 3839 * is just useless work since the device can't be opened. 
3840 */ 3841 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) && 3842 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { 3843 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 3844 dout("setting size to %llu sectors", (unsigned long long)size); 3845 set_capacity(rbd_dev->disk, size); 3846 revalidate_disk(rbd_dev->disk); 3847 } 3848 } 3849 3850 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 3851 { 3852 u64 mapping_size; 3853 int ret; 3854 3855 down_write(&rbd_dev->header_rwsem); 3856 mapping_size = rbd_dev->mapping.size; 3857 3858 ret = rbd_dev_header_info(rbd_dev); 3859 if (ret) 3860 goto out; 3861 3862 /* 3863 * If there is a parent, see if it has disappeared due to the 3864 * mapped image getting flattened. 3865 */ 3866 if (rbd_dev->parent) { 3867 ret = rbd_dev_v2_parent_info(rbd_dev); 3868 if (ret) 3869 goto out; 3870 } 3871 3872 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { 3873 rbd_dev->mapping.size = rbd_dev->header.image_size; 3874 } else { 3875 /* validate mapped snapshot's EXISTS flag */ 3876 rbd_exists_validate(rbd_dev); 3877 } 3878 3879 out: 3880 up_write(&rbd_dev->header_rwsem); 3881 if (!ret && mapping_size != rbd_dev->mapping.size) 3882 rbd_dev_update_size(rbd_dev); 3883 3884 return ret; 3885 } 3886 3887 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, 3888 unsigned int hctx_idx, unsigned int numa_node) 3889 { 3890 struct work_struct *work = blk_mq_rq_to_pdu(rq); 3891 3892 INIT_WORK(work, rbd_queue_workfn); 3893 return 0; 3894 } 3895 3896 static const struct blk_mq_ops rbd_mq_ops = { 3897 .queue_rq = rbd_queue_rq, 3898 .init_request = rbd_init_request, 3899 }; 3900 3901 static int rbd_init_disk(struct rbd_device *rbd_dev) 3902 { 3903 struct gendisk *disk; 3904 struct request_queue *q; 3905 u64 segment_size; 3906 int err; 3907 3908 /* create gendisk info */ 3909 disk = alloc_disk(single_major ? 
3910 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : 3911 RBD_MINORS_PER_MAJOR); 3912 if (!disk) 3913 return -ENOMEM; 3914 3915 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 3916 rbd_dev->dev_id); 3917 disk->major = rbd_dev->major; 3918 disk->first_minor = rbd_dev->minor; 3919 if (single_major) 3920 disk->flags |= GENHD_FL_EXT_DEVT; 3921 disk->fops = &rbd_bd_ops; 3922 disk->private_data = rbd_dev; 3923 3924 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); 3925 rbd_dev->tag_set.ops = &rbd_mq_ops; 3926 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; 3927 rbd_dev->tag_set.numa_node = NUMA_NO_NODE; 3928 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; 3929 rbd_dev->tag_set.nr_hw_queues = 1; 3930 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); 3931 3932 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); 3933 if (err) 3934 goto out_disk; 3935 3936 q = blk_mq_init_queue(&rbd_dev->tag_set); 3937 if (IS_ERR(q)) { 3938 err = PTR_ERR(q); 3939 goto out_tag_set; 3940 } 3941 3942 blk_queue_flag_set(QUEUE_FLAG_NONROT, q); 3943 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ 3944 3945 /* set io sizes to object size */ 3946 segment_size = rbd_obj_bytes(&rbd_dev->header); 3947 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 3948 q->limits.max_sectors = queue_max_hw_sectors(q); 3949 blk_queue_max_segments(q, USHRT_MAX); 3950 blk_queue_max_segment_size(q, UINT_MAX); 3951 blk_queue_io_min(q, segment_size); 3952 blk_queue_io_opt(q, segment_size); 3953 3954 /* enable the discard support */ 3955 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q); 3956 q->limits.discard_granularity = segment_size; 3957 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE); 3958 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE); 3959 3960 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) 3961 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; 3962 3963 /* 3964 * disk_release() expects a queue ref from add_disk() and will 3965 * put it. Hold an extra ref until add_disk() is called. 3966 */ 3967 WARN_ON(!blk_get_queue(q)); 3968 disk->queue = q; 3969 q->queuedata = rbd_dev; 3970 3971 rbd_dev->disk = disk; 3972 3973 return 0; 3974 out_tag_set: 3975 blk_mq_free_tag_set(&rbd_dev->tag_set); 3976 out_disk: 3977 put_disk(disk); 3978 return err; 3979 } 3980 3981 /* 3982 sysfs 3983 */ 3984 3985 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 3986 { 3987 return container_of(dev, struct rbd_device, dev); 3988 } 3989 3990 static ssize_t rbd_size_show(struct device *dev, 3991 struct device_attribute *attr, char *buf) 3992 { 3993 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3994 3995 return sprintf(buf, "%llu\n", 3996 (unsigned long long)rbd_dev->mapping.size); 3997 } 3998 3999 /* 4000 * Note this shows the features for whatever's mapped, which is not 4001 * necessarily the base image. 
4002 */ 4003 static ssize_t rbd_features_show(struct device *dev, 4004 struct device_attribute *attr, char *buf) 4005 { 4006 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4007 4008 return sprintf(buf, "0x%016llx\n", 4009 (unsigned long long)rbd_dev->mapping.features); 4010 } 4011 4012 static ssize_t rbd_major_show(struct device *dev, 4013 struct device_attribute *attr, char *buf) 4014 { 4015 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4016 4017 if (rbd_dev->major) 4018 return sprintf(buf, "%d\n", rbd_dev->major); 4019 4020 return sprintf(buf, "(none)\n"); 4021 } 4022 4023 static ssize_t rbd_minor_show(struct device *dev, 4024 struct device_attribute *attr, char *buf) 4025 { 4026 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4027 4028 return sprintf(buf, "%d\n", rbd_dev->minor); 4029 } 4030 4031 static ssize_t rbd_client_addr_show(struct device *dev, 4032 struct device_attribute *attr, char *buf) 4033 { 4034 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4035 struct ceph_entity_addr *client_addr = 4036 ceph_client_addr(rbd_dev->rbd_client->client); 4037 4038 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, 4039 le32_to_cpu(client_addr->nonce)); 4040 } 4041 4042 static ssize_t rbd_client_id_show(struct device *dev, 4043 struct device_attribute *attr, char *buf) 4044 { 4045 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4046 4047 return sprintf(buf, "client%lld\n", 4048 ceph_client_gid(rbd_dev->rbd_client->client)); 4049 } 4050 4051 static ssize_t rbd_cluster_fsid_show(struct device *dev, 4052 struct device_attribute *attr, char *buf) 4053 { 4054 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4055 4056 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); 4057 } 4058 4059 static ssize_t rbd_config_info_show(struct device *dev, 4060 struct device_attribute *attr, char *buf) 4061 { 4062 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4063 4064 return sprintf(buf, "%s\n", rbd_dev->config_info); 4065 } 4066 4067 static ssize_t rbd_pool_show(struct device *dev, 4068 struct device_attribute *attr, char *buf) 4069 { 4070 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4071 4072 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 4073 } 4074 4075 static ssize_t rbd_pool_id_show(struct device *dev, 4076 struct device_attribute *attr, char *buf) 4077 { 4078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4079 4080 return sprintf(buf, "%llu\n", 4081 (unsigned long long) rbd_dev->spec->pool_id); 4082 } 4083 4084 static ssize_t rbd_name_show(struct device *dev, 4085 struct device_attribute *attr, char *buf) 4086 { 4087 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4088 4089 if (rbd_dev->spec->image_name) 4090 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 4091 4092 return sprintf(buf, "(unknown)\n"); 4093 } 4094 4095 static ssize_t rbd_image_id_show(struct device *dev, 4096 struct device_attribute *attr, char *buf) 4097 { 4098 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4099 4100 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 4101 } 4102 4103 /* 4104 * Shows the name of the currently-mapped snapshot (or 4105 * RBD_SNAP_HEAD_NAME for the base image). 
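 *
 * For a base-image mapping this reads back as RBD_SNAP_HEAD_NAME, e.g.
 * (hypothetical device id):
 *
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   -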
4106 */ 4107 static ssize_t rbd_snap_show(struct device *dev, 4108 struct device_attribute *attr, 4109 char *buf) 4110 { 4111 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4112 4113 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 4114 } 4115 4116 static ssize_t rbd_snap_id_show(struct device *dev, 4117 struct device_attribute *attr, char *buf) 4118 { 4119 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4120 4121 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); 4122 } 4123 4124 /* 4125 * For a v2 image, shows the chain of parent images, separated by empty 4126 * lines. For v1 images or if there is no parent, shows "(no parent 4127 * image)". 4128 */ 4129 static ssize_t rbd_parent_show(struct device *dev, 4130 struct device_attribute *attr, 4131 char *buf) 4132 { 4133 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4134 ssize_t count = 0; 4135 4136 if (!rbd_dev->parent) 4137 return sprintf(buf, "(no parent image)\n"); 4138 4139 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { 4140 struct rbd_spec *spec = rbd_dev->parent_spec; 4141 4142 count += sprintf(&buf[count], "%s" 4143 "pool_id %llu\npool_name %s\n" 4144 "image_id %s\nimage_name %s\n" 4145 "snap_id %llu\nsnap_name %s\n" 4146 "overlap %llu\n", 4147 !count ? "" : "\n", /* first? */ 4148 spec->pool_id, spec->pool_name, 4149 spec->image_id, spec->image_name ?: "(unknown)", 4150 spec->snap_id, spec->snap_name, 4151 rbd_dev->parent_overlap); 4152 } 4153 4154 return count; 4155 } 4156 4157 static ssize_t rbd_image_refresh(struct device *dev, 4158 struct device_attribute *attr, 4159 const char *buf, 4160 size_t size) 4161 { 4162 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4163 int ret; 4164 4165 ret = rbd_dev_refresh(rbd_dev); 4166 if (ret) 4167 return ret; 4168 4169 return size; 4170 } 4171 4172 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 4173 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 4174 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 4175 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); 4176 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); 4177 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 4178 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); 4179 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); 4180 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 4181 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 4182 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 4183 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 4184 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 4185 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 4186 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 4187 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 4188 4189 static struct attribute *rbd_attrs[] = { 4190 &dev_attr_size.attr, 4191 &dev_attr_features.attr, 4192 &dev_attr_major.attr, 4193 &dev_attr_minor.attr, 4194 &dev_attr_client_addr.attr, 4195 &dev_attr_client_id.attr, 4196 &dev_attr_cluster_fsid.attr, 4197 &dev_attr_config_info.attr, 4198 &dev_attr_pool.attr, 4199 &dev_attr_pool_id.attr, 4200 &dev_attr_name.attr, 4201 &dev_attr_image_id.attr, 4202 &dev_attr_current_snap.attr, 4203 &dev_attr_snap_id.attr, 4204 &dev_attr_parent.attr, 4205 &dev_attr_refresh.attr, 4206 NULL 4207 }; 4208 4209 static struct attribute_group rbd_attr_group = { 4210 .attrs = rbd_attrs, 4211 }; 4212 4213 static const struct 
attribute_group *rbd_attr_groups[] = { 4214 &rbd_attr_group, 4215 NULL 4216 }; 4217 4218 static void rbd_dev_release(struct device *dev); 4219 4220 static const struct device_type rbd_device_type = { 4221 .name = "rbd", 4222 .groups = rbd_attr_groups, 4223 .release = rbd_dev_release, 4224 }; 4225 4226 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 4227 { 4228 kref_get(&spec->kref); 4229 4230 return spec; 4231 } 4232 4233 static void rbd_spec_free(struct kref *kref); 4234 static void rbd_spec_put(struct rbd_spec *spec) 4235 { 4236 if (spec) 4237 kref_put(&spec->kref, rbd_spec_free); 4238 } 4239 4240 static struct rbd_spec *rbd_spec_alloc(void) 4241 { 4242 struct rbd_spec *spec; 4243 4244 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 4245 if (!spec) 4246 return NULL; 4247 4248 spec->pool_id = CEPH_NOPOOL; 4249 spec->snap_id = CEPH_NOSNAP; 4250 kref_init(&spec->kref); 4251 4252 return spec; 4253 } 4254 4255 static void rbd_spec_free(struct kref *kref) 4256 { 4257 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 4258 4259 kfree(spec->pool_name); 4260 kfree(spec->image_id); 4261 kfree(spec->image_name); 4262 kfree(spec->snap_name); 4263 kfree(spec); 4264 } 4265 4266 static void rbd_dev_free(struct rbd_device *rbd_dev) 4267 { 4268 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); 4269 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); 4270 4271 ceph_oid_destroy(&rbd_dev->header_oid); 4272 ceph_oloc_destroy(&rbd_dev->header_oloc); 4273 kfree(rbd_dev->config_info); 4274 4275 rbd_put_client(rbd_dev->rbd_client); 4276 rbd_spec_put(rbd_dev->spec); 4277 kfree(rbd_dev->opts); 4278 kfree(rbd_dev); 4279 } 4280 4281 static void rbd_dev_release(struct device *dev) 4282 { 4283 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4284 bool need_put = !!rbd_dev->opts; 4285 4286 if (need_put) { 4287 destroy_workqueue(rbd_dev->task_wq); 4288 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4289 } 4290 4291 rbd_dev_free(rbd_dev); 4292 4293 /* 4294 * This is racy, but way better than putting module outside of 4295 * the release callback. The race window is pretty small, so 4296 * doing something similar to dm (dm-builtin.c) is overkill. 
4297 */ 4298 if (need_put) 4299 module_put(THIS_MODULE); 4300 } 4301 4302 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, 4303 struct rbd_spec *spec) 4304 { 4305 struct rbd_device *rbd_dev; 4306 4307 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 4308 if (!rbd_dev) 4309 return NULL; 4310 4311 spin_lock_init(&rbd_dev->lock); 4312 INIT_LIST_HEAD(&rbd_dev->node); 4313 init_rwsem(&rbd_dev->header_rwsem); 4314 4315 rbd_dev->header.data_pool_id = CEPH_NOPOOL; 4316 ceph_oid_init(&rbd_dev->header_oid); 4317 rbd_dev->header_oloc.pool = spec->pool_id; 4318 4319 mutex_init(&rbd_dev->watch_mutex); 4320 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 4321 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); 4322 4323 init_rwsem(&rbd_dev->lock_rwsem); 4324 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 4325 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); 4326 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); 4327 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); 4328 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); 4329 init_waitqueue_head(&rbd_dev->lock_waitq); 4330 4331 rbd_dev->dev.bus = &rbd_bus_type; 4332 rbd_dev->dev.type = &rbd_device_type; 4333 rbd_dev->dev.parent = &rbd_root_dev; 4334 device_initialize(&rbd_dev->dev); 4335 4336 rbd_dev->rbd_client = rbdc; 4337 rbd_dev->spec = spec; 4338 4339 return rbd_dev; 4340 } 4341 4342 /* 4343 * Create a mapping rbd_dev. 4344 */ 4345 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4346 struct rbd_spec *spec, 4347 struct rbd_options *opts) 4348 { 4349 struct rbd_device *rbd_dev; 4350 4351 rbd_dev = __rbd_dev_create(rbdc, spec); 4352 if (!rbd_dev) 4353 return NULL; 4354 4355 rbd_dev->opts = opts; 4356 4357 /* get an id and fill in device name */ 4358 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 4359 minor_to_rbd_dev_id(1 << MINORBITS), 4360 GFP_KERNEL); 4361 if (rbd_dev->dev_id < 0) 4362 goto fail_rbd_dev; 4363 4364 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); 4365 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, 4366 rbd_dev->name); 4367 if (!rbd_dev->task_wq) 4368 goto fail_dev_id; 4369 4370 /* we have a ref from do_rbd_add() */ 4371 __module_get(THIS_MODULE); 4372 4373 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); 4374 return rbd_dev; 4375 4376 fail_dev_id: 4377 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4378 fail_rbd_dev: 4379 rbd_dev_free(rbd_dev); 4380 return NULL; 4381 } 4382 4383 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4384 { 4385 if (rbd_dev) 4386 put_device(&rbd_dev->dev); 4387 } 4388 4389 /* 4390 * Get the size and object order for an image snapshot, or if 4391 * snap_id is CEPH_NOSNAP, gets this information for the base 4392 * image. 
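 *
 * The reply from the "get_size" class method is decoded into the packed
 * { u8 order; __le64 size; } buffer declared below; both values are
 * handed back to the caller via *order and *snap_size.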
4393 */ 4394 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 4395 u8 *order, u64 *snap_size) 4396 { 4397 __le64 snapid = cpu_to_le64(snap_id); 4398 int ret; 4399 struct { 4400 u8 order; 4401 __le64 size; 4402 } __attribute__ ((packed)) size_buf = { 0 }; 4403 4404 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4405 &rbd_dev->header_oloc, "get_size", 4406 &snapid, sizeof(snapid), 4407 &size_buf, sizeof(size_buf)); 4408 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4409 if (ret < 0) 4410 return ret; 4411 if (ret < sizeof (size_buf)) 4412 return -ERANGE; 4413 4414 if (order) { 4415 *order = size_buf.order; 4416 dout(" order %u", (unsigned int)*order); 4417 } 4418 *snap_size = le64_to_cpu(size_buf.size); 4419 4420 dout(" snap_id 0x%016llx snap_size = %llu\n", 4421 (unsigned long long)snap_id, 4422 (unsigned long long)*snap_size); 4423 4424 return 0; 4425 } 4426 4427 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 4428 { 4429 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 4430 &rbd_dev->header.obj_order, 4431 &rbd_dev->header.image_size); 4432 } 4433 4434 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 4435 { 4436 void *reply_buf; 4437 int ret; 4438 void *p; 4439 4440 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 4441 if (!reply_buf) 4442 return -ENOMEM; 4443 4444 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4445 &rbd_dev->header_oloc, "get_object_prefix", 4446 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); 4447 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4448 if (ret < 0) 4449 goto out; 4450 4451 p = reply_buf; 4452 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 4453 p + ret, NULL, GFP_NOIO); 4454 ret = 0; 4455 4456 if (IS_ERR(rbd_dev->header.object_prefix)) { 4457 ret = PTR_ERR(rbd_dev->header.object_prefix); 4458 rbd_dev->header.object_prefix = NULL; 4459 } else { 4460 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 4461 } 4462 out: 4463 kfree(reply_buf); 4464 4465 return ret; 4466 } 4467 4468 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 4469 u64 *snap_features) 4470 { 4471 __le64 snapid = cpu_to_le64(snap_id); 4472 struct { 4473 __le64 features; 4474 __le64 incompat; 4475 } __attribute__ ((packed)) features_buf = { 0 }; 4476 u64 unsup; 4477 int ret; 4478 4479 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4480 &rbd_dev->header_oloc, "get_features", 4481 &snapid, sizeof(snapid), 4482 &features_buf, sizeof(features_buf)); 4483 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4484 if (ret < 0) 4485 return ret; 4486 if (ret < sizeof (features_buf)) 4487 return -ERANGE; 4488 4489 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED; 4490 if (unsup) { 4491 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx", 4492 unsup); 4493 return -ENXIO; 4494 } 4495 4496 *snap_features = le64_to_cpu(features_buf.features); 4497 4498 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 4499 (unsigned long long)snap_id, 4500 (unsigned long long)*snap_features, 4501 (unsigned long long)le64_to_cpu(features_buf.incompat)); 4502 4503 return 0; 4504 } 4505 4506 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 4507 { 4508 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 4509 &rbd_dev->header.features); 4510 } 4511 4512 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 4513 { 4514 struct rbd_spec *parent_spec; 4515 size_t size; 4516 void 
*reply_buf = NULL; 4517 __le64 snapid; 4518 void *p; 4519 void *end; 4520 u64 pool_id; 4521 char *image_id; 4522 u64 snap_id; 4523 u64 overlap; 4524 int ret; 4525 4526 parent_spec = rbd_spec_alloc(); 4527 if (!parent_spec) 4528 return -ENOMEM; 4529 4530 size = sizeof (__le64) + /* pool_id */ 4531 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 4532 sizeof (__le64) + /* snap_id */ 4533 sizeof (__le64); /* overlap */ 4534 reply_buf = kmalloc(size, GFP_KERNEL); 4535 if (!reply_buf) { 4536 ret = -ENOMEM; 4537 goto out_err; 4538 } 4539 4540 snapid = cpu_to_le64(rbd_dev->spec->snap_id); 4541 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4542 &rbd_dev->header_oloc, "get_parent", 4543 &snapid, sizeof(snapid), reply_buf, size); 4544 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4545 if (ret < 0) 4546 goto out_err; 4547 4548 p = reply_buf; 4549 end = reply_buf + ret; 4550 ret = -ERANGE; 4551 ceph_decode_64_safe(&p, end, pool_id, out_err); 4552 if (pool_id == CEPH_NOPOOL) { 4553 /* 4554 * Either the parent never existed, or we have 4555 * a record of it but the image got flattened so it no 4556 * longer has a parent. When the parent of a 4557 * layered image disappears we immediately set the 4558 * overlap to 0. The effect of this is that all new 4559 * requests will be treated as if the image had no 4560 * parent. 4561 */ 4562 if (rbd_dev->parent_overlap) { 4563 rbd_dev->parent_overlap = 0; 4564 rbd_dev_parent_put(rbd_dev); 4565 pr_info("%s: clone image has been flattened\n", 4566 rbd_dev->disk->disk_name); 4567 } 4568 4569 goto out; /* No parent? No problem. */ 4570 } 4571 4572 /* The ceph file layout needs to fit pool id in 32 bits */ 4573 4574 ret = -EIO; 4575 if (pool_id > (u64)U32_MAX) { 4576 rbd_warn(NULL, "parent pool id too large (%llu > %u)", 4577 (unsigned long long)pool_id, U32_MAX); 4578 goto out_err; 4579 } 4580 4581 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4582 if (IS_ERR(image_id)) { 4583 ret = PTR_ERR(image_id); 4584 goto out_err; 4585 } 4586 ceph_decode_64_safe(&p, end, snap_id, out_err); 4587 ceph_decode_64_safe(&p, end, overlap, out_err); 4588 4589 /* 4590 * The parent won't change (except when the clone is 4591 * flattened, already handled that). So we only need to 4592 * record the parent spec if we have not already done so. 4593 */ 4594 if (!rbd_dev->parent_spec) { 4595 parent_spec->pool_id = pool_id; 4596 parent_spec->image_id = image_id; 4597 parent_spec->snap_id = snap_id; 4598 rbd_dev->parent_spec = parent_spec; 4599 parent_spec = NULL; /* rbd_dev now owns this */ 4600 } else { 4601 kfree(image_id); 4602 } 4603 4604 /* 4605 * We always update the parent overlap. If it's zero we issue 4606 * a warning, as we will proceed as if there was no parent.
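 * (The overlap is the number of leading bytes of the clone that are
 * backed by the parent; with an overlap of 0 no request is redirected
 * to the parent, so the clone behaves like a standalone image.)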
4607 */ 4608 if (!overlap) { 4609 if (parent_spec) { 4610 /* refresh, careful to warn just once */ 4611 if (rbd_dev->parent_overlap) 4612 rbd_warn(rbd_dev, 4613 "clone now standalone (overlap became 0)"); 4614 } else { 4615 /* initial probe */ 4616 rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); 4617 } 4618 } 4619 rbd_dev->parent_overlap = overlap; 4620 4621 out: 4622 ret = 0; 4623 out_err: 4624 kfree(reply_buf); 4625 rbd_spec_put(parent_spec); 4626 4627 return ret; 4628 } 4629 4630 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 4631 { 4632 struct { 4633 __le64 stripe_unit; 4634 __le64 stripe_count; 4635 } __attribute__ ((packed)) striping_info_buf = { 0 }; 4636 size_t size = sizeof (striping_info_buf); 4637 void *p; 4638 int ret; 4639 4640 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4641 &rbd_dev->header_oloc, "get_stripe_unit_count", 4642 NULL, 0, &striping_info_buf, size); 4643 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4644 if (ret < 0) 4645 return ret; 4646 if (ret < size) 4647 return -ERANGE; 4648 4649 p = &striping_info_buf; 4650 rbd_dev->header.stripe_unit = ceph_decode_64(&p); 4651 rbd_dev->header.stripe_count = ceph_decode_64(&p); 4652 return 0; 4653 } 4654 4655 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) 4656 { 4657 __le64 data_pool_id; 4658 int ret; 4659 4660 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4661 &rbd_dev->header_oloc, "get_data_pool", 4662 NULL, 0, &data_pool_id, sizeof(data_pool_id)); 4663 if (ret < 0) 4664 return ret; 4665 if (ret < sizeof(data_pool_id)) 4666 return -EBADMSG; 4667 4668 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); 4669 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); 4670 return 0; 4671 } 4672 4673 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 4674 { 4675 CEPH_DEFINE_OID_ONSTACK(oid); 4676 size_t image_id_size; 4677 char *image_id; 4678 void *p; 4679 void *end; 4680 size_t size; 4681 void *reply_buf = NULL; 4682 size_t len = 0; 4683 char *image_name = NULL; 4684 int ret; 4685 4686 rbd_assert(!rbd_dev->spec->image_name); 4687 4688 len = strlen(rbd_dev->spec->image_id); 4689 image_id_size = sizeof (__le32) + len; 4690 image_id = kmalloc(image_id_size, GFP_KERNEL); 4691 if (!image_id) 4692 return NULL; 4693 4694 p = image_id; 4695 end = image_id + image_id_size; 4696 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 4697 4698 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 4699 reply_buf = kmalloc(size, GFP_KERNEL); 4700 if (!reply_buf) 4701 goto out; 4702 4703 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); 4704 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 4705 "dir_get_name", image_id, image_id_size, 4706 reply_buf, size); 4707 if (ret < 0) 4708 goto out; 4709 p = reply_buf; 4710 end = reply_buf + ret; 4711 4712 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 4713 if (IS_ERR(image_name)) 4714 image_name = NULL; 4715 else 4716 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 4717 out: 4718 kfree(reply_buf); 4719 kfree(image_id); 4720 4721 return image_name; 4722 } 4723 4724 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 4725 { 4726 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 4727 const char *snap_name; 4728 u32 which = 0; 4729 4730 /* Skip over names until we find the one we are looking for */ 4731 4732 snap_name = rbd_dev->header.snap_names; 4733 while (which < snapc->num_snaps) { 4734 if (!strcmp(name, snap_name)) 4735 
return snapc->snaps[which]; 4736 snap_name += strlen(snap_name) + 1; 4737 which++; 4738 } 4739 return CEPH_NOSNAP; 4740 } 4741 4742 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 4743 { 4744 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 4745 u32 which; 4746 bool found = false; 4747 u64 snap_id; 4748 4749 for (which = 0; !found && which < snapc->num_snaps; which++) { 4750 const char *snap_name; 4751 4752 snap_id = snapc->snaps[which]; 4753 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 4754 if (IS_ERR(snap_name)) { 4755 /* ignore no-longer existing snapshots */ 4756 if (PTR_ERR(snap_name) == -ENOENT) 4757 continue; 4758 else 4759 break; 4760 } 4761 found = !strcmp(name, snap_name); 4762 kfree(snap_name); 4763 } 4764 return found ? snap_id : CEPH_NOSNAP; 4765 } 4766 4767 /* 4768 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 4769 * no snapshot by that name is found, or if an error occurs. 4770 */ 4771 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 4772 { 4773 if (rbd_dev->image_format == 1) 4774 return rbd_v1_snap_id_by_name(rbd_dev, name); 4775 4776 return rbd_v2_snap_id_by_name(rbd_dev, name); 4777 } 4778 4779 /* 4780 * An image being mapped will have everything but the snap id. 4781 */ 4782 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) 4783 { 4784 struct rbd_spec *spec = rbd_dev->spec; 4785 4786 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); 4787 rbd_assert(spec->image_id && spec->image_name); 4788 rbd_assert(spec->snap_name); 4789 4790 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 4791 u64 snap_id; 4792 4793 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 4794 if (snap_id == CEPH_NOSNAP) 4795 return -ENOENT; 4796 4797 spec->snap_id = snap_id; 4798 } else { 4799 spec->snap_id = CEPH_NOSNAP; 4800 } 4801 4802 return 0; 4803 } 4804 4805 /* 4806 * A parent image will have all ids but none of the names. 4807 * 4808 * All names in an rbd spec are dynamically allocated. It's OK if we 4809 * can't figure out the name for an image id. 
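 * (As implemented below, failure to resolve the pool name or the
 * snapshot name fails the call, while failure to resolve the image
 * name only produces a warning and leaves spec->image_name NULL.)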
4810 */ 4811 static int rbd_spec_fill_names(struct rbd_device *rbd_dev) 4812 { 4813 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4814 struct rbd_spec *spec = rbd_dev->spec; 4815 const char *pool_name; 4816 const char *image_name; 4817 const char *snap_name; 4818 int ret; 4819 4820 rbd_assert(spec->pool_id != CEPH_NOPOOL); 4821 rbd_assert(spec->image_id); 4822 rbd_assert(spec->snap_id != CEPH_NOSNAP); 4823 4824 /* Get the pool name; we have to make our own copy of this */ 4825 4826 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 4827 if (!pool_name) { 4828 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 4829 return -EIO; 4830 } 4831 pool_name = kstrdup(pool_name, GFP_KERNEL); 4832 if (!pool_name) 4833 return -ENOMEM; 4834 4835 /* Fetch the image name; tolerate failure here */ 4836 4837 image_name = rbd_dev_image_name(rbd_dev); 4838 if (!image_name) 4839 rbd_warn(rbd_dev, "unable to get image name"); 4840 4841 /* Fetch the snapshot name */ 4842 4843 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 4844 if (IS_ERR(snap_name)) { 4845 ret = PTR_ERR(snap_name); 4846 goto out_err; 4847 } 4848 4849 spec->pool_name = pool_name; 4850 spec->image_name = image_name; 4851 spec->snap_name = snap_name; 4852 4853 return 0; 4854 4855 out_err: 4856 kfree(image_name); 4857 kfree(pool_name); 4858 return ret; 4859 } 4860 4861 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 4862 { 4863 size_t size; 4864 int ret; 4865 void *reply_buf; 4866 void *p; 4867 void *end; 4868 u64 seq; 4869 u32 snap_count; 4870 struct ceph_snap_context *snapc; 4871 u32 i; 4872 4873 /* 4874 * We'll need room for the seq value (maximum snapshot id), 4875 * snapshot count, and array of that many snapshot ids. 4876 * For now we have a fixed upper limit on the number we're 4877 * prepared to receive. 4878 */ 4879 size = sizeof (__le64) + sizeof (__le32) + 4880 RBD_MAX_SNAP_COUNT * sizeof (__le64); 4881 reply_buf = kzalloc(size, GFP_KERNEL); 4882 if (!reply_buf) 4883 return -ENOMEM; 4884 4885 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4886 &rbd_dev->header_oloc, "get_snapcontext", 4887 NULL, 0, reply_buf, size); 4888 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4889 if (ret < 0) 4890 goto out; 4891 4892 p = reply_buf; 4893 end = reply_buf + ret; 4894 ret = -ERANGE; 4895 ceph_decode_64_safe(&p, end, seq, out); 4896 ceph_decode_32_safe(&p, end, snap_count, out); 4897 4898 /* 4899 * Make sure the reported number of snapshot ids wouldn't go 4900 * beyond the end of our buffer. But before checking that, 4901 * make sure the computed size of the snapshot context we 4902 * allocate is representable in a size_t. 
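 * (The check below is "sizeof(struct ceph_snap_context) +
 * snap_count * sizeof(u64) > SIZE_MAX" rearranged as a division, so
 * the test itself cannot overflow.)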
4903 */ 4904 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 4905 / sizeof (u64)) { 4906 ret = -EINVAL; 4907 goto out; 4908 } 4909 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 4910 goto out; 4911 ret = 0; 4912 4913 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 4914 if (!snapc) { 4915 ret = -ENOMEM; 4916 goto out; 4917 } 4918 snapc->seq = seq; 4919 for (i = 0; i < snap_count; i++) 4920 snapc->snaps[i] = ceph_decode_64(&p); 4921 4922 ceph_put_snap_context(rbd_dev->header.snapc); 4923 rbd_dev->header.snapc = snapc; 4924 4925 dout(" snap context seq = %llu, snap_count = %u\n", 4926 (unsigned long long)seq, (unsigned int)snap_count); 4927 out: 4928 kfree(reply_buf); 4929 4930 return ret; 4931 } 4932 4933 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 4934 u64 snap_id) 4935 { 4936 size_t size; 4937 void *reply_buf; 4938 __le64 snapid; 4939 int ret; 4940 void *p; 4941 void *end; 4942 char *snap_name; 4943 4944 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 4945 reply_buf = kmalloc(size, GFP_KERNEL); 4946 if (!reply_buf) 4947 return ERR_PTR(-ENOMEM); 4948 4949 snapid = cpu_to_le64(snap_id); 4950 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4951 &rbd_dev->header_oloc, "get_snapshot_name", 4952 &snapid, sizeof(snapid), reply_buf, size); 4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4954 if (ret < 0) { 4955 snap_name = ERR_PTR(ret); 4956 goto out; 4957 } 4958 4959 p = reply_buf; 4960 end = reply_buf + ret; 4961 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4962 if (IS_ERR(snap_name)) 4963 goto out; 4964 4965 dout(" snap_id 0x%016llx snap_name = %s\n", 4966 (unsigned long long)snap_id, snap_name); 4967 out: 4968 kfree(reply_buf); 4969 4970 return snap_name; 4971 } 4972 4973 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) 4974 { 4975 bool first_time = rbd_dev->header.object_prefix == NULL; 4976 int ret; 4977 4978 ret = rbd_dev_v2_image_size(rbd_dev); 4979 if (ret) 4980 return ret; 4981 4982 if (first_time) { 4983 ret = rbd_dev_v2_header_onetime(rbd_dev); 4984 if (ret) 4985 return ret; 4986 } 4987 4988 ret = rbd_dev_v2_snap_context(rbd_dev); 4989 if (ret && first_time) { 4990 kfree(rbd_dev->header.object_prefix); 4991 rbd_dev->header.object_prefix = NULL; 4992 } 4993 4994 return ret; 4995 } 4996 4997 static int rbd_dev_header_info(struct rbd_device *rbd_dev) 4998 { 4999 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 5000 5001 if (rbd_dev->image_format == 1) 5002 return rbd_dev_v1_header_info(rbd_dev); 5003 5004 return rbd_dev_v2_header_info(rbd_dev); 5005 } 5006 5007 /* 5008 * Skips over white space at *buf, and updates *buf to point to the 5009 * first found non-space character (if any). Returns the length of 5010 * the token (string of non-white space characters) found. Note 5011 * that *buf must be terminated with '\0'. 5012 */ 5013 static inline size_t next_token(const char **buf) 5014 { 5015 /* 5016 * These are the characters that produce nonzero for 5017 * isspace() in the "C" and "POSIX" locales. 5018 */ 5019 const char *spaces = " \f\n\r\t\v"; 5020 5021 *buf += strspn(*buf, spaces); /* Find start of token */ 5022 5023 return strcspn(*buf, spaces); /* Return token length */ 5024 } 5025 5026 /* 5027 * Finds the next token in *buf, dynamically allocates a buffer big 5028 * enough to hold a copy of it, and copies the token into the new 5029 * buffer. The copy is guaranteed to be terminated with '\0'. 
Note 5030 * that a duplicate buffer is created even for a zero-length token. 5031 * 5032 * Returns a pointer to the newly-allocated duplicate, or a null 5033 * pointer if memory for the duplicate was not available. If 5034 * the lenp argument is a non-null pointer, the length of the token 5035 * (not including the '\0') is returned in *lenp. 5036 * 5037 * If successful, the *buf pointer will be updated to point beyond 5038 * the end of the found token. 5039 * 5040 * Note: uses GFP_KERNEL for allocation. 5041 */ 5042 static inline char *dup_token(const char **buf, size_t *lenp) 5043 { 5044 char *dup; 5045 size_t len; 5046 5047 len = next_token(buf); 5048 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 5049 if (!dup) 5050 return NULL; 5051 *(dup + len) = '\0'; 5052 *buf += len; 5053 5054 if (lenp) 5055 *lenp = len; 5056 5057 return dup; 5058 } 5059 5060 /* 5061 * Parse the options provided for an "rbd add" (i.e., rbd image 5062 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 5063 * and the data written is passed here via a NUL-terminated buffer. 5064 * Returns 0 if successful or an error code otherwise. 5065 * 5066 * The information extracted from these options is recorded in 5067 * the other parameters which return dynamically-allocated 5068 * structures: 5069 * ceph_opts 5070 * The address of a pointer that will refer to a ceph options 5071 * structure. Caller must release the returned pointer using 5072 * ceph_destroy_options() when it is no longer needed. 5073 * rbd_opts 5074 * Address of an rbd options pointer. Fully initialized by 5075 * this function; caller must release with kfree(). 5076 * spec 5077 * Address of an rbd image specification pointer. Fully 5078 * initialized by this function based on parsed options. 5079 * Caller must release with rbd_spec_put(). 5080 * 5081 * The options passed take this form: 5082 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 5083 * where: 5084 * <mon_addrs> 5085 * A comma-separated list of one or more monitor addresses. 5086 * A monitor address is an ip address, optionally followed 5087 * by a port number (separated by a colon). 5088 * I.e.: ip1[:port1][,ip2[:port2]...] 5089 * <options> 5090 * A comma-separated list of ceph and/or rbd options. 5091 * <pool_name> 5092 * The name of the rados pool containing the rbd image. 5093 * <image_name> 5094 * The name of the image in that pool to map. 5095 * <snap_id> 5096 * An optional snapshot id. If provided, the mapping will 5097 * present data from the image at the time that snapshot was 5098 * created. The image head is used if no snapshot id is 5099 * provided. Snapshot mappings are always read-only. 
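 *
 * For example (all names, addresses and the key below are hypothetical),
 * a mapping request written to /sys/bus/rbd/add could look like:
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * which would map snapshot "mysnap" of image "myimage" in pool "rbd",
 * using the two listed monitors.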
5100 */ 5101 static int rbd_add_parse_args(const char *buf, 5102 struct ceph_options **ceph_opts, 5103 struct rbd_options **opts, 5104 struct rbd_spec **rbd_spec) 5105 { 5106 size_t len; 5107 char *options; 5108 const char *mon_addrs; 5109 char *snap_name; 5110 size_t mon_addrs_size; 5111 struct rbd_spec *spec = NULL; 5112 struct rbd_options *rbd_opts = NULL; 5113 struct ceph_options *copts; 5114 int ret; 5115 5116 /* The first four tokens are required */ 5117 5118 len = next_token(&buf); 5119 if (!len) { 5120 rbd_warn(NULL, "no monitor address(es) provided"); 5121 return -EINVAL; 5122 } 5123 mon_addrs = buf; 5124 mon_addrs_size = len + 1; 5125 buf += len; 5126 5127 ret = -EINVAL; 5128 options = dup_token(&buf, NULL); 5129 if (!options) 5130 return -ENOMEM; 5131 if (!*options) { 5132 rbd_warn(NULL, "no options provided"); 5133 goto out_err; 5134 } 5135 5136 spec = rbd_spec_alloc(); 5137 if (!spec) 5138 goto out_mem; 5139 5140 spec->pool_name = dup_token(&buf, NULL); 5141 if (!spec->pool_name) 5142 goto out_mem; 5143 if (!*spec->pool_name) { 5144 rbd_warn(NULL, "no pool name provided"); 5145 goto out_err; 5146 } 5147 5148 spec->image_name = dup_token(&buf, NULL); 5149 if (!spec->image_name) 5150 goto out_mem; 5151 if (!*spec->image_name) { 5152 rbd_warn(NULL, "no image name provided"); 5153 goto out_err; 5154 } 5155 5156 /* 5157 * Snapshot name is optional; default is to use "-" 5158 * (indicating the head/no snapshot). 5159 */ 5160 len = next_token(&buf); 5161 if (!len) { 5162 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 5163 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 5164 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 5165 ret = -ENAMETOOLONG; 5166 goto out_err; 5167 } 5168 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 5169 if (!snap_name) 5170 goto out_mem; 5171 *(snap_name + len) = '\0'; 5172 spec->snap_name = snap_name; 5173 5174 /* Initialize all rbd options to the defaults */ 5175 5176 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 5177 if (!rbd_opts) 5178 goto out_mem; 5179 5180 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 5181 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 5182 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; 5183 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT; 5184 5185 copts = ceph_parse_options(options, mon_addrs, 5186 mon_addrs + mon_addrs_size - 1, 5187 parse_rbd_opts_token, rbd_opts); 5188 if (IS_ERR(copts)) { 5189 ret = PTR_ERR(copts); 5190 goto out_err; 5191 } 5192 kfree(options); 5193 5194 *ceph_opts = copts; 5195 *opts = rbd_opts; 5196 *rbd_spec = spec; 5197 5198 return 0; 5199 out_mem: 5200 ret = -ENOMEM; 5201 out_err: 5202 kfree(rbd_opts); 5203 rbd_spec_put(spec); 5204 kfree(options); 5205 5206 return ret; 5207 } 5208 5209 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 5210 { 5211 down_write(&rbd_dev->lock_rwsem); 5212 if (__rbd_is_lock_owner(rbd_dev)) 5213 rbd_unlock(rbd_dev); 5214 up_write(&rbd_dev->lock_rwsem); 5215 } 5216 5217 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) 5218 { 5219 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { 5220 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); 5221 return -EINVAL; 5222 } 5223 5224 /* FIXME: "rbd map --exclusive" should be in interruptible */ 5225 down_read(&rbd_dev->lock_rwsem); 5226 rbd_wait_state_locked(rbd_dev); 5227 up_read(&rbd_dev->lock_rwsem); 5228 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 5229 rbd_warn(rbd_dev, "failed to acquire exclusive lock"); 5230 return -EROFS; 5231 } 5232 5233 return 0; 5234 } 5235 5236 /* 5237 * An 
rbd format 2 image has a unique identifier, distinct from the 5238 * name given to it by the user. Internally, that identifier is 5239 * what's used to specify the names of objects related to the image. 5240 * 5241 * A special "rbd id" object is used to map an rbd image name to its 5242 * id. If that object doesn't exist, then there is no v2 rbd image 5243 * with the supplied name. 5244 * 5245 * This function will record the given rbd_dev's image_id field if 5246 * it can be determined, and in that case will return 0. If any 5247 * errors occur a negative errno will be returned and the rbd_dev's 5248 * image_id field will be unchanged (and should be NULL). 5249 */ 5250 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 5251 { 5252 int ret; 5253 size_t size; 5254 CEPH_DEFINE_OID_ONSTACK(oid); 5255 void *response; 5256 char *image_id; 5257 5258 /* 5259 * When probing a parent image, the image id is already 5260 * known (and the image name likely is not). There's no 5261 * need to fetch the image id again in this case. We 5262 * do still need to set the image format though. 5263 */ 5264 if (rbd_dev->spec->image_id) { 5265 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 5266 5267 return 0; 5268 } 5269 5270 /* 5271 * First, see if the format 2 image id file exists, and if 5272 * so, get the image's persistent id from it. 5273 */ 5274 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, 5275 rbd_dev->spec->image_name); 5276 if (ret) 5277 return ret; 5278 5279 dout("rbd id object name is %s\n", oid.name); 5280 5281 /* Response will be an encoded string, which includes a length */ 5282 5283 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 5284 response = kzalloc(size, GFP_NOIO); 5285 if (!response) { 5286 ret = -ENOMEM; 5287 goto out; 5288 } 5289 5290 /* If it doesn't exist we'll assume it's a format 1 image */ 5291 5292 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5293 "get_id", NULL, 0, 5294 response, RBD_IMAGE_ID_LEN_MAX); 5295 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5296 if (ret == -ENOENT) { 5297 image_id = kstrdup("", GFP_KERNEL); 5298 ret = image_id ? 0 : -ENOMEM; 5299 if (!ret) 5300 rbd_dev->image_format = 1; 5301 } else if (ret >= 0) { 5302 void *p = response; 5303 5304 image_id = ceph_extract_encoded_string(&p, p + ret, 5305 NULL, GFP_NOIO); 5306 ret = PTR_ERR_OR_ZERO(image_id); 5307 if (!ret) 5308 rbd_dev->image_format = 2; 5309 } 5310 5311 if (!ret) { 5312 rbd_dev->spec->image_id = image_id; 5313 dout("image_id is %s\n", image_id); 5314 } 5315 out: 5316 kfree(response); 5317 ceph_oid_destroy(&oid); 5318 return ret; 5319 } 5320 5321 /* 5322 * Undo whatever state changes are made by a v1 or v2 header info 5323 * call. 5324 */ 5325 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 5326 { 5327 struct rbd_image_header *header; 5328 5329 rbd_dev_parent_put(rbd_dev); 5330 5331 /* Free dynamic fields from the header, then zero it out */ 5332 5333 header = &rbd_dev->header; 5334 ceph_put_snap_context(header->snapc); 5335 kfree(header->snap_sizes); 5336 kfree(header->snap_names); 5337 kfree(header->object_prefix); 5338 memset(header, 0, sizeof (*header)); 5339 } 5340 5341 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) 5342 { 5343 int ret; 5344 5345 ret = rbd_dev_v2_object_prefix(rbd_dev); 5346 if (ret) 5347 goto out_err; 5348 5349 /* 5350 * Get and check the features for the image. Currently the 5351 * features are assumed to never change.
5352 */ 5353 ret = rbd_dev_v2_features(rbd_dev); 5354 if (ret) 5355 goto out_err; 5356 5357 /* If the image supports fancy striping, get its parameters */ 5358 5359 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 5360 ret = rbd_dev_v2_striping_info(rbd_dev); 5361 if (ret < 0) 5362 goto out_err; 5363 } 5364 5365 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { 5366 ret = rbd_dev_v2_data_pool(rbd_dev); 5367 if (ret) 5368 goto out_err; 5369 } 5370 5371 rbd_init_layout(rbd_dev); 5372 return 0; 5373 5374 out_err: 5375 rbd_dev->header.features = 0; 5376 kfree(rbd_dev->header.object_prefix); 5377 rbd_dev->header.object_prefix = NULL; 5378 return ret; 5379 } 5380 5381 /* 5382 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> 5383 * rbd_dev_image_probe() recursion depth, which means it's also the 5384 * length of the already discovered part of the parent chain. 5385 */ 5386 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) 5387 { 5388 struct rbd_device *parent = NULL; 5389 int ret; 5390 5391 if (!rbd_dev->parent_spec) 5392 return 0; 5393 5394 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) { 5395 pr_info("parent chain is too long (%d)\n", depth); 5396 ret = -EINVAL; 5397 goto out_err; 5398 } 5399 5400 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); 5401 if (!parent) { 5402 ret = -ENOMEM; 5403 goto out_err; 5404 } 5405 5406 /* 5407 * Images related by parent/child relationships always share 5408 * rbd_client and spec/parent_spec, so bump their refcounts. 5409 */ 5410 __rbd_get_client(rbd_dev->rbd_client); 5411 rbd_spec_get(rbd_dev->parent_spec); 5412 5413 ret = rbd_dev_image_probe(parent, depth); 5414 if (ret < 0) 5415 goto out_err; 5416 5417 rbd_dev->parent = parent; 5418 atomic_set(&rbd_dev->parent_ref, 1); 5419 return 0; 5420 5421 out_err: 5422 rbd_dev_unparent(rbd_dev); 5423 rbd_dev_destroy(parent); 5424 return ret; 5425 } 5426 5427 static void rbd_dev_device_release(struct rbd_device *rbd_dev) 5428 { 5429 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5430 rbd_dev_mapping_clear(rbd_dev); 5431 rbd_free_disk(rbd_dev); 5432 if (!single_major) 5433 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5434 } 5435 5436 /* 5437 * rbd_dev->header_rwsem must be locked for write and will be unlocked 5438 * upon return. 5439 */ 5440 static int rbd_dev_device_setup(struct rbd_device *rbd_dev) 5441 { 5442 int ret; 5443 5444 /* Record our major and minor device numbers. */ 5445 5446 if (!single_major) { 5447 ret = register_blkdev(0, rbd_dev->name); 5448 if (ret < 0) 5449 goto err_out_unlock; 5450 5451 rbd_dev->major = ret; 5452 rbd_dev->minor = 0; 5453 } else { 5454 rbd_dev->major = rbd_major; 5455 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); 5456 } 5457 5458 /* Set up the blkdev mapping. 
*/ 5459 5460 ret = rbd_init_disk(rbd_dev); 5461 if (ret) 5462 goto err_out_blkdev; 5463 5464 ret = rbd_dev_mapping_set(rbd_dev); 5465 if (ret) 5466 goto err_out_disk; 5467 5468 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 5469 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only); 5470 5471 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); 5472 if (ret) 5473 goto err_out_mapping; 5474 5475 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5476 up_write(&rbd_dev->header_rwsem); 5477 return 0; 5478 5479 err_out_mapping: 5480 rbd_dev_mapping_clear(rbd_dev); 5481 err_out_disk: 5482 rbd_free_disk(rbd_dev); 5483 err_out_blkdev: 5484 if (!single_major) 5485 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5486 err_out_unlock: 5487 up_write(&rbd_dev->header_rwsem); 5488 return ret; 5489 } 5490 5491 static int rbd_dev_header_name(struct rbd_device *rbd_dev) 5492 { 5493 struct rbd_spec *spec = rbd_dev->spec; 5494 int ret; 5495 5496 /* Record the header object name for this rbd image. */ 5497 5498 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 5499 if (rbd_dev->image_format == 1) 5500 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 5501 spec->image_name, RBD_SUFFIX); 5502 else 5503 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 5504 RBD_HEADER_PREFIX, spec->image_id); 5505 5506 return ret; 5507 } 5508 5509 static void rbd_dev_image_release(struct rbd_device *rbd_dev) 5510 { 5511 rbd_dev_unprobe(rbd_dev); 5512 if (rbd_dev->opts) 5513 rbd_unregister_watch(rbd_dev); 5514 rbd_dev->image_format = 0; 5515 kfree(rbd_dev->spec->image_id); 5516 rbd_dev->spec->image_id = NULL; 5517 } 5518 5519 /* 5520 * Probe for the existence of the header object for the given rbd 5521 * device. If this image is the one being mapped (i.e., not a 5522 * parent), initiate a watch on its header object before using that 5523 * object to get detailed information about the rbd image. 5524 */ 5525 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) 5526 { 5527 int ret; 5528 5529 /* 5530 * Get the id from the image id object. Unless there's an 5531 * error, rbd_dev->spec->image_id will be filled in with 5532 * a dynamically-allocated string, and rbd_dev->image_format 5533 * will be set to either 1 or 2. 5534 */ 5535 ret = rbd_dev_image_id(rbd_dev); 5536 if (ret) 5537 return ret; 5538 5539 ret = rbd_dev_header_name(rbd_dev); 5540 if (ret) 5541 goto err_out_format; 5542 5543 if (!depth) { 5544 ret = rbd_register_watch(rbd_dev); 5545 if (ret) { 5546 if (ret == -ENOENT) 5547 pr_info("image %s/%s does not exist\n", 5548 rbd_dev->spec->pool_name, 5549 rbd_dev->spec->image_name); 5550 goto err_out_format; 5551 } 5552 } 5553 5554 ret = rbd_dev_header_info(rbd_dev); 5555 if (ret) 5556 goto err_out_watch; 5557 5558 /* 5559 * If this image is the one being mapped, we have pool name and 5560 * id, image name and id, and snap name - need to fill snap id. 5561 * Otherwise this is a parent image, identified by pool, image 5562 * and snap ids - need to fill in names for those ids. 
5563 */ 5564 if (!depth) 5565 ret = rbd_spec_fill_snap_id(rbd_dev); 5566 else 5567 ret = rbd_spec_fill_names(rbd_dev); 5568 if (ret) { 5569 if (ret == -ENOENT) 5570 pr_info("snap %s/%s@%s does not exist\n", 5571 rbd_dev->spec->pool_name, 5572 rbd_dev->spec->image_name, 5573 rbd_dev->spec->snap_name); 5574 goto err_out_probe; 5575 } 5576 5577 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 5578 ret = rbd_dev_v2_parent_info(rbd_dev); 5579 if (ret) 5580 goto err_out_probe; 5581 5582 /* 5583 * Need to warn users if this image is the one being 5584 * mapped and has a parent. 5585 */ 5586 if (!depth && rbd_dev->parent_spec) 5587 rbd_warn(rbd_dev, 5588 "WARNING: kernel layering is EXPERIMENTAL!"); 5589 } 5590 5591 ret = rbd_dev_probe_parent(rbd_dev, depth); 5592 if (ret) 5593 goto err_out_probe; 5594 5595 dout("discovered format %u image, header name is %s\n", 5596 rbd_dev->image_format, rbd_dev->header_oid.name); 5597 return 0; 5598 5599 err_out_probe: 5600 rbd_dev_unprobe(rbd_dev); 5601 err_out_watch: 5602 if (!depth) 5603 rbd_unregister_watch(rbd_dev); 5604 err_out_format: 5605 rbd_dev->image_format = 0; 5606 kfree(rbd_dev->spec->image_id); 5607 rbd_dev->spec->image_id = NULL; 5608 return ret; 5609 } 5610 5611 static ssize_t do_rbd_add(struct bus_type *bus, 5612 const char *buf, 5613 size_t count) 5614 { 5615 struct rbd_device *rbd_dev = NULL; 5616 struct ceph_options *ceph_opts = NULL; 5617 struct rbd_options *rbd_opts = NULL; 5618 struct rbd_spec *spec = NULL; 5619 struct rbd_client *rbdc; 5620 int rc; 5621 5622 if (!try_module_get(THIS_MODULE)) 5623 return -ENODEV; 5624 5625 /* parse add command */ 5626 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 5627 if (rc < 0) 5628 goto out; 5629 5630 rbdc = rbd_get_client(ceph_opts); 5631 if (IS_ERR(rbdc)) { 5632 rc = PTR_ERR(rbdc); 5633 goto err_out_args; 5634 } 5635 5636 /* pick the pool */ 5637 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); 5638 if (rc < 0) { 5639 if (rc == -ENOENT) 5640 pr_info("pool %s does not exist\n", spec->pool_name); 5641 goto err_out_client; 5642 } 5643 spec->pool_id = (u64)rc; 5644 5645 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); 5646 if (!rbd_dev) { 5647 rc = -ENOMEM; 5648 goto err_out_client; 5649 } 5650 rbdc = NULL; /* rbd_dev now owns this */ 5651 spec = NULL; /* rbd_dev now owns this */ 5652 rbd_opts = NULL; /* rbd_dev now owns this */ 5653 5654 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); 5655 if (!rbd_dev->config_info) { 5656 rc = -ENOMEM; 5657 goto err_out_rbd_dev; 5658 } 5659 5660 down_write(&rbd_dev->header_rwsem); 5661 rc = rbd_dev_image_probe(rbd_dev, 0); 5662 if (rc < 0) { 5663 up_write(&rbd_dev->header_rwsem); 5664 goto err_out_rbd_dev; 5665 } 5666 5667 /* If we are mapping a snapshot it must be marked read-only */ 5668 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 5669 rbd_dev->opts->read_only = true; 5670 5671 rc = rbd_dev_device_setup(rbd_dev); 5672 if (rc) 5673 goto err_out_image_probe; 5674 5675 if (rbd_dev->opts->exclusive) { 5676 rc = rbd_add_acquire_lock(rbd_dev); 5677 if (rc) 5678 goto err_out_device_setup; 5679 } 5680 5681 /* Everything's ready. Announce the disk to the world. 
*/ 5682 5683 rc = device_add(&rbd_dev->dev); 5684 if (rc) 5685 goto err_out_image_lock; 5686 5687 add_disk(rbd_dev->disk); 5688 /* see rbd_init_disk() */ 5689 blk_put_queue(rbd_dev->disk->queue); 5690 5691 spin_lock(&rbd_dev_list_lock); 5692 list_add_tail(&rbd_dev->node, &rbd_dev_list); 5693 spin_unlock(&rbd_dev_list_lock); 5694 5695 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, 5696 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, 5697 rbd_dev->header.features); 5698 rc = count; 5699 out: 5700 module_put(THIS_MODULE); 5701 return rc; 5702 5703 err_out_image_lock: 5704 rbd_dev_image_unlock(rbd_dev); 5705 err_out_device_setup: 5706 rbd_dev_device_release(rbd_dev); 5707 err_out_image_probe: 5708 rbd_dev_image_release(rbd_dev); 5709 err_out_rbd_dev: 5710 rbd_dev_destroy(rbd_dev); 5711 err_out_client: 5712 rbd_put_client(rbdc); 5713 err_out_args: 5714 rbd_spec_put(spec); 5715 kfree(rbd_opts); 5716 goto out; 5717 } 5718 5719 static ssize_t rbd_add(struct bus_type *bus, 5720 const char *buf, 5721 size_t count) 5722 { 5723 if (single_major) 5724 return -EINVAL; 5725 5726 return do_rbd_add(bus, buf, count); 5727 } 5728 5729 static ssize_t rbd_add_single_major(struct bus_type *bus, 5730 const char *buf, 5731 size_t count) 5732 { 5733 return do_rbd_add(bus, buf, count); 5734 } 5735 5736 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 5737 { 5738 while (rbd_dev->parent) { 5739 struct rbd_device *first = rbd_dev; 5740 struct rbd_device *second = first->parent; 5741 struct rbd_device *third; 5742 5743 /* 5744 * Follow to the parent with no grandparent and 5745 * remove it. 5746 */ 5747 while (second && (third = second->parent)) { 5748 first = second; 5749 second = third; 5750 } 5751 rbd_assert(second); 5752 rbd_dev_image_release(second); 5753 rbd_dev_destroy(second); 5754 first->parent = NULL; 5755 first->parent_overlap = 0; 5756 5757 rbd_assert(first->parent_spec); 5758 rbd_spec_put(first->parent_spec); 5759 first->parent_spec = NULL; 5760 } 5761 } 5762 5763 static ssize_t do_rbd_remove(struct bus_type *bus, 5764 const char *buf, 5765 size_t count) 5766 { 5767 struct rbd_device *rbd_dev = NULL; 5768 struct list_head *tmp; 5769 int dev_id; 5770 char opt_buf[6]; 5771 bool already = false; 5772 bool force = false; 5773 int ret; 5774 5775 dev_id = -1; 5776 opt_buf[0] = '\0'; 5777 sscanf(buf, "%d %5s", &dev_id, opt_buf); 5778 if (dev_id < 0) { 5779 pr_err("dev_id out of range\n"); 5780 return -EINVAL; 5781 } 5782 if (opt_buf[0] != '\0') { 5783 if (!strcmp(opt_buf, "force")) { 5784 force = true; 5785 } else { 5786 pr_err("bad remove option at '%s'\n", opt_buf); 5787 return -EINVAL; 5788 } 5789 } 5790 5791 ret = -ENOENT; 5792 spin_lock(&rbd_dev_list_lock); 5793 list_for_each(tmp, &rbd_dev_list) { 5794 rbd_dev = list_entry(tmp, struct rbd_device, node); 5795 if (rbd_dev->dev_id == dev_id) { 5796 ret = 0; 5797 break; 5798 } 5799 } 5800 if (!ret) { 5801 spin_lock_irq(&rbd_dev->lock); 5802 if (rbd_dev->open_count && !force) 5803 ret = -EBUSY; 5804 else 5805 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, 5806 &rbd_dev->flags); 5807 spin_unlock_irq(&rbd_dev->lock); 5808 } 5809 spin_unlock(&rbd_dev_list_lock); 5810 if (ret < 0 || already) 5811 return ret; 5812 5813 if (force) { 5814 /* 5815 * Prevent new IO from being queued and wait for existing 5816 * IO to complete/fail. 
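 * (blk_mq_freeze_queue() below waits for requests already in the queue
 * to finish and keeps new ones from entering; blk_set_queue_dying()
 * makes later bio submissions fail rather than block.)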
5817 */ 5818 blk_mq_freeze_queue(rbd_dev->disk->queue); 5819 blk_set_queue_dying(rbd_dev->disk->queue); 5820 } 5821 5822 del_gendisk(rbd_dev->disk); 5823 spin_lock(&rbd_dev_list_lock); 5824 list_del_init(&rbd_dev->node); 5825 spin_unlock(&rbd_dev_list_lock); 5826 device_del(&rbd_dev->dev); 5827 5828 rbd_dev_image_unlock(rbd_dev); 5829 rbd_dev_device_release(rbd_dev); 5830 rbd_dev_image_release(rbd_dev); 5831 rbd_dev_destroy(rbd_dev); 5832 return count; 5833 } 5834 5835 static ssize_t rbd_remove(struct bus_type *bus, 5836 const char *buf, 5837 size_t count) 5838 { 5839 if (single_major) 5840 return -EINVAL; 5841 5842 return do_rbd_remove(bus, buf, count); 5843 } 5844 5845 static ssize_t rbd_remove_single_major(struct bus_type *bus, 5846 const char *buf, 5847 size_t count) 5848 { 5849 return do_rbd_remove(bus, buf, count); 5850 } 5851 5852 /* 5853 * create control files in sysfs 5854 * /sys/bus/rbd/... 5855 */ 5856 static int rbd_sysfs_init(void) 5857 { 5858 int ret; 5859 5860 ret = device_register(&rbd_root_dev); 5861 if (ret < 0) 5862 return ret; 5863 5864 ret = bus_register(&rbd_bus_type); 5865 if (ret < 0) 5866 device_unregister(&rbd_root_dev); 5867 5868 return ret; 5869 } 5870 5871 static void rbd_sysfs_cleanup(void) 5872 { 5873 bus_unregister(&rbd_bus_type); 5874 device_unregister(&rbd_root_dev); 5875 } 5876 5877 static int rbd_slab_init(void) 5878 { 5879 rbd_assert(!rbd_img_request_cache); 5880 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); 5881 if (!rbd_img_request_cache) 5882 return -ENOMEM; 5883 5884 rbd_assert(!rbd_obj_request_cache); 5885 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0); 5886 if (!rbd_obj_request_cache) 5887 goto out_err; 5888 5889 return 0; 5890 5891 out_err: 5892 kmem_cache_destroy(rbd_img_request_cache); 5893 rbd_img_request_cache = NULL; 5894 return -ENOMEM; 5895 } 5896 5897 static void rbd_slab_exit(void) 5898 { 5899 rbd_assert(rbd_obj_request_cache); 5900 kmem_cache_destroy(rbd_obj_request_cache); 5901 rbd_obj_request_cache = NULL; 5902 5903 rbd_assert(rbd_img_request_cache); 5904 kmem_cache_destroy(rbd_img_request_cache); 5905 rbd_img_request_cache = NULL; 5906 } 5907 5908 static int __init rbd_init(void) 5909 { 5910 int rc; 5911 5912 if (!libceph_compatible(NULL)) { 5913 rbd_warn(NULL, "libceph incompatibility (quitting)"); 5914 return -EINVAL; 5915 } 5916 5917 rc = rbd_slab_init(); 5918 if (rc) 5919 return rc; 5920 5921 /* 5922 * The number of active work items is limited by the number of 5923 * rbd devices * queue depth, so leave @max_active at default. 
5924 */ 5925 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); 5926 if (!rbd_wq) { 5927 rc = -ENOMEM; 5928 goto err_out_slab; 5929 } 5930 5931 if (single_major) { 5932 rbd_major = register_blkdev(0, RBD_DRV_NAME); 5933 if (rbd_major < 0) { 5934 rc = rbd_major; 5935 goto err_out_wq; 5936 } 5937 } 5938 5939 rc = rbd_sysfs_init(); 5940 if (rc) 5941 goto err_out_blkdev; 5942 5943 if (single_major) 5944 pr_info("loaded (major %d)\n", rbd_major); 5945 else 5946 pr_info("loaded\n"); 5947 5948 return 0; 5949 5950 err_out_blkdev: 5951 if (single_major) 5952 unregister_blkdev(rbd_major, RBD_DRV_NAME); 5953 err_out_wq: 5954 destroy_workqueue(rbd_wq); 5955 err_out_slab: 5956 rbd_slab_exit(); 5957 return rc; 5958 } 5959 5960 static void __exit rbd_exit(void) 5961 { 5962 ida_destroy(&rbd_dev_id_ida); 5963 rbd_sysfs_cleanup(); 5964 if (single_major) 5965 unregister_blkdev(rbd_major, RBD_DRV_NAME); 5966 destroy_workqueue(rbd_wq); 5967 rbd_slab_exit(); 5968 } 5969 5970 module_init(rbd_init); 5971 module_exit(rbd_exit); 5972 5973 MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); 5974 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 5975 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 5976 /* following authorship retained from original osdblk.c */ 5977 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 5978 5979 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); 5980 MODULE_LICENSE("GPL"); 5981
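/*
 * Example of unmapping via sysfs (hypothetical device id), matching the
 * "%d %5s" parsing in do_rbd_remove() above; the optional "force" token
 * sets the force flag:
 *
 *   $ echo "0" > /sys/bus/rbd/remove
 *   $ echo "0 force" > /sys/bus/rbd/remove
 */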