1 2 /* 3 rbd.c -- Export ceph rados objects as a Linux block device 4 5 6 based on drivers/block/osdblk.c: 7 8 Copyright 2009 Red Hat, Inc. 9 10 This program is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; see the file COPYING. If not, write to 21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 22 23 24 25 For usage instructions, please refer to: 26 27 Documentation/ABI/testing/sysfs-bus-rbd 28 29 */ 30 31 #include <linux/ceph/libceph.h> 32 #include <linux/ceph/osd_client.h> 33 #include <linux/ceph/mon_client.h> 34 #include <linux/ceph/cls_lock_client.h> 35 #include <linux/ceph/striper.h> 36 #include <linux/ceph/decode.h> 37 #include <linux/parser.h> 38 #include <linux/bsearch.h> 39 40 #include <linux/kernel.h> 41 #include <linux/device.h> 42 #include <linux/module.h> 43 #include <linux/blk-mq.h> 44 #include <linux/fs.h> 45 #include <linux/blkdev.h> 46 #include <linux/slab.h> 47 #include <linux/idr.h> 48 #include <linux/workqueue.h> 49 50 #include "rbd_types.h" 51 52 #define RBD_DEBUG /* Activate rbd_assert() calls */ 53 54 /* 55 * Increment the given counter and return its updated value. 56 * If the counter is already 0 it will not be incremented. 57 * If the counter is already at its maximum value returns 58 * -EINVAL without updating it. 59 */ 60 static int atomic_inc_return_safe(atomic_t *v) 61 { 62 unsigned int counter; 63 64 counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0); 65 if (counter <= (unsigned int)INT_MAX) 66 return (int)counter; 67 68 atomic_dec(v); 69 70 return -EINVAL; 71 } 72 73 /* Decrement the counter. 
Return the resulting value, or -EINVAL */ 74 static int atomic_dec_return_safe(atomic_t *v) 75 { 76 int counter; 77 78 counter = atomic_dec_return(v); 79 if (counter >= 0) 80 return counter; 81 82 atomic_inc(v); 83 84 return -EINVAL; 85 } 86 87 #define RBD_DRV_NAME "rbd" 88 89 #define RBD_MINORS_PER_MAJOR 256 90 #define RBD_SINGLE_MAJOR_PART_SHIFT 4 91 92 #define RBD_MAX_PARENT_CHAIN_LEN 16 93 94 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 95 #define RBD_MAX_SNAP_NAME_LEN \ 96 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 97 98 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 99 100 #define RBD_SNAP_HEAD_NAME "-" 101 102 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ 103 104 /* This allows a single page to hold an image name sent by OSD */ 105 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 106 #define RBD_IMAGE_ID_LEN_MAX 64 107 108 #define RBD_OBJ_PREFIX_LEN_MAX 64 109 110 #define RBD_NOTIFY_TIMEOUT 5 /* seconds */ 111 #define RBD_RETRY_DELAY msecs_to_jiffies(1000) 112 113 /* Feature bits */ 114 115 #define RBD_FEATURE_LAYERING (1ULL<<0) 116 #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) 117 #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) 118 #define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) 119 #define RBD_FEATURE_DATA_POOL (1ULL<<7) 120 #define RBD_FEATURE_OPERATIONS (1ULL<<8) 121 122 #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ 123 RBD_FEATURE_STRIPINGV2 | \ 124 RBD_FEATURE_EXCLUSIVE_LOCK | \ 125 RBD_FEATURE_DEEP_FLATTEN | \ 126 RBD_FEATURE_DATA_POOL | \ 127 RBD_FEATURE_OPERATIONS) 128 129 /* Features supported by this (client software) implementation. */ 130 131 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) 132 133 /* 134 * An RBD device name will be "rbd#", where the "rbd" comes from 135 * RBD_DRV_NAME above, and # is a unique integer identifier. 136 */ 137 #define DEV_NAME_LEN 32 138 139 /* 140 * block device image metadata (in-memory version) 141 */ 142 struct rbd_image_header { 143 /* These six fields never change for a given rbd image */ 144 char *object_prefix; 145 __u8 obj_order; 146 u64 stripe_unit; 147 u64 stripe_count; 148 s64 data_pool_id; 149 u64 features; /* Might be changeable someday? */ 150 151 /* The remaining fields need to be updated occasionally */ 152 u64 image_size; 153 struct ceph_snap_context *snapc; 154 char *snap_names; /* format 1 only */ 155 u64 *snap_sizes; /* format 1 only */ 156 }; 157 158 /* 159 * An rbd image specification. 160 * 161 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 162 * identify an image. Each rbd_dev structure includes a pointer to 163 * an rbd_spec structure that encapsulates this identity. 164 * 165 * Each of the id's in an rbd_spec has an associated name. For a 166 * user-mapped image, the names are supplied and the id's associated 167 * with them are looked up. For a layered image, a parent image is 168 * defined by the tuple, and the names are looked up. 169 * 170 * An rbd_dev structure contains a parent_spec pointer which is 171 * non-null if the image it represents is a child in a layered 172 * image. This pointer will refer to the rbd_spec structure used 173 * by the parent rbd_dev for its own identity (i.e., the structure 174 * is shared between the parent and child). 175 * 176 * Since these structures are populated once, during the discovery 177 * phase of image construction, they are effectively immutable so 178 * we make no effort to synchronize access to them. 
179 * 180 * Note that code herein does not assume the image name is known (it 181 * could be a null pointer). 182 */ 183 struct rbd_spec { 184 u64 pool_id; 185 const char *pool_name; 186 const char *pool_ns; /* NULL if default, never "" */ 187 188 const char *image_id; 189 const char *image_name; 190 191 u64 snap_id; 192 const char *snap_name; 193 194 struct kref kref; 195 }; 196 197 /* 198 * an instance of the client. multiple devices may share an rbd client. 199 */ 200 struct rbd_client { 201 struct ceph_client *client; 202 struct kref kref; 203 struct list_head node; 204 }; 205 206 struct rbd_img_request; 207 208 enum obj_request_type { 209 OBJ_REQUEST_NODATA = 1, 210 OBJ_REQUEST_BIO, /* pointer into provided bio (list) */ 211 OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */ 212 OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */ 213 }; 214 215 enum obj_operation_type { 216 OBJ_OP_READ = 1, 217 OBJ_OP_WRITE, 218 OBJ_OP_DISCARD, 219 OBJ_OP_ZEROOUT, 220 }; 221 222 /* 223 * Writes go through the following state machine to deal with 224 * layering: 225 * 226 * . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . . 227 * . | . 228 * . v . 229 * . RBD_OBJ_WRITE_READ_FROM_PARENT. . . . 230 * . | . . 231 * . v v (deep-copyup . 232 * (image . RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC . not needed) . 233 * flattened) v | . . 234 * . v . . 235 * . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . . (copyup . 236 * | not needed) v 237 * v . 238 * done . . . . . . . . . . . . . . . . . . 239 * ^ 240 * | 241 * RBD_OBJ_WRITE_FLAT 242 * 243 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether 244 * assert_exists guard is needed or not (in some cases it's not needed 245 * even if there is a parent). 246 */ 247 enum rbd_obj_write_state { 248 RBD_OBJ_WRITE_FLAT = 1, 249 RBD_OBJ_WRITE_GUARD, 250 RBD_OBJ_WRITE_READ_FROM_PARENT, 251 RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC, 252 RBD_OBJ_WRITE_COPYUP_OPS, 253 }; 254 255 struct rbd_obj_request { 256 struct ceph_object_extent ex; 257 union { 258 bool tried_parent; /* for reads */ 259 enum rbd_obj_write_state write_state; /* for writes */ 260 }; 261 262 struct rbd_img_request *img_request; 263 struct ceph_file_extent *img_extents; 264 u32 num_img_extents; 265 266 union { 267 struct ceph_bio_iter bio_pos; 268 struct { 269 struct ceph_bvec_iter bvec_pos; 270 u32 bvec_count; 271 u32 bvec_idx; 272 }; 273 }; 274 struct bio_vec *copyup_bvecs; 275 u32 copyup_bvec_count; 276 277 struct ceph_osd_request *osd_req; 278 279 u64 xferred; /* bytes transferred */ 280 int result; 281 282 struct kref kref; 283 }; 284 285 enum img_req_flags { 286 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ 287 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ 288 }; 289 290 struct rbd_img_request { 291 struct rbd_device *rbd_dev; 292 enum obj_operation_type op_type; 293 enum obj_request_type data_type; 294 unsigned long flags; 295 union { 296 u64 snap_id; /* for reads */ 297 struct ceph_snap_context *snapc; /* for writes */ 298 }; 299 union { 300 struct request *rq; /* block request */ 301 struct rbd_obj_request *obj_request; /* obj req initiator */ 302 }; 303 spinlock_t completion_lock; 304 u64 xferred;/* aggregate bytes transferred */ 305 int result; /* first nonzero obj_request result */ 306 307 struct list_head object_extents; /* obj_req.ex structs */ 308 u32 pending_count; 309 310 struct kref kref; 311 }; 312 313 #define for_each_obj_request(ireq, oreq) \ 314 list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item) 315 #define 
for_each_obj_request_safe(ireq, oreq, n) \ 316 list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item) 317 318 enum rbd_watch_state { 319 RBD_WATCH_STATE_UNREGISTERED, 320 RBD_WATCH_STATE_REGISTERED, 321 RBD_WATCH_STATE_ERROR, 322 }; 323 324 enum rbd_lock_state { 325 RBD_LOCK_STATE_UNLOCKED, 326 RBD_LOCK_STATE_LOCKED, 327 RBD_LOCK_STATE_RELEASING, 328 }; 329 330 /* WatchNotify::ClientId */ 331 struct rbd_client_id { 332 u64 gid; 333 u64 handle; 334 }; 335 336 struct rbd_mapping { 337 u64 size; 338 u64 features; 339 }; 340 341 /* 342 * a single device 343 */ 344 struct rbd_device { 345 int dev_id; /* blkdev unique id */ 346 347 int major; /* blkdev assigned major */ 348 int minor; 349 struct gendisk *disk; /* blkdev's gendisk and rq */ 350 351 u32 image_format; /* Either 1 or 2 */ 352 struct rbd_client *rbd_client; 353 354 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 355 356 spinlock_t lock; /* queue, flags, open_count */ 357 358 struct rbd_image_header header; 359 unsigned long flags; /* possibly lock protected */ 360 struct rbd_spec *spec; 361 struct rbd_options *opts; 362 char *config_info; /* add{,_single_major} string */ 363 364 struct ceph_object_id header_oid; 365 struct ceph_object_locator header_oloc; 366 367 struct ceph_file_layout layout; /* used for all rbd requests */ 368 369 struct mutex watch_mutex; 370 enum rbd_watch_state watch_state; 371 struct ceph_osd_linger_request *watch_handle; 372 u64 watch_cookie; 373 struct delayed_work watch_dwork; 374 375 struct rw_semaphore lock_rwsem; 376 enum rbd_lock_state lock_state; 377 char lock_cookie[32]; 378 struct rbd_client_id owner_cid; 379 struct work_struct acquired_lock_work; 380 struct work_struct released_lock_work; 381 struct delayed_work lock_dwork; 382 struct work_struct unlock_work; 383 wait_queue_head_t lock_waitq; 384 385 struct workqueue_struct *task_wq; 386 387 struct rbd_spec *parent_spec; 388 u64 parent_overlap; 389 atomic_t parent_ref; 390 struct rbd_device *parent; 391 392 /* Block layer tags. */ 393 struct blk_mq_tag_set tag_set; 394 395 /* protects updating the header */ 396 struct rw_semaphore header_rwsem; 397 398 struct rbd_mapping mapping; 399 400 struct list_head node; 401 402 /* sysfs related */ 403 struct device dev; 404 unsigned long open_count; /* protected by lock */ 405 }; 406 407 /* 408 * Flag bits for rbd_dev->flags: 409 * - REMOVING (which is coupled with rbd_dev->open_count) is protected 410 * by rbd_dev->lock 411 * - BLACKLISTED is protected by rbd_dev->lock_rwsem 412 */ 413 enum rbd_dev_flags { 414 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 415 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 416 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ 417 }; 418 419 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ 420 421 static LIST_HEAD(rbd_dev_list); /* devices */ 422 static DEFINE_SPINLOCK(rbd_dev_list_lock); 423 424 static LIST_HEAD(rbd_client_list); /* clients */ 425 static DEFINE_SPINLOCK(rbd_client_list_lock); 426 427 /* Slab caches for frequently-allocated structures */ 428 429 static struct kmem_cache *rbd_img_request_cache; 430 static struct kmem_cache *rbd_obj_request_cache; 431 432 static int rbd_major; 433 static DEFINE_IDA(rbd_dev_id_ida); 434 435 static struct workqueue_struct *rbd_wq; 436 437 static struct ceph_snap_context rbd_empty_snapc = { 438 .nref = REFCOUNT_INIT(1), 439 }; 440 441 /* 442 * single-major requires >= 0.75 version of userspace rbd utility. 
443 */ 444 static bool single_major = true; 445 module_param(single_major, bool, 0444); 446 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)"); 447 448 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count); 449 static ssize_t remove_store(struct bus_type *bus, const char *buf, 450 size_t count); 451 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf, 452 size_t count); 453 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf, 454 size_t count); 455 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); 456 457 static int rbd_dev_id_to_minor(int dev_id) 458 { 459 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT; 460 } 461 462 static int minor_to_rbd_dev_id(int minor) 463 { 464 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; 465 } 466 467 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) 468 { 469 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || 470 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; 471 } 472 473 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) 474 { 475 bool is_lock_owner; 476 477 down_read(&rbd_dev->lock_rwsem); 478 is_lock_owner = __rbd_is_lock_owner(rbd_dev); 479 up_read(&rbd_dev->lock_rwsem); 480 return is_lock_owner; 481 } 482 483 static ssize_t supported_features_show(struct bus_type *bus, char *buf) 484 { 485 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED); 486 } 487 488 static BUS_ATTR_WO(add); 489 static BUS_ATTR_WO(remove); 490 static BUS_ATTR_WO(add_single_major); 491 static BUS_ATTR_WO(remove_single_major); 492 static BUS_ATTR_RO(supported_features); 493 494 static struct attribute *rbd_bus_attrs[] = { 495 &bus_attr_add.attr, 496 &bus_attr_remove.attr, 497 &bus_attr_add_single_major.attr, 498 &bus_attr_remove_single_major.attr, 499 &bus_attr_supported_features.attr, 500 NULL, 501 }; 502 503 static umode_t rbd_bus_is_visible(struct kobject *kobj, 504 struct attribute *attr, int index) 505 { 506 if (!single_major && 507 (attr == &bus_attr_add_single_major.attr || 508 attr == &bus_attr_remove_single_major.attr)) 509 return 0; 510 511 return attr->mode; 512 } 513 514 static const struct attribute_group rbd_bus_group = { 515 .attrs = rbd_bus_attrs, 516 .is_visible = rbd_bus_is_visible, 517 }; 518 __ATTRIBUTE_GROUPS(rbd_bus); 519 520 static struct bus_type rbd_bus_type = { 521 .name = "rbd", 522 .bus_groups = rbd_bus_groups, 523 }; 524 525 static void rbd_root_dev_release(struct device *dev) 526 { 527 } 528 529 static struct device rbd_root_dev = { 530 .init_name = "rbd", 531 .release = rbd_root_dev_release, 532 }; 533 534 static __printf(2, 3) 535 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 
536 { 537 struct va_format vaf; 538 va_list args; 539 540 va_start(args, fmt); 541 vaf.fmt = fmt; 542 vaf.va = &args; 543 544 if (!rbd_dev) 545 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 546 else if (rbd_dev->disk) 547 printk(KERN_WARNING "%s: %s: %pV\n", 548 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 549 else if (rbd_dev->spec && rbd_dev->spec->image_name) 550 printk(KERN_WARNING "%s: image %s: %pV\n", 551 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 552 else if (rbd_dev->spec && rbd_dev->spec->image_id) 553 printk(KERN_WARNING "%s: id %s: %pV\n", 554 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 555 else /* punt */ 556 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 557 RBD_DRV_NAME, rbd_dev, &vaf); 558 va_end(args); 559 } 560 561 #ifdef RBD_DEBUG 562 #define rbd_assert(expr) \ 563 if (unlikely(!(expr))) { \ 564 printk(KERN_ERR "\nAssertion failure in %s() " \ 565 "at line %d:\n\n" \ 566 "\trbd_assert(%s);\n\n", \ 567 __func__, __LINE__, #expr); \ 568 BUG(); \ 569 } 570 #else /* !RBD_DEBUG */ 571 # define rbd_assert(expr) ((void) 0) 572 #endif /* !RBD_DEBUG */ 573 574 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); 575 576 static int rbd_dev_refresh(struct rbd_device *rbd_dev); 577 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); 578 static int rbd_dev_header_info(struct rbd_device *rbd_dev); 579 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); 580 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 581 u64 snap_id); 582 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 583 u8 *order, u64 *snap_size); 584 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 585 u64 *snap_features); 586 587 static int rbd_open(struct block_device *bdev, fmode_t mode) 588 { 589 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 590 bool removing = false; 591 592 spin_lock_irq(&rbd_dev->lock); 593 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 594 removing = true; 595 else 596 rbd_dev->open_count++; 597 spin_unlock_irq(&rbd_dev->lock); 598 if (removing) 599 return -ENOENT; 600 601 (void) get_device(&rbd_dev->dev); 602 603 return 0; 604 } 605 606 static void rbd_release(struct gendisk *disk, fmode_t mode) 607 { 608 struct rbd_device *rbd_dev = disk->private_data; 609 unsigned long open_count_before; 610 611 spin_lock_irq(&rbd_dev->lock); 612 open_count_before = rbd_dev->open_count--; 613 spin_unlock_irq(&rbd_dev->lock); 614 rbd_assert(open_count_before > 0); 615 616 put_device(&rbd_dev->dev); 617 } 618 619 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) 620 { 621 int ro; 622 623 if (get_user(ro, (int __user *)arg)) 624 return -EFAULT; 625 626 /* Snapshots can't be marked read-write */ 627 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) 628 return -EROFS; 629 630 /* Let blkdev_roset() handle it */ 631 return -ENOTTY; 632 } 633 634 static int rbd_ioctl(struct block_device *bdev, fmode_t mode, 635 unsigned int cmd, unsigned long arg) 636 { 637 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 638 int ret; 639 640 switch (cmd) { 641 case BLKROSET: 642 ret = rbd_ioctl_set_ro(rbd_dev, arg); 643 break; 644 default: 645 ret = -ENOTTY; 646 } 647 648 return ret; 649 } 650 651 #ifdef CONFIG_COMPAT 652 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, 653 unsigned int cmd, unsigned long arg) 654 { 655 return rbd_ioctl(bdev, mode, cmd, arg); 656 } 657 #endif /* CONFIG_COMPAT */ 658 659 static const struct block_device_operations 
rbd_bd_ops = { 660 .owner = THIS_MODULE, 661 .open = rbd_open, 662 .release = rbd_release, 663 .ioctl = rbd_ioctl, 664 #ifdef CONFIG_COMPAT 665 .compat_ioctl = rbd_compat_ioctl, 666 #endif 667 }; 668 669 /* 670 * Initialize an rbd client instance. Success or not, this function 671 * consumes ceph_opts. Caller holds client_mutex. 672 */ 673 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 674 { 675 struct rbd_client *rbdc; 676 int ret = -ENOMEM; 677 678 dout("%s:\n", __func__); 679 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 680 if (!rbdc) 681 goto out_opt; 682 683 kref_init(&rbdc->kref); 684 INIT_LIST_HEAD(&rbdc->node); 685 686 rbdc->client = ceph_create_client(ceph_opts, rbdc); 687 if (IS_ERR(rbdc->client)) 688 goto out_rbdc; 689 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 690 691 ret = ceph_open_session(rbdc->client); 692 if (ret < 0) 693 goto out_client; 694 695 spin_lock(&rbd_client_list_lock); 696 list_add_tail(&rbdc->node, &rbd_client_list); 697 spin_unlock(&rbd_client_list_lock); 698 699 dout("%s: rbdc %p\n", __func__, rbdc); 700 701 return rbdc; 702 out_client: 703 ceph_destroy_client(rbdc->client); 704 out_rbdc: 705 kfree(rbdc); 706 out_opt: 707 if (ceph_opts) 708 ceph_destroy_options(ceph_opts); 709 dout("%s: error %d\n", __func__, ret); 710 711 return ERR_PTR(ret); 712 } 713 714 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) 715 { 716 kref_get(&rbdc->kref); 717 718 return rbdc; 719 } 720 721 /* 722 * Find a ceph client with specific addr and configuration. If 723 * found, bump its reference count. 724 */ 725 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 726 { 727 struct rbd_client *client_node; 728 bool found = false; 729 730 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 731 return NULL; 732 733 spin_lock(&rbd_client_list_lock); 734 list_for_each_entry(client_node, &rbd_client_list, node) { 735 if (!ceph_compare_options(ceph_opts, client_node->client)) { 736 __rbd_get_client(client_node); 737 738 found = true; 739 break; 740 } 741 } 742 spin_unlock(&rbd_client_list_lock); 743 744 return found ? 
client_node : NULL; 745 } 746 747 /* 748 * (Per device) rbd map options 749 */ 750 enum { 751 Opt_queue_depth, 752 Opt_alloc_size, 753 Opt_lock_timeout, 754 Opt_last_int, 755 /* int args above */ 756 Opt_pool_ns, 757 Opt_last_string, 758 /* string args above */ 759 Opt_read_only, 760 Opt_read_write, 761 Opt_lock_on_read, 762 Opt_exclusive, 763 Opt_notrim, 764 Opt_err 765 }; 766 767 static match_table_t rbd_opts_tokens = { 768 {Opt_queue_depth, "queue_depth=%d"}, 769 {Opt_alloc_size, "alloc_size=%d"}, 770 {Opt_lock_timeout, "lock_timeout=%d"}, 771 /* int args above */ 772 {Opt_pool_ns, "_pool_ns=%s"}, 773 /* string args above */ 774 {Opt_read_only, "read_only"}, 775 {Opt_read_only, "ro"}, /* Alternate spelling */ 776 {Opt_read_write, "read_write"}, 777 {Opt_read_write, "rw"}, /* Alternate spelling */ 778 {Opt_lock_on_read, "lock_on_read"}, 779 {Opt_exclusive, "exclusive"}, 780 {Opt_notrim, "notrim"}, 781 {Opt_err, NULL} 782 }; 783 784 struct rbd_options { 785 int queue_depth; 786 int alloc_size; 787 unsigned long lock_timeout; 788 bool read_only; 789 bool lock_on_read; 790 bool exclusive; 791 bool trim; 792 }; 793 794 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ 795 #define RBD_ALLOC_SIZE_DEFAULT (64 * 1024) 796 #define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */ 797 #define RBD_READ_ONLY_DEFAULT false 798 #define RBD_LOCK_ON_READ_DEFAULT false 799 #define RBD_EXCLUSIVE_DEFAULT false 800 #define RBD_TRIM_DEFAULT true 801 802 struct parse_rbd_opts_ctx { 803 struct rbd_spec *spec; 804 struct rbd_options *opts; 805 }; 806 807 static int parse_rbd_opts_token(char *c, void *private) 808 { 809 struct parse_rbd_opts_ctx *pctx = private; 810 substring_t argstr[MAX_OPT_ARGS]; 811 int token, intval, ret; 812 813 token = match_token(c, rbd_opts_tokens, argstr); 814 if (token < Opt_last_int) { 815 ret = match_int(&argstr[0], &intval); 816 if (ret < 0) { 817 pr_err("bad option arg (not int) at '%s'\n", c); 818 return ret; 819 } 820 dout("got int token %d val %d\n", token, intval); 821 } else if (token > Opt_last_int && token < Opt_last_string) { 822 dout("got string token %d val %s\n", token, argstr[0].from); 823 } else { 824 dout("got token %d\n", token); 825 } 826 827 switch (token) { 828 case Opt_queue_depth: 829 if (intval < 1) { 830 pr_err("queue_depth out of range\n"); 831 return -EINVAL; 832 } 833 pctx->opts->queue_depth = intval; 834 break; 835 case Opt_alloc_size: 836 if (intval < SECTOR_SIZE) { 837 pr_err("alloc_size out of range\n"); 838 return -EINVAL; 839 } 840 if (!is_power_of_2(intval)) { 841 pr_err("alloc_size must be a power of 2\n"); 842 return -EINVAL; 843 } 844 pctx->opts->alloc_size = intval; 845 break; 846 case Opt_lock_timeout: 847 /* 0 is "wait forever" (i.e. 
infinite timeout) */ 848 if (intval < 0 || intval > INT_MAX / 1000) { 849 pr_err("lock_timeout out of range\n"); 850 return -EINVAL; 851 } 852 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000); 853 break; 854 case Opt_pool_ns: 855 kfree(pctx->spec->pool_ns); 856 pctx->spec->pool_ns = match_strdup(argstr); 857 if (!pctx->spec->pool_ns) 858 return -ENOMEM; 859 break; 860 case Opt_read_only: 861 pctx->opts->read_only = true; 862 break; 863 case Opt_read_write: 864 pctx->opts->read_only = false; 865 break; 866 case Opt_lock_on_read: 867 pctx->opts->lock_on_read = true; 868 break; 869 case Opt_exclusive: 870 pctx->opts->exclusive = true; 871 break; 872 case Opt_notrim: 873 pctx->opts->trim = false; 874 break; 875 default: 876 /* libceph prints "bad option" msg */ 877 return -EINVAL; 878 } 879 880 return 0; 881 } 882 883 static char* obj_op_name(enum obj_operation_type op_type) 884 { 885 switch (op_type) { 886 case OBJ_OP_READ: 887 return "read"; 888 case OBJ_OP_WRITE: 889 return "write"; 890 case OBJ_OP_DISCARD: 891 return "discard"; 892 case OBJ_OP_ZEROOUT: 893 return "zeroout"; 894 default: 895 return "???"; 896 } 897 } 898 899 /* 900 * Destroy ceph client 901 * 902 * Caller must hold rbd_client_list_lock. 903 */ 904 static void rbd_client_release(struct kref *kref) 905 { 906 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 907 908 dout("%s: rbdc %p\n", __func__, rbdc); 909 spin_lock(&rbd_client_list_lock); 910 list_del(&rbdc->node); 911 spin_unlock(&rbd_client_list_lock); 912 913 ceph_destroy_client(rbdc->client); 914 kfree(rbdc); 915 } 916 917 /* 918 * Drop reference to ceph client node. If it's not referenced anymore, release 919 * it. 920 */ 921 static void rbd_put_client(struct rbd_client *rbdc) 922 { 923 if (rbdc) 924 kref_put(&rbdc->kref, rbd_client_release); 925 } 926 927 /* 928 * Get a ceph client with specific addr and configuration, if one does 929 * not exist create it. Either way, ceph_opts is consumed by this 930 * function. 931 */ 932 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 933 { 934 struct rbd_client *rbdc; 935 int ret; 936 937 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); 938 rbdc = rbd_client_find(ceph_opts); 939 if (rbdc) { 940 ceph_destroy_options(ceph_opts); 941 942 /* 943 * Using an existing client. Make sure ->pg_pools is up to 944 * date before we look up the pool id in do_rbd_add(). 
945 */ 946 ret = ceph_wait_for_latest_osdmap(rbdc->client, 947 rbdc->client->options->mount_timeout); 948 if (ret) { 949 rbd_warn(NULL, "failed to get latest osdmap: %d", ret); 950 rbd_put_client(rbdc); 951 rbdc = ERR_PTR(ret); 952 } 953 } else { 954 rbdc = rbd_client_create(ceph_opts); 955 } 956 mutex_unlock(&client_mutex); 957 958 return rbdc; 959 } 960 961 static bool rbd_image_format_valid(u32 image_format) 962 { 963 return image_format == 1 || image_format == 2; 964 } 965 966 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 967 { 968 size_t size; 969 u32 snap_count; 970 971 /* The header has to start with the magic rbd header text */ 972 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 973 return false; 974 975 /* The bio layer requires at least sector-sized I/O */ 976 977 if (ondisk->options.order < SECTOR_SHIFT) 978 return false; 979 980 /* If we use u64 in a few spots we may be able to loosen this */ 981 982 if (ondisk->options.order > 8 * sizeof (int) - 1) 983 return false; 984 985 /* 986 * The size of a snapshot header has to fit in a size_t, and 987 * that limits the number of snapshots. 988 */ 989 snap_count = le32_to_cpu(ondisk->snap_count); 990 size = SIZE_MAX - sizeof (struct ceph_snap_context); 991 if (snap_count > size / sizeof (__le64)) 992 return false; 993 994 /* 995 * Not only that, but the size of the entire the snapshot 996 * header must also be representable in a size_t. 997 */ 998 size -= snap_count * sizeof (__le64); 999 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 1000 return false; 1001 1002 return true; 1003 } 1004 1005 /* 1006 * returns the size of an object in the image 1007 */ 1008 static u32 rbd_obj_bytes(struct rbd_image_header *header) 1009 { 1010 return 1U << header->obj_order; 1011 } 1012 1013 static void rbd_init_layout(struct rbd_device *rbd_dev) 1014 { 1015 if (rbd_dev->header.stripe_unit == 0 || 1016 rbd_dev->header.stripe_count == 0) { 1017 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); 1018 rbd_dev->header.stripe_count = 1; 1019 } 1020 1021 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; 1022 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; 1023 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); 1024 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? 1025 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; 1026 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 1027 } 1028 1029 /* 1030 * Fill an rbd image header with information from the given format 1 1031 * on-disk header. 
1032 */ 1033 static int rbd_header_from_disk(struct rbd_device *rbd_dev, 1034 struct rbd_image_header_ondisk *ondisk) 1035 { 1036 struct rbd_image_header *header = &rbd_dev->header; 1037 bool first_time = header->object_prefix == NULL; 1038 struct ceph_snap_context *snapc; 1039 char *object_prefix = NULL; 1040 char *snap_names = NULL; 1041 u64 *snap_sizes = NULL; 1042 u32 snap_count; 1043 int ret = -ENOMEM; 1044 u32 i; 1045 1046 /* Allocate this now to avoid having to handle failure below */ 1047 1048 if (first_time) { 1049 object_prefix = kstrndup(ondisk->object_prefix, 1050 sizeof(ondisk->object_prefix), 1051 GFP_KERNEL); 1052 if (!object_prefix) 1053 return -ENOMEM; 1054 } 1055 1056 /* Allocate the snapshot context and fill it in */ 1057 1058 snap_count = le32_to_cpu(ondisk->snap_count); 1059 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 1060 if (!snapc) 1061 goto out_err; 1062 snapc->seq = le64_to_cpu(ondisk->snap_seq); 1063 if (snap_count) { 1064 struct rbd_image_snap_ondisk *snaps; 1065 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 1066 1067 /* We'll keep a copy of the snapshot names... */ 1068 1069 if (snap_names_len > (u64)SIZE_MAX) 1070 goto out_2big; 1071 snap_names = kmalloc(snap_names_len, GFP_KERNEL); 1072 if (!snap_names) 1073 goto out_err; 1074 1075 /* ...as well as the array of their sizes. */ 1076 snap_sizes = kmalloc_array(snap_count, 1077 sizeof(*header->snap_sizes), 1078 GFP_KERNEL); 1079 if (!snap_sizes) 1080 goto out_err; 1081 1082 /* 1083 * Copy the names, and fill in each snapshot's id 1084 * and size. 1085 * 1086 * Note that rbd_dev_v1_header_info() guarantees the 1087 * ondisk buffer we're working with has 1088 * snap_names_len bytes beyond the end of the 1089 * snapshot id array, this memcpy() is safe. 1090 */ 1091 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len); 1092 snaps = ondisk->snaps; 1093 for (i = 0; i < snap_count; i++) { 1094 snapc->snaps[i] = le64_to_cpu(snaps[i].id); 1095 snap_sizes[i] = le64_to_cpu(snaps[i].image_size); 1096 } 1097 } 1098 1099 /* We won't fail any more, fill in the header */ 1100 1101 if (first_time) { 1102 header->object_prefix = object_prefix; 1103 header->obj_order = ondisk->options.order; 1104 rbd_init_layout(rbd_dev); 1105 } else { 1106 ceph_put_snap_context(header->snapc); 1107 kfree(header->snap_names); 1108 kfree(header->snap_sizes); 1109 } 1110 1111 /* The remaining fields always get updated (when we refresh) */ 1112 1113 header->image_size = le64_to_cpu(ondisk->image_size); 1114 header->snapc = snapc; 1115 header->snap_names = snap_names; 1116 header->snap_sizes = snap_sizes; 1117 1118 return 0; 1119 out_2big: 1120 ret = -EIO; 1121 out_err: 1122 kfree(snap_sizes); 1123 kfree(snap_names); 1124 ceph_put_snap_context(snapc); 1125 kfree(object_prefix); 1126 1127 return ret; 1128 } 1129 1130 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) 1131 { 1132 const char *snap_name; 1133 1134 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 1135 1136 /* Skip over names until we find the one we are looking for */ 1137 1138 snap_name = rbd_dev->header.snap_names; 1139 while (which--) 1140 snap_name += strlen(snap_name) + 1; 1141 1142 return kstrdup(snap_name, GFP_KERNEL); 1143 } 1144 1145 /* 1146 * Snapshot id comparison function for use with qsort()/bsearch(). 1147 * Note that result is for snapshots in *descending* order. 
1148 */ 1149 static int snapid_compare_reverse(const void *s1, const void *s2) 1150 { 1151 u64 snap_id1 = *(u64 *)s1; 1152 u64 snap_id2 = *(u64 *)s2; 1153 1154 if (snap_id1 < snap_id2) 1155 return 1; 1156 return snap_id1 == snap_id2 ? 0 : -1; 1157 } 1158 1159 /* 1160 * Search a snapshot context to see if the given snapshot id is 1161 * present. 1162 * 1163 * Returns the position of the snapshot id in the array if it's found, 1164 * or BAD_SNAP_INDEX otherwise. 1165 * 1166 * Note: The snapshot array is in kept sorted (by the osd) in 1167 * reverse order, highest snapshot id first. 1168 */ 1169 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) 1170 { 1171 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 1172 u64 *found; 1173 1174 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, 1175 sizeof (snap_id), snapid_compare_reverse); 1176 1177 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; 1178 } 1179 1180 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, 1181 u64 snap_id) 1182 { 1183 u32 which; 1184 const char *snap_name; 1185 1186 which = rbd_dev_snap_index(rbd_dev, snap_id); 1187 if (which == BAD_SNAP_INDEX) 1188 return ERR_PTR(-ENOENT); 1189 1190 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which); 1191 return snap_name ? snap_name : ERR_PTR(-ENOMEM); 1192 } 1193 1194 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 1195 { 1196 if (snap_id == CEPH_NOSNAP) 1197 return RBD_SNAP_HEAD_NAME; 1198 1199 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1200 if (rbd_dev->image_format == 1) 1201 return rbd_dev_v1_snap_name(rbd_dev, snap_id); 1202 1203 return rbd_dev_v2_snap_name(rbd_dev, snap_id); 1204 } 1205 1206 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 1207 u64 *snap_size) 1208 { 1209 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1210 if (snap_id == CEPH_NOSNAP) { 1211 *snap_size = rbd_dev->header.image_size; 1212 } else if (rbd_dev->image_format == 1) { 1213 u32 which; 1214 1215 which = rbd_dev_snap_index(rbd_dev, snap_id); 1216 if (which == BAD_SNAP_INDEX) 1217 return -ENOENT; 1218 1219 *snap_size = rbd_dev->header.snap_sizes[which]; 1220 } else { 1221 u64 size = 0; 1222 int ret; 1223 1224 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 1225 if (ret) 1226 return ret; 1227 1228 *snap_size = size; 1229 } 1230 return 0; 1231 } 1232 1233 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 1234 u64 *snap_features) 1235 { 1236 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1237 if (snap_id == CEPH_NOSNAP) { 1238 *snap_features = rbd_dev->header.features; 1239 } else if (rbd_dev->image_format == 1) { 1240 *snap_features = 0; /* No features for format 1 */ 1241 } else { 1242 u64 features = 0; 1243 int ret; 1244 1245 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); 1246 if (ret) 1247 return ret; 1248 1249 *snap_features = features; 1250 } 1251 return 0; 1252 } 1253 1254 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) 1255 { 1256 u64 snap_id = rbd_dev->spec->snap_id; 1257 u64 size = 0; 1258 u64 features = 0; 1259 int ret; 1260 1261 ret = rbd_snap_size(rbd_dev, snap_id, &size); 1262 if (ret) 1263 return ret; 1264 ret = rbd_snap_features(rbd_dev, snap_id, &features); 1265 if (ret) 1266 return ret; 1267 1268 rbd_dev->mapping.size = size; 1269 rbd_dev->mapping.features = features; 1270 1271 return 0; 1272 } 1273 1274 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) 1275 { 1276 
rbd_dev->mapping.size = 0; 1277 rbd_dev->mapping.features = 0; 1278 } 1279 1280 static void zero_bvec(struct bio_vec *bv) 1281 { 1282 void *buf; 1283 unsigned long flags; 1284 1285 buf = bvec_kmap_irq(bv, &flags); 1286 memset(buf, 0, bv->bv_len); 1287 flush_dcache_page(bv->bv_page); 1288 bvec_kunmap_irq(buf, &flags); 1289 } 1290 1291 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) 1292 { 1293 struct ceph_bio_iter it = *bio_pos; 1294 1295 ceph_bio_iter_advance(&it, off); 1296 ceph_bio_iter_advance_step(&it, bytes, ({ 1297 zero_bvec(&bv); 1298 })); 1299 } 1300 1301 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) 1302 { 1303 struct ceph_bvec_iter it = *bvec_pos; 1304 1305 ceph_bvec_iter_advance(&it, off); 1306 ceph_bvec_iter_advance_step(&it, bytes, ({ 1307 zero_bvec(&bv); 1308 })); 1309 } 1310 1311 /* 1312 * Zero a range in @obj_req data buffer defined by a bio (list) or 1313 * (private) bio_vec array. 1314 * 1315 * @off is relative to the start of the data buffer. 1316 */ 1317 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, 1318 u32 bytes) 1319 { 1320 switch (obj_req->img_request->data_type) { 1321 case OBJ_REQUEST_BIO: 1322 zero_bios(&obj_req->bio_pos, off, bytes); 1323 break; 1324 case OBJ_REQUEST_BVECS: 1325 case OBJ_REQUEST_OWN_BVECS: 1326 zero_bvecs(&obj_req->bvec_pos, off, bytes); 1327 break; 1328 default: 1329 rbd_assert(0); 1330 } 1331 } 1332 1333 static void rbd_obj_request_destroy(struct kref *kref); 1334 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1335 { 1336 rbd_assert(obj_request != NULL); 1337 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1338 kref_read(&obj_request->kref)); 1339 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1340 } 1341 1342 static void rbd_img_request_get(struct rbd_img_request *img_request) 1343 { 1344 dout("%s: img %p (was %d)\n", __func__, img_request, 1345 kref_read(&img_request->kref)); 1346 kref_get(&img_request->kref); 1347 } 1348 1349 static void rbd_img_request_destroy(struct kref *kref); 1350 static void rbd_img_request_put(struct rbd_img_request *img_request) 1351 { 1352 rbd_assert(img_request != NULL); 1353 dout("%s: img %p (was %d)\n", __func__, img_request, 1354 kref_read(&img_request->kref)); 1355 kref_put(&img_request->kref, rbd_img_request_destroy); 1356 } 1357 1358 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1359 struct rbd_obj_request *obj_request) 1360 { 1361 rbd_assert(obj_request->img_request == NULL); 1362 1363 /* Image request now owns object's original reference */ 1364 obj_request->img_request = img_request; 1365 img_request->pending_count++; 1366 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1367 } 1368 1369 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1370 struct rbd_obj_request *obj_request) 1371 { 1372 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1373 list_del(&obj_request->ex.oe_item); 1374 rbd_assert(obj_request->img_request == img_request); 1375 rbd_obj_request_put(obj_request); 1376 } 1377 1378 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) 1379 { 1380 struct ceph_osd_request *osd_req = obj_request->osd_req; 1381 1382 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, 1383 obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, 1384 obj_request->ex.oe_len, osd_req); 1385 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); 1386 } 1387 1388 /* 1389 * 
The default/initial value for all image request flags is 0. Each 1390 * is conditionally set to 1 at image request initialization time 1391 * and currently never change thereafter. 1392 */ 1393 static void img_request_layered_set(struct rbd_img_request *img_request) 1394 { 1395 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1396 smp_mb(); 1397 } 1398 1399 static void img_request_layered_clear(struct rbd_img_request *img_request) 1400 { 1401 clear_bit(IMG_REQ_LAYERED, &img_request->flags); 1402 smp_mb(); 1403 } 1404 1405 static bool img_request_layered_test(struct rbd_img_request *img_request) 1406 { 1407 smp_mb(); 1408 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1409 } 1410 1411 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) 1412 { 1413 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1414 1415 return !obj_req->ex.oe_off && 1416 obj_req->ex.oe_len == rbd_dev->layout.object_size; 1417 } 1418 1419 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) 1420 { 1421 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1422 1423 return obj_req->ex.oe_off + obj_req->ex.oe_len == 1424 rbd_dev->layout.object_size; 1425 } 1426 1427 /* 1428 * Must be called after rbd_obj_calc_img_extents(). 1429 */ 1430 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req) 1431 { 1432 if (!obj_req->num_img_extents || 1433 (rbd_obj_is_entire(obj_req) && 1434 !obj_req->img_request->snapc->num_snaps)) 1435 return false; 1436 1437 return true; 1438 } 1439 1440 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) 1441 { 1442 return ceph_file_extents_bytes(obj_req->img_extents, 1443 obj_req->num_img_extents); 1444 } 1445 1446 static bool rbd_img_is_write(struct rbd_img_request *img_req) 1447 { 1448 switch (img_req->op_type) { 1449 case OBJ_OP_READ: 1450 return false; 1451 case OBJ_OP_WRITE: 1452 case OBJ_OP_DISCARD: 1453 case OBJ_OP_ZEROOUT: 1454 return true; 1455 default: 1456 BUG(); 1457 } 1458 } 1459 1460 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); 1461 1462 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) 1463 { 1464 struct rbd_obj_request *obj_req = osd_req->r_priv; 1465 1466 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, 1467 osd_req->r_result, obj_req); 1468 rbd_assert(osd_req == obj_req->osd_req); 1469 1470 obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; 1471 if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) 1472 obj_req->xferred = osd_req->r_result; 1473 else 1474 /* 1475 * Writes aren't allowed to return a data payload. In some 1476 * guarded write cases (e.g. stat + zero on an empty object) 1477 * a stat response makes it through, but we don't care. 
1478 */ 1479 obj_req->xferred = 0; 1480 1481 rbd_obj_handle_request(obj_req); 1482 } 1483 1484 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1485 { 1486 struct ceph_osd_request *osd_req = obj_request->osd_req; 1487 1488 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1489 osd_req->r_snapid = obj_request->img_request->snap_id; 1490 } 1491 1492 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1493 { 1494 struct ceph_osd_request *osd_req = obj_request->osd_req; 1495 1496 osd_req->r_flags = CEPH_OSD_FLAG_WRITE; 1497 ktime_get_real_ts64(&osd_req->r_mtime); 1498 osd_req->r_data_offset = obj_request->ex.oe_off; 1499 } 1500 1501 static struct ceph_osd_request * 1502 __rbd_osd_req_create(struct rbd_obj_request *obj_req, 1503 struct ceph_snap_context *snapc, unsigned int num_ops) 1504 { 1505 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1506 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1507 struct ceph_osd_request *req; 1508 const char *name_format = rbd_dev->image_format == 1 ? 1509 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; 1510 1511 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); 1512 if (!req) 1513 return NULL; 1514 1515 req->r_callback = rbd_osd_req_callback; 1516 req->r_priv = obj_req; 1517 1518 /* 1519 * Data objects may be stored in a separate pool, but always in 1520 * the same namespace in that pool as the header in its pool. 1521 */ 1522 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); 1523 req->r_base_oloc.pool = rbd_dev->layout.pool_id; 1524 1525 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, 1526 rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) 1527 goto err_req; 1528 1529 return req; 1530 1531 err_req: 1532 ceph_osdc_put_request(req); 1533 return NULL; 1534 } 1535 1536 static struct ceph_osd_request * 1537 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) 1538 { 1539 return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc, 1540 num_ops); 1541 } 1542 1543 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1544 { 1545 ceph_osdc_put_request(osd_req); 1546 } 1547 1548 static struct rbd_obj_request *rbd_obj_request_create(void) 1549 { 1550 struct rbd_obj_request *obj_request; 1551 1552 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); 1553 if (!obj_request) 1554 return NULL; 1555 1556 ceph_object_extent_init(&obj_request->ex); 1557 kref_init(&obj_request->kref); 1558 1559 dout("%s %p\n", __func__, obj_request); 1560 return obj_request; 1561 } 1562 1563 static void rbd_obj_request_destroy(struct kref *kref) 1564 { 1565 struct rbd_obj_request *obj_request; 1566 u32 i; 1567 1568 obj_request = container_of(kref, struct rbd_obj_request, kref); 1569 1570 dout("%s: obj %p\n", __func__, obj_request); 1571 1572 if (obj_request->osd_req) 1573 rbd_osd_req_destroy(obj_request->osd_req); 1574 1575 switch (obj_request->img_request->data_type) { 1576 case OBJ_REQUEST_NODATA: 1577 case OBJ_REQUEST_BIO: 1578 case OBJ_REQUEST_BVECS: 1579 break; /* Nothing to do */ 1580 case OBJ_REQUEST_OWN_BVECS: 1581 kfree(obj_request->bvec_pos.bvecs); 1582 break; 1583 default: 1584 rbd_assert(0); 1585 } 1586 1587 kfree(obj_request->img_extents); 1588 if (obj_request->copyup_bvecs) { 1589 for (i = 0; i < obj_request->copyup_bvec_count; i++) { 1590 if (obj_request->copyup_bvecs[i].bv_page) 1591 __free_page(obj_request->copyup_bvecs[i].bv_page); 1592 } 1593 kfree(obj_request->copyup_bvecs); 1594 } 1595 1596 
kmem_cache_free(rbd_obj_request_cache, obj_request); 1597 } 1598 1599 /* It's OK to call this for a device with no parent */ 1600 1601 static void rbd_spec_put(struct rbd_spec *spec); 1602 static void rbd_dev_unparent(struct rbd_device *rbd_dev) 1603 { 1604 rbd_dev_remove_parent(rbd_dev); 1605 rbd_spec_put(rbd_dev->parent_spec); 1606 rbd_dev->parent_spec = NULL; 1607 rbd_dev->parent_overlap = 0; 1608 } 1609 1610 /* 1611 * Parent image reference counting is used to determine when an 1612 * image's parent fields can be safely torn down--after there are no 1613 * more in-flight requests to the parent image. When the last 1614 * reference is dropped, cleaning them up is safe. 1615 */ 1616 static void rbd_dev_parent_put(struct rbd_device *rbd_dev) 1617 { 1618 int counter; 1619 1620 if (!rbd_dev->parent_spec) 1621 return; 1622 1623 counter = atomic_dec_return_safe(&rbd_dev->parent_ref); 1624 if (counter > 0) 1625 return; 1626 1627 /* Last reference; clean up parent data structures */ 1628 1629 if (!counter) 1630 rbd_dev_unparent(rbd_dev); 1631 else 1632 rbd_warn(rbd_dev, "parent reference underflow"); 1633 } 1634 1635 /* 1636 * If an image has a non-zero parent overlap, get a reference to its 1637 * parent. 1638 * 1639 * Returns true if the rbd device has a parent with a non-zero 1640 * overlap and a reference for it was successfully taken, or 1641 * false otherwise. 1642 */ 1643 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) 1644 { 1645 int counter = 0; 1646 1647 if (!rbd_dev->parent_spec) 1648 return false; 1649 1650 down_read(&rbd_dev->header_rwsem); 1651 if (rbd_dev->parent_overlap) 1652 counter = atomic_inc_return_safe(&rbd_dev->parent_ref); 1653 up_read(&rbd_dev->header_rwsem); 1654 1655 if (counter < 0) 1656 rbd_warn(rbd_dev, "parent reference overflow"); 1657 1658 return counter > 0; 1659 } 1660 1661 /* 1662 * Caller is responsible for filling in the list of object requests 1663 * that comprises the image request, and the Linux request pointer 1664 * (if there is one). 
1665 */ 1666 static struct rbd_img_request *rbd_img_request_create( 1667 struct rbd_device *rbd_dev, 1668 enum obj_operation_type op_type, 1669 struct ceph_snap_context *snapc) 1670 { 1671 struct rbd_img_request *img_request; 1672 1673 img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO); 1674 if (!img_request) 1675 return NULL; 1676 1677 img_request->rbd_dev = rbd_dev; 1678 img_request->op_type = op_type; 1679 if (!rbd_img_is_write(img_request)) 1680 img_request->snap_id = rbd_dev->spec->snap_id; 1681 else 1682 img_request->snapc = snapc; 1683 1684 if (rbd_dev_parent_get(rbd_dev)) 1685 img_request_layered_set(img_request); 1686 1687 spin_lock_init(&img_request->completion_lock); 1688 INIT_LIST_HEAD(&img_request->object_extents); 1689 kref_init(&img_request->kref); 1690 1691 dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, 1692 obj_op_name(op_type), img_request); 1693 return img_request; 1694 } 1695 1696 static void rbd_img_request_destroy(struct kref *kref) 1697 { 1698 struct rbd_img_request *img_request; 1699 struct rbd_obj_request *obj_request; 1700 struct rbd_obj_request *next_obj_request; 1701 1702 img_request = container_of(kref, struct rbd_img_request, kref); 1703 1704 dout("%s: img %p\n", __func__, img_request); 1705 1706 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1707 rbd_img_obj_request_del(img_request, obj_request); 1708 1709 if (img_request_layered_test(img_request)) { 1710 img_request_layered_clear(img_request); 1711 rbd_dev_parent_put(img_request->rbd_dev); 1712 } 1713 1714 if (rbd_img_is_write(img_request)) 1715 ceph_put_snap_context(img_request->snapc); 1716 1717 kmem_cache_free(rbd_img_request_cache, img_request); 1718 } 1719 1720 static void prune_extents(struct ceph_file_extent *img_extents, 1721 u32 *num_img_extents, u64 overlap) 1722 { 1723 u32 cnt = *num_img_extents; 1724 1725 /* drop extents completely beyond the overlap */ 1726 while (cnt && img_extents[cnt - 1].fe_off >= overlap) 1727 cnt--; 1728 1729 if (cnt) { 1730 struct ceph_file_extent *ex = &img_extents[cnt - 1]; 1731 1732 /* trim final overlapping extent */ 1733 if (ex->fe_off + ex->fe_len > overlap) 1734 ex->fe_len = overlap - ex->fe_off; 1735 } 1736 1737 *num_img_extents = cnt; 1738 } 1739 1740 /* 1741 * Determine the byte range(s) covered by either just the object extent 1742 * or the entire object in the parent image. 1743 */ 1744 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, 1745 bool entire) 1746 { 1747 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1748 int ret; 1749 1750 if (!rbd_dev->parent_overlap) 1751 return 0; 1752 1753 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, 1754 entire ? 0 : obj_req->ex.oe_off, 1755 entire ? 
rbd_dev->layout.object_size : 1756 obj_req->ex.oe_len, 1757 &obj_req->img_extents, 1758 &obj_req->num_img_extents); 1759 if (ret) 1760 return ret; 1761 1762 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 1763 rbd_dev->parent_overlap); 1764 return 0; 1765 } 1766 1767 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) 1768 { 1769 switch (obj_req->img_request->data_type) { 1770 case OBJ_REQUEST_BIO: 1771 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, 1772 &obj_req->bio_pos, 1773 obj_req->ex.oe_len); 1774 break; 1775 case OBJ_REQUEST_BVECS: 1776 case OBJ_REQUEST_OWN_BVECS: 1777 rbd_assert(obj_req->bvec_pos.iter.bi_size == 1778 obj_req->ex.oe_len); 1779 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); 1780 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, 1781 &obj_req->bvec_pos); 1782 break; 1783 default: 1784 rbd_assert(0); 1785 } 1786 } 1787 1788 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) 1789 { 1790 obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1); 1791 if (!obj_req->osd_req) 1792 return -ENOMEM; 1793 1794 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, 1795 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 1796 rbd_osd_req_setup_data(obj_req, 0); 1797 1798 rbd_osd_req_format_read(obj_req); 1799 return 0; 1800 } 1801 1802 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, 1803 unsigned int which) 1804 { 1805 struct page **pages; 1806 1807 /* 1808 * The response data for a STAT call consists of: 1809 * le64 length; 1810 * struct { 1811 * le32 tv_sec; 1812 * le32 tv_nsec; 1813 * } mtime; 1814 */ 1815 pages = ceph_alloc_page_vector(1, GFP_NOIO); 1816 if (IS_ERR(pages)) 1817 return PTR_ERR(pages); 1818 1819 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); 1820 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, 1821 8 + sizeof(struct ceph_timespec), 1822 0, false, true); 1823 return 0; 1824 } 1825 1826 static int count_write_ops(struct rbd_obj_request *obj_req) 1827 { 1828 return 2; /* setallochint + write/writefull */ 1829 } 1830 1831 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, 1832 unsigned int which) 1833 { 1834 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1835 u16 opcode; 1836 1837 osd_req_op_alloc_hint_init(obj_req->osd_req, which++, 1838 rbd_dev->layout.object_size, 1839 rbd_dev->layout.object_size); 1840 1841 if (rbd_obj_is_entire(obj_req)) 1842 opcode = CEPH_OSD_OP_WRITEFULL; 1843 else 1844 opcode = CEPH_OSD_OP_WRITE; 1845 1846 osd_req_op_extent_init(obj_req->osd_req, which, opcode, 1847 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 1848 rbd_osd_req_setup_data(obj_req, which++); 1849 1850 rbd_assert(which == obj_req->osd_req->r_num_ops); 1851 rbd_osd_req_format_write(obj_req); 1852 } 1853 1854 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) 1855 { 1856 unsigned int num_osd_ops, which = 0; 1857 bool need_guard; 1858 int ret; 1859 1860 /* reverse map the entire object onto the parent */ 1861 ret = rbd_obj_calc_img_extents(obj_req, true); 1862 if (ret) 1863 return ret; 1864 1865 need_guard = rbd_obj_copyup_enabled(obj_req); 1866 num_osd_ops = need_guard + count_write_ops(obj_req); 1867 1868 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 1869 if (!obj_req->osd_req) 1870 return -ENOMEM; 1871 1872 if (need_guard) { 1873 ret = __rbd_obj_setup_stat(obj_req, which++); 1874 if (ret) 1875 return ret; 1876 1877 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 1878 } else { 1879 
obj_req->write_state = RBD_OBJ_WRITE_FLAT; 1880 } 1881 1882 __rbd_obj_setup_write(obj_req, which); 1883 return 0; 1884 } 1885 1886 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) 1887 { 1888 return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE : 1889 CEPH_OSD_OP_ZERO; 1890 } 1891 1892 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) 1893 { 1894 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1895 u64 off = obj_req->ex.oe_off; 1896 u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len; 1897 int ret; 1898 1899 /* 1900 * Align the range to alloc_size boundary and punt on discards 1901 * that are too small to free up any space. 1902 * 1903 * alloc_size == object_size && is_tail() is a special case for 1904 * filestore with filestore_punch_hole = false, needed to allow 1905 * truncate (in addition to delete). 1906 */ 1907 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || 1908 !rbd_obj_is_tail(obj_req)) { 1909 off = round_up(off, rbd_dev->opts->alloc_size); 1910 next_off = round_down(next_off, rbd_dev->opts->alloc_size); 1911 if (off >= next_off) 1912 return 1; 1913 } 1914 1915 /* reverse map the entire object onto the parent */ 1916 ret = rbd_obj_calc_img_extents(obj_req, true); 1917 if (ret) 1918 return ret; 1919 1920 obj_req->osd_req = rbd_osd_req_create(obj_req, 1); 1921 if (!obj_req->osd_req) 1922 return -ENOMEM; 1923 1924 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { 1925 osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0); 1926 } else { 1927 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, 1928 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, 1929 off, next_off - off); 1930 osd_req_op_extent_init(obj_req->osd_req, 0, 1931 truncate_or_zero_opcode(obj_req), 1932 off, next_off - off, 0, 0); 1933 } 1934 1935 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 1936 rbd_osd_req_format_write(obj_req); 1937 return 0; 1938 } 1939 1940 static int count_zeroout_ops(struct rbd_obj_request *obj_req) 1941 { 1942 int num_osd_ops; 1943 1944 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && 1945 !rbd_obj_copyup_enabled(obj_req)) 1946 num_osd_ops = 2; /* create + truncate */ 1947 else 1948 num_osd_ops = 1; /* delete/truncate/zero */ 1949 1950 return num_osd_ops; 1951 } 1952 1953 static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req, 1954 unsigned int which) 1955 { 1956 u16 opcode; 1957 1958 if (rbd_obj_is_entire(obj_req)) { 1959 if (obj_req->num_img_extents) { 1960 if (!rbd_obj_copyup_enabled(obj_req)) 1961 osd_req_op_init(obj_req->osd_req, which++, 1962 CEPH_OSD_OP_CREATE, 0); 1963 opcode = CEPH_OSD_OP_TRUNCATE; 1964 } else { 1965 osd_req_op_init(obj_req->osd_req, which++, 1966 CEPH_OSD_OP_DELETE, 0); 1967 opcode = 0; 1968 } 1969 } else { 1970 opcode = truncate_or_zero_opcode(obj_req); 1971 } 1972 1973 if (opcode) 1974 osd_req_op_extent_init(obj_req->osd_req, which++, opcode, 1975 obj_req->ex.oe_off, obj_req->ex.oe_len, 1976 0, 0); 1977 1978 rbd_assert(which == obj_req->osd_req->r_num_ops); 1979 rbd_osd_req_format_write(obj_req); 1980 } 1981 1982 static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req) 1983 { 1984 unsigned int num_osd_ops, which = 0; 1985 bool need_guard; 1986 int ret; 1987 1988 /* reverse map the entire object onto the parent */ 1989 ret = rbd_obj_calc_img_extents(obj_req, true); 1990 if (ret) 1991 return ret; 1992 1993 need_guard = rbd_obj_copyup_enabled(obj_req); 1994 num_osd_ops = need_guard + count_zeroout_ops(obj_req); 1995 1996 obj_req->osd_req = 
rbd_osd_req_create(obj_req, num_osd_ops); 1997 if (!obj_req->osd_req) 1998 return -ENOMEM; 1999 2000 if (need_guard) { 2001 ret = __rbd_obj_setup_stat(obj_req, which++); 2002 if (ret) 2003 return ret; 2004 2005 obj_req->write_state = RBD_OBJ_WRITE_GUARD; 2006 } else { 2007 obj_req->write_state = RBD_OBJ_WRITE_FLAT; 2008 } 2009 2010 __rbd_obj_setup_zeroout(obj_req, which); 2011 return 0; 2012 } 2013 2014 /* 2015 * For each object request in @img_req, allocate an OSD request, add 2016 * individual OSD ops and prepare them for submission. The number of 2017 * OSD ops depends on op_type and the overlap point (if any). 2018 */ 2019 static int __rbd_img_fill_request(struct rbd_img_request *img_req) 2020 { 2021 struct rbd_obj_request *obj_req, *next_obj_req; 2022 int ret; 2023 2024 for_each_obj_request_safe(img_req, obj_req, next_obj_req) { 2025 switch (img_req->op_type) { 2026 case OBJ_OP_READ: 2027 ret = rbd_obj_setup_read(obj_req); 2028 break; 2029 case OBJ_OP_WRITE: 2030 ret = rbd_obj_setup_write(obj_req); 2031 break; 2032 case OBJ_OP_DISCARD: 2033 ret = rbd_obj_setup_discard(obj_req); 2034 break; 2035 case OBJ_OP_ZEROOUT: 2036 ret = rbd_obj_setup_zeroout(obj_req); 2037 break; 2038 default: 2039 rbd_assert(0); 2040 } 2041 if (ret < 0) 2042 return ret; 2043 if (ret > 0) { 2044 img_req->xferred += obj_req->ex.oe_len; 2045 img_req->pending_count--; 2046 rbd_img_obj_request_del(img_req, obj_req); 2047 continue; 2048 } 2049 2050 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2051 if (ret) 2052 return ret; 2053 } 2054 2055 return 0; 2056 } 2057 2058 union rbd_img_fill_iter { 2059 struct ceph_bio_iter bio_iter; 2060 struct ceph_bvec_iter bvec_iter; 2061 }; 2062 2063 struct rbd_img_fill_ctx { 2064 enum obj_request_type pos_type; 2065 union rbd_img_fill_iter *pos; 2066 union rbd_img_fill_iter iter; 2067 ceph_object_extent_fn_t set_pos_fn; 2068 ceph_object_extent_fn_t count_fn; 2069 ceph_object_extent_fn_t copy_fn; 2070 }; 2071 2072 static struct ceph_object_extent *alloc_object_extent(void *arg) 2073 { 2074 struct rbd_img_request *img_req = arg; 2075 struct rbd_obj_request *obj_req; 2076 2077 obj_req = rbd_obj_request_create(); 2078 if (!obj_req) 2079 return NULL; 2080 2081 rbd_img_obj_request_add(img_req, obj_req); 2082 return &obj_req->ex; 2083 } 2084 2085 /* 2086 * While su != os && sc == 1 is technically not fancy (it's the same 2087 * layout as su == os && sc == 1), we can't use the nocopy path for it 2088 * because ->set_pos_fn() should be called only once per object. 2089 * ceph_file_to_extents() invokes action_fn once per stripe unit, so 2090 * treat su != os && sc == 1 as fancy. 2091 */ 2092 static bool rbd_layout_is_fancy(struct ceph_file_layout *l) 2093 { 2094 return l->stripe_unit != l->object_size; 2095 } 2096 2097 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, 2098 struct ceph_file_extent *img_extents, 2099 u32 num_img_extents, 2100 struct rbd_img_fill_ctx *fctx) 2101 { 2102 u32 i; 2103 int ret; 2104 2105 img_req->data_type = fctx->pos_type; 2106 2107 /* 2108 * Create object requests and set each object request's starting 2109 * position in the provided bio (list) or bio_vec array. 
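 * Since the layout is not fancy here (or there is no data at all), the
 * mapping within each object is contiguous, so the data descriptors can
 * reference the caller's buffer directly -- no private bio_vec arrays
 * are needed.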
2110 */ 2111 fctx->iter = *fctx->pos; 2112 for (i = 0; i < num_img_extents; i++) { 2113 ret = ceph_file_to_extents(&img_req->rbd_dev->layout, 2114 img_extents[i].fe_off, 2115 img_extents[i].fe_len, 2116 &img_req->object_extents, 2117 alloc_object_extent, img_req, 2118 fctx->set_pos_fn, &fctx->iter); 2119 if (ret) 2120 return ret; 2121 } 2122 2123 return __rbd_img_fill_request(img_req); 2124 } 2125 2126 /* 2127 * Map a list of image extents to a list of object extents, create the 2128 * corresponding object requests (normally each to a different object, 2129 * but not always) and add them to @img_req. For each object request, 2130 * set up its data descriptor to point to the corresponding chunk(s) of 2131 * @fctx->pos data buffer. 2132 * 2133 * Because ceph_file_to_extents() will merge adjacent object extents 2134 * together, each object request's data descriptor may point to multiple 2135 * different chunks of @fctx->pos data buffer. 2136 * 2137 * @fctx->pos data buffer is assumed to be large enough. 2138 */ 2139 static int rbd_img_fill_request(struct rbd_img_request *img_req, 2140 struct ceph_file_extent *img_extents, 2141 u32 num_img_extents, 2142 struct rbd_img_fill_ctx *fctx) 2143 { 2144 struct rbd_device *rbd_dev = img_req->rbd_dev; 2145 struct rbd_obj_request *obj_req; 2146 u32 i; 2147 int ret; 2148 2149 if (fctx->pos_type == OBJ_REQUEST_NODATA || 2150 !rbd_layout_is_fancy(&rbd_dev->layout)) 2151 return rbd_img_fill_request_nocopy(img_req, img_extents, 2152 num_img_extents, fctx); 2153 2154 img_req->data_type = OBJ_REQUEST_OWN_BVECS; 2155 2156 /* 2157 * Create object requests and determine ->bvec_count for each object 2158 * request. Note that ->bvec_count sum over all object requests may 2159 * be greater than the number of bio_vecs in the provided bio (list) 2160 * or bio_vec array because when mapped, those bio_vecs can straddle 2161 * stripe unit boundaries. 2162 */ 2163 fctx->iter = *fctx->pos; 2164 for (i = 0; i < num_img_extents; i++) { 2165 ret = ceph_file_to_extents(&rbd_dev->layout, 2166 img_extents[i].fe_off, 2167 img_extents[i].fe_len, 2168 &img_req->object_extents, 2169 alloc_object_extent, img_req, 2170 fctx->count_fn, &fctx->iter); 2171 if (ret) 2172 return ret; 2173 } 2174 2175 for_each_obj_request(img_req, obj_req) { 2176 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, 2177 sizeof(*obj_req->bvec_pos.bvecs), 2178 GFP_NOIO); 2179 if (!obj_req->bvec_pos.bvecs) 2180 return -ENOMEM; 2181 } 2182 2183 /* 2184 * Fill in each object request's private bio_vec array, splitting and 2185 * rearranging the provided bio_vecs in stripe unit chunks as needed. 
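 * This is the second walk over the image extents: the first one created
 * the object requests and merely counted bio_vecs (count_fn) so the
 * arrays above could be sized, this one (copy_fn) fills them in.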
2186 */ 2187 fctx->iter = *fctx->pos; 2188 for (i = 0; i < num_img_extents; i++) { 2189 ret = ceph_iterate_extents(&rbd_dev->layout, 2190 img_extents[i].fe_off, 2191 img_extents[i].fe_len, 2192 &img_req->object_extents, 2193 fctx->copy_fn, &fctx->iter); 2194 if (ret) 2195 return ret; 2196 } 2197 2198 return __rbd_img_fill_request(img_req); 2199 } 2200 2201 static int rbd_img_fill_nodata(struct rbd_img_request *img_req, 2202 u64 off, u64 len) 2203 { 2204 struct ceph_file_extent ex = { off, len }; 2205 union rbd_img_fill_iter dummy; 2206 struct rbd_img_fill_ctx fctx = { 2207 .pos_type = OBJ_REQUEST_NODATA, 2208 .pos = &dummy, 2209 }; 2210 2211 return rbd_img_fill_request(img_req, &ex, 1, &fctx); 2212 } 2213 2214 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2215 { 2216 struct rbd_obj_request *obj_req = 2217 container_of(ex, struct rbd_obj_request, ex); 2218 struct ceph_bio_iter *it = arg; 2219 2220 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2221 obj_req->bio_pos = *it; 2222 ceph_bio_iter_advance(it, bytes); 2223 } 2224 2225 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2226 { 2227 struct rbd_obj_request *obj_req = 2228 container_of(ex, struct rbd_obj_request, ex); 2229 struct ceph_bio_iter *it = arg; 2230 2231 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2232 ceph_bio_iter_advance_step(it, bytes, ({ 2233 obj_req->bvec_count++; 2234 })); 2235 2236 } 2237 2238 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2239 { 2240 struct rbd_obj_request *obj_req = 2241 container_of(ex, struct rbd_obj_request, ex); 2242 struct ceph_bio_iter *it = arg; 2243 2244 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2245 ceph_bio_iter_advance_step(it, bytes, ({ 2246 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2247 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2248 })); 2249 } 2250 2251 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2252 struct ceph_file_extent *img_extents, 2253 u32 num_img_extents, 2254 struct ceph_bio_iter *bio_pos) 2255 { 2256 struct rbd_img_fill_ctx fctx = { 2257 .pos_type = OBJ_REQUEST_BIO, 2258 .pos = (union rbd_img_fill_iter *)bio_pos, 2259 .set_pos_fn = set_bio_pos, 2260 .count_fn = count_bio_bvecs, 2261 .copy_fn = copy_bio_bvecs, 2262 }; 2263 2264 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2265 &fctx); 2266 } 2267 2268 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2269 u64 off, u64 len, struct bio *bio) 2270 { 2271 struct ceph_file_extent ex = { off, len }; 2272 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; 2273 2274 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); 2275 } 2276 2277 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2278 { 2279 struct rbd_obj_request *obj_req = 2280 container_of(ex, struct rbd_obj_request, ex); 2281 struct ceph_bvec_iter *it = arg; 2282 2283 obj_req->bvec_pos = *it; 2284 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); 2285 ceph_bvec_iter_advance(it, bytes); 2286 } 2287 2288 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2289 { 2290 struct rbd_obj_request *obj_req = 2291 container_of(ex, struct rbd_obj_request, ex); 2292 struct ceph_bvec_iter *it = arg; 2293 2294 ceph_bvec_iter_advance_step(it, bytes, ({ 2295 obj_req->bvec_count++; 2296 })); 2297 } 2298 2299 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2300 { 2301 struct 
rbd_obj_request *obj_req = 2302 container_of(ex, struct rbd_obj_request, ex); 2303 struct ceph_bvec_iter *it = arg; 2304 2305 ceph_bvec_iter_advance_step(it, bytes, ({ 2306 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2307 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2308 })); 2309 } 2310 2311 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2312 struct ceph_file_extent *img_extents, 2313 u32 num_img_extents, 2314 struct ceph_bvec_iter *bvec_pos) 2315 { 2316 struct rbd_img_fill_ctx fctx = { 2317 .pos_type = OBJ_REQUEST_BVECS, 2318 .pos = (union rbd_img_fill_iter *)bvec_pos, 2319 .set_pos_fn = set_bvec_pos, 2320 .count_fn = count_bvecs, 2321 .copy_fn = copy_bvecs, 2322 }; 2323 2324 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2325 &fctx); 2326 } 2327 2328 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2329 struct ceph_file_extent *img_extents, 2330 u32 num_img_extents, 2331 struct bio_vec *bvecs) 2332 { 2333 struct ceph_bvec_iter it = { 2334 .bvecs = bvecs, 2335 .iter = { .bi_size = ceph_file_extents_bytes(img_extents, 2336 num_img_extents) }, 2337 }; 2338 2339 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, 2340 &it); 2341 } 2342 2343 static void rbd_img_request_submit(struct rbd_img_request *img_request) 2344 { 2345 struct rbd_obj_request *obj_request; 2346 2347 dout("%s: img %p\n", __func__, img_request); 2348 2349 rbd_img_request_get(img_request); 2350 for_each_obj_request(img_request, obj_request) 2351 rbd_obj_request_submit(obj_request); 2352 2353 rbd_img_request_put(img_request); 2354 } 2355 2356 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) 2357 { 2358 struct rbd_img_request *img_req = obj_req->img_request; 2359 struct rbd_img_request *child_img_req; 2360 int ret; 2361 2362 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, 2363 OBJ_OP_READ, NULL); 2364 if (!child_img_req) 2365 return -ENOMEM; 2366 2367 __set_bit(IMG_REQ_CHILD, &child_img_req->flags); 2368 child_img_req->obj_request = obj_req; 2369 2370 if (!rbd_img_is_write(img_req)) { 2371 switch (img_req->data_type) { 2372 case OBJ_REQUEST_BIO: 2373 ret = __rbd_img_fill_from_bio(child_img_req, 2374 obj_req->img_extents, 2375 obj_req->num_img_extents, 2376 &obj_req->bio_pos); 2377 break; 2378 case OBJ_REQUEST_BVECS: 2379 case OBJ_REQUEST_OWN_BVECS: 2380 ret = __rbd_img_fill_from_bvecs(child_img_req, 2381 obj_req->img_extents, 2382 obj_req->num_img_extents, 2383 &obj_req->bvec_pos); 2384 break; 2385 default: 2386 rbd_assert(0); 2387 } 2388 } else { 2389 ret = rbd_img_fill_from_bvecs(child_img_req, 2390 obj_req->img_extents, 2391 obj_req->num_img_extents, 2392 obj_req->copyup_bvecs); 2393 } 2394 if (ret) { 2395 rbd_img_request_put(child_img_req); 2396 return ret; 2397 } 2398 2399 rbd_img_request_submit(child_img_req); 2400 return 0; 2401 } 2402 2403 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) 2404 { 2405 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2406 int ret; 2407 2408 if (obj_req->result == -ENOENT && 2409 rbd_dev->parent_overlap && !obj_req->tried_parent) { 2410 /* reverse map this object extent onto the parent */ 2411 ret = rbd_obj_calc_img_extents(obj_req, false); 2412 if (ret) { 2413 obj_req->result = ret; 2414 return true; 2415 } 2416 2417 if (obj_req->num_img_extents) { 2418 obj_req->tried_parent = true; 2419 ret = rbd_obj_read_from_parent(obj_req); 2420 if (ret) { 2421 obj_req->result = ret; 2422 return true; 2423 } 2424 return false; 2425 } 2426 } 2427 2428 
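	/*
	 * Getting here means there is no parent data to fall back on:
	 * either the read did not return -ENOENT, the extent has no
	 * parent data behind it, or the parent was already tried
	 * (tried_parent).
	 */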
/* 2429 * -ENOENT means a hole in the image -- zero-fill the entire 2430 * length of the request. A short read also implies zero-fill 2431 * to the end of the request. In both cases we update xferred 2432 * count to indicate the whole request was satisfied. 2433 */ 2434 if (obj_req->result == -ENOENT || 2435 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { 2436 rbd_assert(!obj_req->xferred || !obj_req->result); 2437 rbd_obj_zero_range(obj_req, obj_req->xferred, 2438 obj_req->ex.oe_len - obj_req->xferred); 2439 obj_req->result = 0; 2440 obj_req->xferred = obj_req->ex.oe_len; 2441 } 2442 2443 return true; 2444 } 2445 2446 /* 2447 * copyup_bvecs pages are never highmem pages 2448 */ 2449 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) 2450 { 2451 struct ceph_bvec_iter it = { 2452 .bvecs = bvecs, 2453 .iter = { .bi_size = bytes }, 2454 }; 2455 2456 ceph_bvec_iter_advance_step(&it, bytes, ({ 2457 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, 2458 bv.bv_len)) 2459 return false; 2460 })); 2461 return true; 2462 } 2463 2464 #define MODS_ONLY U32_MAX 2465 2466 static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req, 2467 u32 bytes) 2468 { 2469 int ret; 2470 2471 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2472 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); 2473 rbd_assert(bytes > 0 && bytes != MODS_ONLY); 2474 rbd_osd_req_destroy(obj_req->osd_req); 2475 2476 obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1); 2477 if (!obj_req->osd_req) 2478 return -ENOMEM; 2479 2480 ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); 2481 if (ret) 2482 return ret; 2483 2484 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, 2485 obj_req->copyup_bvecs, 2486 obj_req->copyup_bvec_count, 2487 bytes); 2488 rbd_osd_req_format_write(obj_req); 2489 2490 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2491 if (ret) 2492 return ret; 2493 2494 rbd_obj_request_submit(obj_req); 2495 return 0; 2496 } 2497 2498 static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes) 2499 { 2500 struct rbd_img_request *img_req = obj_req->img_request; 2501 unsigned int num_osd_ops = (bytes != MODS_ONLY); 2502 unsigned int which = 0; 2503 int ret; 2504 2505 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2506 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT || 2507 obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL); 2508 rbd_osd_req_destroy(obj_req->osd_req); 2509 2510 switch (img_req->op_type) { 2511 case OBJ_OP_WRITE: 2512 num_osd_ops += count_write_ops(obj_req); 2513 break; 2514 case OBJ_OP_ZEROOUT: 2515 num_osd_ops += count_zeroout_ops(obj_req); 2516 break; 2517 default: 2518 rbd_assert(0); 2519 } 2520 2521 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); 2522 if (!obj_req->osd_req) 2523 return -ENOMEM; 2524 2525 if (bytes != MODS_ONLY) { 2526 ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd", 2527 "copyup"); 2528 if (ret) 2529 return ret; 2530 2531 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++, 2532 obj_req->copyup_bvecs, 2533 obj_req->copyup_bvec_count, 2534 bytes); 2535 } 2536 2537 switch (img_req->op_type) { 2538 case OBJ_OP_WRITE: 2539 __rbd_obj_setup_write(obj_req, which); 2540 break; 2541 case OBJ_OP_ZEROOUT: 2542 __rbd_obj_setup_zeroout(obj_req, which); 2543 break; 2544 default: 2545 rbd_assert(0); 2546 } 2547 2548 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); 2549 if (ret) 2550 return ret; 2551 2552 
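	/* the copyup payload (if any) was set up as op 0 above, followed by the original write/zeroout ops */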
rbd_obj_request_submit(obj_req); 2553 return 0; 2554 } 2555 2556 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) 2557 { 2558 /* 2559 * Only send non-zero copyup data to save some I/O and network 2560 * bandwidth -- zero copyup data is equivalent to the object not 2561 * existing. 2562 */ 2563 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { 2564 dout("%s obj_req %p detected zeroes\n", __func__, obj_req); 2565 bytes = 0; 2566 } 2567 2568 if (obj_req->img_request->snapc->num_snaps && bytes > 0) { 2569 /* 2570 * Send a copyup request with an empty snapshot context to 2571 * deep-copyup the object through all existing snapshots. 2572 * A second request with the current snapshot context will be 2573 * sent for the actual modification. 2574 */ 2575 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC; 2576 return rbd_obj_issue_copyup_empty_snapc(obj_req, bytes); 2577 } 2578 2579 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2580 return rbd_obj_issue_copyup_ops(obj_req, bytes); 2581 } 2582 2583 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) 2584 { 2585 u32 i; 2586 2587 rbd_assert(!obj_req->copyup_bvecs); 2588 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); 2589 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, 2590 sizeof(*obj_req->copyup_bvecs), 2591 GFP_NOIO); 2592 if (!obj_req->copyup_bvecs) 2593 return -ENOMEM; 2594 2595 for (i = 0; i < obj_req->copyup_bvec_count; i++) { 2596 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); 2597 2598 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); 2599 if (!obj_req->copyup_bvecs[i].bv_page) 2600 return -ENOMEM; 2601 2602 obj_req->copyup_bvecs[i].bv_offset = 0; 2603 obj_req->copyup_bvecs[i].bv_len = len; 2604 obj_overlap -= len; 2605 } 2606 2607 rbd_assert(!obj_overlap); 2608 return 0; 2609 } 2610 2611 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) 2612 { 2613 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2614 int ret; 2615 2616 rbd_assert(obj_req->num_img_extents); 2617 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 2618 rbd_dev->parent_overlap); 2619 if (!obj_req->num_img_extents) { 2620 /* 2621 * The overlap has become 0 (most likely because the 2622 * image has been flattened). Re-submit the original write 2623 * request -- pass MODS_ONLY since the copyup isn't needed 2624 * anymore. 2625 */ 2626 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2627 return rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); 2628 } 2629 2630 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); 2631 if (ret) 2632 return ret; 2633 2634 obj_req->write_state = RBD_OBJ_WRITE_READ_FROM_PARENT; 2635 return rbd_obj_read_from_parent(obj_req); 2636 } 2637 2638 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) 2639 { 2640 int ret; 2641 2642 switch (obj_req->write_state) { 2643 case RBD_OBJ_WRITE_GUARD: 2644 rbd_assert(!obj_req->xferred); 2645 if (obj_req->result == -ENOENT) { 2646 /* 2647 * The target object doesn't exist. Read the data for 2648 * the entire target object up to the overlap point (if 2649 * any) from the parent, so we can use it for a copyup. 
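			 * Normally rbd_obj_handle_write_guard() then moves the
			 * state machine to READ_FROM_PARENT and the original ops
			 * are reissued together with a copyup once that read
			 * completes.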
2650 */ 2651 ret = rbd_obj_handle_write_guard(obj_req); 2652 if (ret) { 2653 obj_req->result = ret; 2654 return true; 2655 } 2656 return false; 2657 } 2658 /* fall through */ 2659 case RBD_OBJ_WRITE_FLAT: 2660 case RBD_OBJ_WRITE_COPYUP_OPS: 2661 if (!obj_req->result) 2662 /* 2663 * There is no such thing as a successful short 2664 * write -- indicate the whole request was satisfied. 2665 */ 2666 obj_req->xferred = obj_req->ex.oe_len; 2667 return true; 2668 case RBD_OBJ_WRITE_READ_FROM_PARENT: 2669 if (obj_req->result) 2670 return true; 2671 2672 rbd_assert(obj_req->xferred); 2673 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); 2674 if (ret) { 2675 obj_req->result = ret; 2676 obj_req->xferred = 0; 2677 return true; 2678 } 2679 return false; 2680 case RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC: 2681 if (obj_req->result) 2682 return true; 2683 2684 obj_req->write_state = RBD_OBJ_WRITE_COPYUP_OPS; 2685 ret = rbd_obj_issue_copyup_ops(obj_req, MODS_ONLY); 2686 if (ret) { 2687 obj_req->result = ret; 2688 return true; 2689 } 2690 return false; 2691 default: 2692 BUG(); 2693 } 2694 } 2695 2696 /* 2697 * Returns true if @obj_req is completed, or false otherwise. 2698 */ 2699 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) 2700 { 2701 switch (obj_req->img_request->op_type) { 2702 case OBJ_OP_READ: 2703 return rbd_obj_handle_read(obj_req); 2704 case OBJ_OP_WRITE: 2705 return rbd_obj_handle_write(obj_req); 2706 case OBJ_OP_DISCARD: 2707 case OBJ_OP_ZEROOUT: 2708 if (rbd_obj_handle_write(obj_req)) { 2709 /* 2710 * Hide -ENOENT from delete/truncate/zero -- discarding 2711 * a non-existent object is not a problem. 2712 */ 2713 if (obj_req->result == -ENOENT) { 2714 obj_req->result = 0; 2715 obj_req->xferred = obj_req->ex.oe_len; 2716 } 2717 return true; 2718 } 2719 return false; 2720 default: 2721 BUG(); 2722 } 2723 } 2724 2725 static void rbd_obj_end_request(struct rbd_obj_request *obj_req) 2726 { 2727 struct rbd_img_request *img_req = obj_req->img_request; 2728 2729 rbd_assert((!obj_req->result && 2730 obj_req->xferred == obj_req->ex.oe_len) || 2731 (obj_req->result < 0 && !obj_req->xferred)); 2732 if (!obj_req->result) { 2733 img_req->xferred += obj_req->xferred; 2734 return; 2735 } 2736 2737 rbd_warn(img_req->rbd_dev, 2738 "%s at objno %llu %llu~%llu result %d xferred %llu", 2739 obj_op_name(img_req->op_type), obj_req->ex.oe_objno, 2740 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, 2741 obj_req->xferred); 2742 if (!img_req->result) { 2743 img_req->result = obj_req->result; 2744 img_req->xferred = 0; 2745 } 2746 } 2747 2748 static void rbd_img_end_child_request(struct rbd_img_request *img_req) 2749 { 2750 struct rbd_obj_request *obj_req = img_req->obj_request; 2751 2752 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); 2753 rbd_assert((!img_req->result && 2754 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || 2755 (img_req->result < 0 && !img_req->xferred)); 2756 2757 obj_req->result = img_req->result; 2758 obj_req->xferred = img_req->xferred; 2759 rbd_img_request_put(img_req); 2760 } 2761 2762 static void rbd_img_end_request(struct rbd_img_request *img_req) 2763 { 2764 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); 2765 rbd_assert((!img_req->result && 2766 img_req->xferred == blk_rq_bytes(img_req->rq)) || 2767 (img_req->result < 0 && !img_req->xferred)); 2768 2769 blk_mq_end_request(img_req->rq, 2770 errno_to_blk_status(img_req->result)); 2771 rbd_img_request_put(img_req); 2772 } 2773 2774 static void rbd_obj_handle_request(struct rbd_obj_request 
*obj_req) 2775 { 2776 struct rbd_img_request *img_req; 2777 2778 again: 2779 if (!__rbd_obj_handle_request(obj_req)) 2780 return; 2781 2782 img_req = obj_req->img_request; 2783 spin_lock(&img_req->completion_lock); 2784 rbd_obj_end_request(obj_req); 2785 rbd_assert(img_req->pending_count); 2786 if (--img_req->pending_count) { 2787 spin_unlock(&img_req->completion_lock); 2788 return; 2789 } 2790 2791 spin_unlock(&img_req->completion_lock); 2792 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { 2793 obj_req = img_req->obj_request; 2794 rbd_img_end_child_request(img_req); 2795 goto again; 2796 } 2797 rbd_img_end_request(img_req); 2798 } 2799 2800 static const struct rbd_client_id rbd_empty_cid; 2801 2802 static bool rbd_cid_equal(const struct rbd_client_id *lhs, 2803 const struct rbd_client_id *rhs) 2804 { 2805 return lhs->gid == rhs->gid && lhs->handle == rhs->handle; 2806 } 2807 2808 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) 2809 { 2810 struct rbd_client_id cid; 2811 2812 mutex_lock(&rbd_dev->watch_mutex); 2813 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); 2814 cid.handle = rbd_dev->watch_cookie; 2815 mutex_unlock(&rbd_dev->watch_mutex); 2816 return cid; 2817 } 2818 2819 /* 2820 * lock_rwsem must be held for write 2821 */ 2822 static void rbd_set_owner_cid(struct rbd_device *rbd_dev, 2823 const struct rbd_client_id *cid) 2824 { 2825 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, 2826 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, 2827 cid->gid, cid->handle); 2828 rbd_dev->owner_cid = *cid; /* struct */ 2829 } 2830 2831 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) 2832 { 2833 mutex_lock(&rbd_dev->watch_mutex); 2834 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); 2835 mutex_unlock(&rbd_dev->watch_mutex); 2836 } 2837 2838 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) 2839 { 2840 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2841 2842 strcpy(rbd_dev->lock_cookie, cookie); 2843 rbd_set_owner_cid(rbd_dev, &cid); 2844 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); 2845 } 2846 2847 /* 2848 * lock_rwsem must be held for write 2849 */ 2850 static int rbd_lock(struct rbd_device *rbd_dev) 2851 { 2852 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2853 char cookie[32]; 2854 int ret; 2855 2856 WARN_ON(__rbd_is_lock_owner(rbd_dev) || 2857 rbd_dev->lock_cookie[0] != '\0'); 2858 2859 format_lock_cookie(rbd_dev, cookie); 2860 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2861 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, 2862 RBD_LOCK_TAG, "", 0); 2863 if (ret) 2864 return ret; 2865 2866 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; 2867 __rbd_lock(rbd_dev, cookie); 2868 return 0; 2869 } 2870 2871 /* 2872 * lock_rwsem must be held for write 2873 */ 2874 static void rbd_unlock(struct rbd_device *rbd_dev) 2875 { 2876 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2877 int ret; 2878 2879 WARN_ON(!__rbd_is_lock_owner(rbd_dev) || 2880 rbd_dev->lock_cookie[0] == '\0'); 2881 2882 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 2883 RBD_LOCK_NAME, rbd_dev->lock_cookie); 2884 if (ret && ret != -ENOENT) 2885 rbd_warn(rbd_dev, "failed to unlock: %d", ret); 2886 2887 /* treat errors as the image is unlocked */ 2888 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 2889 rbd_dev->lock_cookie[0] = '\0'; 2890 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 2891 queue_work(rbd_dev->task_wq, 
&rbd_dev->released_lock_work); 2892 } 2893 2894 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, 2895 enum rbd_notify_op notify_op, 2896 struct page ***preply_pages, 2897 size_t *preply_len) 2898 { 2899 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2900 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2901 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; 2902 int buf_size = sizeof(buf); 2903 void *p = buf; 2904 2905 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 2906 2907 /* encode *LockPayload NotifyMessage (op + ClientId) */ 2908 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); 2909 ceph_encode_32(&p, notify_op); 2910 ceph_encode_64(&p, cid.gid); 2911 ceph_encode_64(&p, cid.handle); 2912 2913 return ceph_osdc_notify(osdc, &rbd_dev->header_oid, 2914 &rbd_dev->header_oloc, buf, buf_size, 2915 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); 2916 } 2917 2918 static void rbd_notify_op_lock(struct rbd_device *rbd_dev, 2919 enum rbd_notify_op notify_op) 2920 { 2921 struct page **reply_pages; 2922 size_t reply_len; 2923 2924 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); 2925 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 2926 } 2927 2928 static void rbd_notify_acquired_lock(struct work_struct *work) 2929 { 2930 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2931 acquired_lock_work); 2932 2933 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); 2934 } 2935 2936 static void rbd_notify_released_lock(struct work_struct *work) 2937 { 2938 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 2939 released_lock_work); 2940 2941 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); 2942 } 2943 2944 static int rbd_request_lock(struct rbd_device *rbd_dev) 2945 { 2946 struct page **reply_pages; 2947 size_t reply_len; 2948 bool lock_owner_responded = false; 2949 int ret; 2950 2951 dout("%s rbd_dev %p\n", __func__, rbd_dev); 2952 2953 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, 2954 &reply_pages, &reply_len); 2955 if (ret && ret != -ETIMEDOUT) { 2956 rbd_warn(rbd_dev, "failed to request lock: %d", ret); 2957 goto out; 2958 } 2959 2960 if (reply_len > 0 && reply_len <= PAGE_SIZE) { 2961 void *p = page_address(reply_pages[0]); 2962 void *const end = p + reply_len; 2963 u32 n; 2964 2965 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ 2966 while (n--) { 2967 u8 struct_v; 2968 u32 len; 2969 2970 ceph_decode_need(&p, end, 8 + 8, e_inval); 2971 p += 8 + 8; /* skip gid and cookie */ 2972 2973 ceph_decode_32_safe(&p, end, len, e_inval); 2974 if (!len) 2975 continue; 2976 2977 if (lock_owner_responded) { 2978 rbd_warn(rbd_dev, 2979 "duplicate lock owners detected"); 2980 ret = -EIO; 2981 goto out; 2982 } 2983 2984 lock_owner_responded = true; 2985 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", 2986 &struct_v, &len); 2987 if (ret) { 2988 rbd_warn(rbd_dev, 2989 "failed to decode ResponseMessage: %d", 2990 ret); 2991 goto e_inval; 2992 } 2993 2994 ret = ceph_decode_32(&p); 2995 } 2996 } 2997 2998 if (!lock_owner_responded) { 2999 rbd_warn(rbd_dev, "no lock owners detected"); 3000 ret = -ETIMEDOUT; 3001 } 3002 3003 out: 3004 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 3005 return ret; 3006 3007 e_inval: 3008 ret = -EINVAL; 3009 goto out; 3010 } 3011 3012 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) 3013 { 3014 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); 
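	/*
	 * Cancel any queued acquire attempt -- a waiter woken below will
	 * requeue lock_dwork from rbd_wait_state_locked() if it still
	 * needs the lock.
	 */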
3015 3016 cancel_delayed_work(&rbd_dev->lock_dwork); 3017 if (wake_all) 3018 wake_up_all(&rbd_dev->lock_waitq); 3019 else 3020 wake_up(&rbd_dev->lock_waitq); 3021 } 3022 3023 static int get_lock_owner_info(struct rbd_device *rbd_dev, 3024 struct ceph_locker **lockers, u32 *num_lockers) 3025 { 3026 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3027 u8 lock_type; 3028 char *lock_tag; 3029 int ret; 3030 3031 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3032 3033 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, 3034 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3035 &lock_type, &lock_tag, lockers, num_lockers); 3036 if (ret) 3037 return ret; 3038 3039 if (*num_lockers == 0) { 3040 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); 3041 goto out; 3042 } 3043 3044 if (strcmp(lock_tag, RBD_LOCK_TAG)) { 3045 rbd_warn(rbd_dev, "locked by external mechanism, tag %s", 3046 lock_tag); 3047 ret = -EBUSY; 3048 goto out; 3049 } 3050 3051 if (lock_type == CEPH_CLS_LOCK_SHARED) { 3052 rbd_warn(rbd_dev, "shared lock type detected"); 3053 ret = -EBUSY; 3054 goto out; 3055 } 3056 3057 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, 3058 strlen(RBD_LOCK_COOKIE_PREFIX))) { 3059 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", 3060 (*lockers)[0].id.cookie); 3061 ret = -EBUSY; 3062 goto out; 3063 } 3064 3065 out: 3066 kfree(lock_tag); 3067 return ret; 3068 } 3069 3070 static int find_watcher(struct rbd_device *rbd_dev, 3071 const struct ceph_locker *locker) 3072 { 3073 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3074 struct ceph_watch_item *watchers; 3075 u32 num_watchers; 3076 u64 cookie; 3077 int i; 3078 int ret; 3079 3080 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, 3081 &rbd_dev->header_oloc, &watchers, 3082 &num_watchers); 3083 if (ret) 3084 return ret; 3085 3086 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); 3087 for (i = 0; i < num_watchers; i++) { 3088 if (!memcmp(&watchers[i].addr, &locker->info.addr, 3089 sizeof(locker->info.addr)) && 3090 watchers[i].cookie == cookie) { 3091 struct rbd_client_id cid = { 3092 .gid = le64_to_cpu(watchers[i].name.num), 3093 .handle = cookie, 3094 }; 3095 3096 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, 3097 rbd_dev, cid.gid, cid.handle); 3098 rbd_set_owner_cid(rbd_dev, &cid); 3099 ret = 1; 3100 goto out; 3101 } 3102 } 3103 3104 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); 3105 ret = 0; 3106 out: 3107 kfree(watchers); 3108 return ret; 3109 } 3110 3111 /* 3112 * lock_rwsem must be held for write 3113 */ 3114 static int rbd_try_lock(struct rbd_device *rbd_dev) 3115 { 3116 struct ceph_client *client = rbd_dev->rbd_client->client; 3117 struct ceph_locker *lockers; 3118 u32 num_lockers; 3119 int ret; 3120 3121 for (;;) { 3122 ret = rbd_lock(rbd_dev); 3123 if (ret != -EBUSY) 3124 return ret; 3125 3126 /* determine if the current lock holder is still alive */ 3127 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); 3128 if (ret) 3129 return ret; 3130 3131 if (num_lockers == 0) 3132 goto again; 3133 3134 ret = find_watcher(rbd_dev, lockers); 3135 if (ret) { 3136 if (ret > 0) 3137 ret = 0; /* have to request lock */ 3138 goto out; 3139 } 3140 3141 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", 3142 ENTITY_NAME(lockers[0].id.name)); 3143 3144 ret = ceph_monc_blacklist_add(&client->monc, 3145 &lockers[0].info.addr); 3146 if (ret) { 3147 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", 3148 ENTITY_NAME(lockers[0].id.name), ret); 3149 
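			/* could not fence the stale lock owner -- bail out rather than break its lock */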
goto out; 3150 } 3151 3152 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, 3153 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3154 lockers[0].id.cookie, 3155 &lockers[0].id.name); 3156 if (ret && ret != -ENOENT) 3157 goto out; 3158 3159 again: 3160 ceph_free_lockers(lockers, num_lockers); 3161 } 3162 3163 out: 3164 ceph_free_lockers(lockers, num_lockers); 3165 return ret; 3166 } 3167 3168 /* 3169 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED 3170 */ 3171 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, 3172 int *pret) 3173 { 3174 enum rbd_lock_state lock_state; 3175 3176 down_read(&rbd_dev->lock_rwsem); 3177 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3178 rbd_dev->lock_state); 3179 if (__rbd_is_lock_owner(rbd_dev)) { 3180 lock_state = rbd_dev->lock_state; 3181 up_read(&rbd_dev->lock_rwsem); 3182 return lock_state; 3183 } 3184 3185 up_read(&rbd_dev->lock_rwsem); 3186 down_write(&rbd_dev->lock_rwsem); 3187 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3188 rbd_dev->lock_state); 3189 if (!__rbd_is_lock_owner(rbd_dev)) { 3190 *pret = rbd_try_lock(rbd_dev); 3191 if (*pret) 3192 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); 3193 } 3194 3195 lock_state = rbd_dev->lock_state; 3196 up_write(&rbd_dev->lock_rwsem); 3197 return lock_state; 3198 } 3199 3200 static void rbd_acquire_lock(struct work_struct *work) 3201 { 3202 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3203 struct rbd_device, lock_dwork); 3204 enum rbd_lock_state lock_state; 3205 int ret = 0; 3206 3207 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3208 again: 3209 lock_state = rbd_try_acquire_lock(rbd_dev, &ret); 3210 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { 3211 if (lock_state == RBD_LOCK_STATE_LOCKED) 3212 wake_requests(rbd_dev, true); 3213 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, 3214 rbd_dev, lock_state, ret); 3215 return; 3216 } 3217 3218 ret = rbd_request_lock(rbd_dev); 3219 if (ret == -ETIMEDOUT) { 3220 goto again; /* treat this as a dead client */ 3221 } else if (ret == -EROFS) { 3222 rbd_warn(rbd_dev, "peer will not release lock"); 3223 /* 3224 * If this is rbd_add_acquire_lock(), we want to fail 3225 * immediately -- reuse BLACKLISTED flag. Otherwise we 3226 * want to block. 3227 */ 3228 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { 3229 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3230 /* wake "rbd map --exclusive" process */ 3231 wake_requests(rbd_dev, false); 3232 } 3233 } else if (ret < 0) { 3234 rbd_warn(rbd_dev, "error requesting lock: %d", ret); 3235 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3236 RBD_RETRY_DELAY); 3237 } else { 3238 /* 3239 * lock owner acked, but resend if we don't see them 3240 * release the lock 3241 */ 3242 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, 3243 rbd_dev); 3244 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3245 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); 3246 } 3247 } 3248 3249 /* 3250 * lock_rwsem must be held for write 3251 */ 3252 static bool rbd_release_lock(struct rbd_device *rbd_dev) 3253 { 3254 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3255 rbd_dev->lock_state); 3256 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) 3257 return false; 3258 3259 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; 3260 downgrade_write(&rbd_dev->lock_rwsem); 3261 /* 3262 * Ensure that all in-flight IO is flushed. 
3263 * 3264 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which 3265 * may be shared with other devices. 3266 */ 3267 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); 3268 up_read(&rbd_dev->lock_rwsem); 3269 3270 down_write(&rbd_dev->lock_rwsem); 3271 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3272 rbd_dev->lock_state); 3273 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) 3274 return false; 3275 3276 rbd_unlock(rbd_dev); 3277 /* 3278 * Give others a chance to grab the lock - we would re-acquire 3279 * almost immediately if we got new IO during ceph_osdc_sync() 3280 * otherwise. We need to ack our own notifications, so this 3281 * lock_dwork will be requeued from rbd_wait_state_locked() 3282 * after wake_requests() in rbd_handle_released_lock(). 3283 */ 3284 cancel_delayed_work(&rbd_dev->lock_dwork); 3285 return true; 3286 } 3287 3288 static void rbd_release_lock_work(struct work_struct *work) 3289 { 3290 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3291 unlock_work); 3292 3293 down_write(&rbd_dev->lock_rwsem); 3294 rbd_release_lock(rbd_dev); 3295 up_write(&rbd_dev->lock_rwsem); 3296 } 3297 3298 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, 3299 void **p) 3300 { 3301 struct rbd_client_id cid = { 0 }; 3302 3303 if (struct_v >= 2) { 3304 cid.gid = ceph_decode_64(p); 3305 cid.handle = ceph_decode_64(p); 3306 } 3307 3308 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3309 cid.handle); 3310 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3311 down_write(&rbd_dev->lock_rwsem); 3312 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3313 /* 3314 * we already know that the remote client is 3315 * the owner 3316 */ 3317 up_write(&rbd_dev->lock_rwsem); 3318 return; 3319 } 3320 3321 rbd_set_owner_cid(rbd_dev, &cid); 3322 downgrade_write(&rbd_dev->lock_rwsem); 3323 } else { 3324 down_read(&rbd_dev->lock_rwsem); 3325 } 3326 3327 if (!__rbd_is_lock_owner(rbd_dev)) 3328 wake_requests(rbd_dev, false); 3329 up_read(&rbd_dev->lock_rwsem); 3330 } 3331 3332 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, 3333 void **p) 3334 { 3335 struct rbd_client_id cid = { 0 }; 3336 3337 if (struct_v >= 2) { 3338 cid.gid = ceph_decode_64(p); 3339 cid.handle = ceph_decode_64(p); 3340 } 3341 3342 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3343 cid.handle); 3344 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3345 down_write(&rbd_dev->lock_rwsem); 3346 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3347 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", 3348 __func__, rbd_dev, cid.gid, cid.handle, 3349 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); 3350 up_write(&rbd_dev->lock_rwsem); 3351 return; 3352 } 3353 3354 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3355 downgrade_write(&rbd_dev->lock_rwsem); 3356 } else { 3357 down_read(&rbd_dev->lock_rwsem); 3358 } 3359 3360 if (!__rbd_is_lock_owner(rbd_dev)) 3361 wake_requests(rbd_dev, false); 3362 up_read(&rbd_dev->lock_rwsem); 3363 } 3364 3365 /* 3366 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no 3367 * ResponseMessage is needed. 
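 * (1 means no ResponseMessage is encoded in the ack, e.g. when the request
 * came from this client itself or when this client does not own the lock.)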
3368 */ 3369 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, 3370 void **p) 3371 { 3372 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); 3373 struct rbd_client_id cid = { 0 }; 3374 int result = 1; 3375 3376 if (struct_v >= 2) { 3377 cid.gid = ceph_decode_64(p); 3378 cid.handle = ceph_decode_64(p); 3379 } 3380 3381 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3382 cid.handle); 3383 if (rbd_cid_equal(&cid, &my_cid)) 3384 return result; 3385 3386 down_read(&rbd_dev->lock_rwsem); 3387 if (__rbd_is_lock_owner(rbd_dev)) { 3388 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && 3389 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) 3390 goto out_unlock; 3391 3392 /* 3393 * encode ResponseMessage(0) so the peer can detect 3394 * a missing owner 3395 */ 3396 result = 0; 3397 3398 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { 3399 if (!rbd_dev->opts->exclusive) { 3400 dout("%s rbd_dev %p queueing unlock_work\n", 3401 __func__, rbd_dev); 3402 queue_work(rbd_dev->task_wq, 3403 &rbd_dev->unlock_work); 3404 } else { 3405 /* refuse to release the lock */ 3406 result = -EROFS; 3407 } 3408 } 3409 } 3410 3411 out_unlock: 3412 up_read(&rbd_dev->lock_rwsem); 3413 return result; 3414 } 3415 3416 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, 3417 u64 notify_id, u64 cookie, s32 *result) 3418 { 3419 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3420 char buf[4 + CEPH_ENCODING_START_BLK_LEN]; 3421 int buf_size = sizeof(buf); 3422 int ret; 3423 3424 if (result) { 3425 void *p = buf; 3426 3427 /* encode ResponseMessage */ 3428 ceph_start_encoding(&p, 1, 1, 3429 buf_size - CEPH_ENCODING_START_BLK_LEN); 3430 ceph_encode_32(&p, *result); 3431 } else { 3432 buf_size = 0; 3433 } 3434 3435 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 3436 &rbd_dev->header_oloc, notify_id, cookie, 3437 buf, buf_size); 3438 if (ret) 3439 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); 3440 } 3441 3442 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, 3443 u64 cookie) 3444 { 3445 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3446 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); 3447 } 3448 3449 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, 3450 u64 notify_id, u64 cookie, s32 result) 3451 { 3452 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 3453 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); 3454 } 3455 3456 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 3457 u64 notifier_id, void *data, size_t data_len) 3458 { 3459 struct rbd_device *rbd_dev = arg; 3460 void *p = data; 3461 void *const end = p + data_len; 3462 u8 struct_v = 0; 3463 u32 len; 3464 u32 notify_op; 3465 int ret; 3466 3467 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", 3468 __func__, rbd_dev, cookie, notify_id, data_len); 3469 if (data_len) { 3470 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", 3471 &struct_v, &len); 3472 if (ret) { 3473 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", 3474 ret); 3475 return; 3476 } 3477 3478 notify_op = ceph_decode_32(&p); 3479 } else { 3480 /* legacy notification for header updates */ 3481 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; 3482 len = 0; 3483 } 3484 3485 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); 3486 switch (notify_op) { 3487 case RBD_NOTIFY_OP_ACQUIRED_LOCK: 3488 rbd_handle_acquired_lock(rbd_dev, struct_v, &p); 3489 rbd_acknowledge_notify(rbd_dev, notify_id, 
cookie); 3490 break; 3491 case RBD_NOTIFY_OP_RELEASED_LOCK: 3492 rbd_handle_released_lock(rbd_dev, struct_v, &p); 3493 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3494 break; 3495 case RBD_NOTIFY_OP_REQUEST_LOCK: 3496 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); 3497 if (ret <= 0) 3498 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3499 cookie, ret); 3500 else 3501 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3502 break; 3503 case RBD_NOTIFY_OP_HEADER_UPDATE: 3504 ret = rbd_dev_refresh(rbd_dev); 3505 if (ret) 3506 rbd_warn(rbd_dev, "refresh failed: %d", ret); 3507 3508 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3509 break; 3510 default: 3511 if (rbd_is_lock_owner(rbd_dev)) 3512 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3513 cookie, -EOPNOTSUPP); 3514 else 3515 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3516 break; 3517 } 3518 } 3519 3520 static void __rbd_unregister_watch(struct rbd_device *rbd_dev); 3521 3522 static void rbd_watch_errcb(void *arg, u64 cookie, int err) 3523 { 3524 struct rbd_device *rbd_dev = arg; 3525 3526 rbd_warn(rbd_dev, "encountered watch error: %d", err); 3527 3528 down_write(&rbd_dev->lock_rwsem); 3529 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3530 up_write(&rbd_dev->lock_rwsem); 3531 3532 mutex_lock(&rbd_dev->watch_mutex); 3533 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { 3534 __rbd_unregister_watch(rbd_dev); 3535 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; 3536 3537 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); 3538 } 3539 mutex_unlock(&rbd_dev->watch_mutex); 3540 } 3541 3542 /* 3543 * watch_mutex must be locked 3544 */ 3545 static int __rbd_register_watch(struct rbd_device *rbd_dev) 3546 { 3547 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3548 struct ceph_osd_linger_request *handle; 3549 3550 rbd_assert(!rbd_dev->watch_handle); 3551 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3552 3553 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 3554 &rbd_dev->header_oloc, rbd_watch_cb, 3555 rbd_watch_errcb, rbd_dev); 3556 if (IS_ERR(handle)) 3557 return PTR_ERR(handle); 3558 3559 rbd_dev->watch_handle = handle; 3560 return 0; 3561 } 3562 3563 /* 3564 * watch_mutex must be locked 3565 */ 3566 static void __rbd_unregister_watch(struct rbd_device *rbd_dev) 3567 { 3568 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3569 int ret; 3570 3571 rbd_assert(rbd_dev->watch_handle); 3572 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3573 3574 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 3575 if (ret) 3576 rbd_warn(rbd_dev, "failed to unwatch: %d", ret); 3577 3578 rbd_dev->watch_handle = NULL; 3579 } 3580 3581 static int rbd_register_watch(struct rbd_device *rbd_dev) 3582 { 3583 int ret; 3584 3585 mutex_lock(&rbd_dev->watch_mutex); 3586 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); 3587 ret = __rbd_register_watch(rbd_dev); 3588 if (ret) 3589 goto out; 3590 3591 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3592 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3593 3594 out: 3595 mutex_unlock(&rbd_dev->watch_mutex); 3596 return ret; 3597 } 3598 3599 static void cancel_tasks_sync(struct rbd_device *rbd_dev) 3600 { 3601 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3602 3603 cancel_work_sync(&rbd_dev->acquired_lock_work); 3604 cancel_work_sync(&rbd_dev->released_lock_work); 3605 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 3606 cancel_work_sync(&rbd_dev->unlock_work); 3607 } 3608 3609 static void 
rbd_unregister_watch(struct rbd_device *rbd_dev)
3610 {
3611 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3612 cancel_tasks_sync(rbd_dev);
3613
3614 mutex_lock(&rbd_dev->watch_mutex);
3615 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3616 __rbd_unregister_watch(rbd_dev);
3617 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3618 mutex_unlock(&rbd_dev->watch_mutex);
3619
3620 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3621 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3622 }
3623
3624 /*
3625 * lock_rwsem must be held for write
3626 */
3627 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3628 {
3629 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3630 char cookie[32];
3631 int ret;
3632
3633 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3634
3635 format_lock_cookie(rbd_dev, cookie);
3636 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3637 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3638 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3639 RBD_LOCK_TAG, cookie);
3640 if (ret) {
3641 if (ret != -EOPNOTSUPP)
3642 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3643 ret);
3644
3645 /*
3646 * Lock cookie cannot be updated on older OSDs, so do
3647 * a manual release and queue an acquire.
3648 */
3649 if (rbd_release_lock(rbd_dev))
3650 queue_delayed_work(rbd_dev->task_wq,
3651 &rbd_dev->lock_dwork, 0);
3652 } else {
3653 __rbd_lock(rbd_dev, cookie);
3654 }
3655 }
3656
3657 static void rbd_reregister_watch(struct work_struct *work)
3658 {
3659 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3660 struct rbd_device, watch_dwork);
3661 int ret;
3662
3663 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3664
3665 mutex_lock(&rbd_dev->watch_mutex);
3666 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3667 mutex_unlock(&rbd_dev->watch_mutex);
3668 return;
3669 }
3670
3671 ret = __rbd_register_watch(rbd_dev);
3672 if (ret) {
3673 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3674 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3675 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3676 wake_requests(rbd_dev, true);
3677 } else {
3678 queue_delayed_work(rbd_dev->task_wq,
3679 &rbd_dev->watch_dwork,
3680 RBD_RETRY_DELAY);
3681 }
3682 mutex_unlock(&rbd_dev->watch_mutex);
3683 return;
3684 }
3685
3686 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3687 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3688 mutex_unlock(&rbd_dev->watch_mutex);
3689
3690 down_write(&rbd_dev->lock_rwsem);
3691 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3692 rbd_reacquire_lock(rbd_dev);
3693 up_write(&rbd_dev->lock_rwsem);
3694
3695 ret = rbd_dev_refresh(rbd_dev);
3696 if (ret)
3697 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3698 }
3699
3700 /*
3701 * Synchronous osd object method call. Returns the number of bytes
3702 * returned in the inbound (reply) buffer, or a negative error code.
3703 */
3704 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3705 struct ceph_object_id *oid,
3706 struct ceph_object_locator *oloc,
3707 const char *method_name,
3708 const void *outbound,
3709 size_t outbound_size,
3710 void *inbound,
3711 size_t inbound_size)
3712 {
3713 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3714 struct page *req_page = NULL;
3715 struct page *reply_page;
3716 int ret;
3717
3718 /*
3719 * Method calls are ultimately read operations. The result
3720 * should be placed into the inbound buffer provided.
They 3721 * also supply outbound data--parameters for the object 3722 * method. Currently if this is present it will be a 3723 * snapshot id. 3724 */ 3725 if (outbound) { 3726 if (outbound_size > PAGE_SIZE) 3727 return -E2BIG; 3728 3729 req_page = alloc_page(GFP_KERNEL); 3730 if (!req_page) 3731 return -ENOMEM; 3732 3733 memcpy(page_address(req_page), outbound, outbound_size); 3734 } 3735 3736 reply_page = alloc_page(GFP_KERNEL); 3737 if (!reply_page) { 3738 if (req_page) 3739 __free_page(req_page); 3740 return -ENOMEM; 3741 } 3742 3743 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, 3744 CEPH_OSD_FLAG_READ, req_page, outbound_size, 3745 reply_page, &inbound_size); 3746 if (!ret) { 3747 memcpy(inbound, page_address(reply_page), inbound_size); 3748 ret = inbound_size; 3749 } 3750 3751 if (req_page) 3752 __free_page(req_page); 3753 __free_page(reply_page); 3754 return ret; 3755 } 3756 3757 /* 3758 * lock_rwsem must be held for read 3759 */ 3760 static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) 3761 { 3762 DEFINE_WAIT(wait); 3763 unsigned long timeout; 3764 int ret = 0; 3765 3766 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) 3767 return -EBLACKLISTED; 3768 3769 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) 3770 return 0; 3771 3772 if (!may_acquire) { 3773 rbd_warn(rbd_dev, "exclusive lock required"); 3774 return -EROFS; 3775 } 3776 3777 do { 3778 /* 3779 * Note the use of mod_delayed_work() in rbd_acquire_lock() 3780 * and cancel_delayed_work() in wake_requests(). 3781 */ 3782 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); 3783 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 3784 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, 3785 TASK_UNINTERRUPTIBLE); 3786 up_read(&rbd_dev->lock_rwsem); 3787 timeout = schedule_timeout(ceph_timeout_jiffies( 3788 rbd_dev->opts->lock_timeout)); 3789 down_read(&rbd_dev->lock_rwsem); 3790 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 3791 ret = -EBLACKLISTED; 3792 break; 3793 } 3794 if (!timeout) { 3795 rbd_warn(rbd_dev, "timed out waiting for lock"); 3796 ret = -ETIMEDOUT; 3797 break; 3798 } 3799 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); 3800 3801 finish_wait(&rbd_dev->lock_waitq, &wait); 3802 return ret; 3803 } 3804 3805 static void rbd_queue_workfn(struct work_struct *work) 3806 { 3807 struct request *rq = blk_mq_rq_from_pdu(work); 3808 struct rbd_device *rbd_dev = rq->q->queuedata; 3809 struct rbd_img_request *img_request; 3810 struct ceph_snap_context *snapc = NULL; 3811 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; 3812 u64 length = blk_rq_bytes(rq); 3813 enum obj_operation_type op_type; 3814 u64 mapping_size; 3815 bool must_be_locked; 3816 int result; 3817 3818 switch (req_op(rq)) { 3819 case REQ_OP_DISCARD: 3820 op_type = OBJ_OP_DISCARD; 3821 break; 3822 case REQ_OP_WRITE_ZEROES: 3823 op_type = OBJ_OP_ZEROOUT; 3824 break; 3825 case REQ_OP_WRITE: 3826 op_type = OBJ_OP_WRITE; 3827 break; 3828 case REQ_OP_READ: 3829 op_type = OBJ_OP_READ; 3830 break; 3831 default: 3832 dout("%s: non-fs request type %d\n", __func__, req_op(rq)); 3833 result = -EIO; 3834 goto err; 3835 } 3836 3837 /* Ignore/skip any zero-length requests */ 3838 3839 if (!length) { 3840 dout("%s: zero-length request\n", __func__); 3841 result = 0; 3842 goto err_rq; 3843 } 3844 3845 rbd_assert(op_type == OBJ_OP_READ || 3846 rbd_dev->spec->snap_id == CEPH_NOSNAP); 3847 3848 /* 3849 * Quit early if the mapped snapshot no longer exists. 
It's 3850 * still possible the snapshot will have disappeared by the 3851 * time our request arrives at the osd, but there's no sense in 3852 * sending it if we already know. 3853 */ 3854 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 3855 dout("request for non-existent snapshot"); 3856 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 3857 result = -ENXIO; 3858 goto err_rq; 3859 } 3860 3861 if (offset && length > U64_MAX - offset + 1) { 3862 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, 3863 length); 3864 result = -EINVAL; 3865 goto err_rq; /* Shouldn't happen */ 3866 } 3867 3868 blk_mq_start_request(rq); 3869 3870 down_read(&rbd_dev->header_rwsem); 3871 mapping_size = rbd_dev->mapping.size; 3872 if (op_type != OBJ_OP_READ) { 3873 snapc = rbd_dev->header.snapc; 3874 ceph_get_snap_context(snapc); 3875 } 3876 up_read(&rbd_dev->header_rwsem); 3877 3878 if (offset + length > mapping_size) { 3879 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, 3880 length, mapping_size); 3881 result = -EIO; 3882 goto err_rq; 3883 } 3884 3885 must_be_locked = 3886 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && 3887 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); 3888 if (must_be_locked) { 3889 down_read(&rbd_dev->lock_rwsem); 3890 result = rbd_wait_state_locked(rbd_dev, 3891 !rbd_dev->opts->exclusive); 3892 if (result) 3893 goto err_unlock; 3894 } 3895 3896 img_request = rbd_img_request_create(rbd_dev, op_type, snapc); 3897 if (!img_request) { 3898 result = -ENOMEM; 3899 goto err_unlock; 3900 } 3901 img_request->rq = rq; 3902 snapc = NULL; /* img_request consumes a ref */ 3903 3904 if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT) 3905 result = rbd_img_fill_nodata(img_request, offset, length); 3906 else 3907 result = rbd_img_fill_from_bio(img_request, offset, length, 3908 rq->bio); 3909 if (result || !img_request->pending_count) 3910 goto err_img_request; 3911 3912 rbd_img_request_submit(img_request); 3913 if (must_be_locked) 3914 up_read(&rbd_dev->lock_rwsem); 3915 return; 3916 3917 err_img_request: 3918 rbd_img_request_put(img_request); 3919 err_unlock: 3920 if (must_be_locked) 3921 up_read(&rbd_dev->lock_rwsem); 3922 err_rq: 3923 if (result) 3924 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 3925 obj_op_name(op_type), length, offset, result); 3926 ceph_put_snap_context(snapc); 3927 err: 3928 blk_mq_end_request(rq, errno_to_blk_status(result)); 3929 } 3930 3931 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, 3932 const struct blk_mq_queue_data *bd) 3933 { 3934 struct request *rq = bd->rq; 3935 struct work_struct *work = blk_mq_rq_to_pdu(rq); 3936 3937 queue_work(rbd_wq, work); 3938 return BLK_STS_OK; 3939 } 3940 3941 static void rbd_free_disk(struct rbd_device *rbd_dev) 3942 { 3943 blk_cleanup_queue(rbd_dev->disk->queue); 3944 blk_mq_free_tag_set(&rbd_dev->tag_set); 3945 put_disk(rbd_dev->disk); 3946 rbd_dev->disk = NULL; 3947 } 3948 3949 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 3950 struct ceph_object_id *oid, 3951 struct ceph_object_locator *oloc, 3952 void *buf, int buf_len) 3953 3954 { 3955 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3956 struct ceph_osd_request *req; 3957 struct page **pages; 3958 int num_pages = calc_pages_for(0, buf_len); 3959 int ret; 3960 3961 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 3962 if (!req) 3963 return -ENOMEM; 3964 3965 ceph_oid_copy(&req->r_base_oid, oid); 3966 ceph_oloc_copy(&req->r_base_oloc, oloc); 3967 req->r_flags = 
CEPH_OSD_FLAG_READ; 3968 3969 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 3970 if (IS_ERR(pages)) { 3971 ret = PTR_ERR(pages); 3972 goto out_req; 3973 } 3974 3975 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); 3976 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, 3977 true); 3978 3979 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 3980 if (ret) 3981 goto out_req; 3982 3983 ceph_osdc_start_request(osdc, req, false); 3984 ret = ceph_osdc_wait_request(osdc, req); 3985 if (ret >= 0) 3986 ceph_copy_from_page_vector(pages, buf, 0, ret); 3987 3988 out_req: 3989 ceph_osdc_put_request(req); 3990 return ret; 3991 } 3992 3993 /* 3994 * Read the complete header for the given rbd device. On successful 3995 * return, the rbd_dev->header field will contain up-to-date 3996 * information about the image. 3997 */ 3998 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) 3999 { 4000 struct rbd_image_header_ondisk *ondisk = NULL; 4001 u32 snap_count = 0; 4002 u64 names_size = 0; 4003 u32 want_count; 4004 int ret; 4005 4006 /* 4007 * The complete header will include an array of its 64-bit 4008 * snapshot ids, followed by the names of those snapshots as 4009 * a contiguous block of NUL-terminated strings. Note that 4010 * the number of snapshots could change by the time we read 4011 * it in, in which case we re-read it. 4012 */ 4013 do { 4014 size_t size; 4015 4016 kfree(ondisk); 4017 4018 size = sizeof (*ondisk); 4019 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 4020 size += names_size; 4021 ondisk = kmalloc(size, GFP_KERNEL); 4022 if (!ondisk) 4023 return -ENOMEM; 4024 4025 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, 4026 &rbd_dev->header_oloc, ondisk, size); 4027 if (ret < 0) 4028 goto out; 4029 if ((size_t)ret < size) { 4030 ret = -ENXIO; 4031 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 4032 size, ret); 4033 goto out; 4034 } 4035 if (!rbd_dev_ondisk_valid(ondisk)) { 4036 ret = -ENXIO; 4037 rbd_warn(rbd_dev, "invalid header"); 4038 goto out; 4039 } 4040 4041 names_size = le64_to_cpu(ondisk->snap_names_len); 4042 want_count = snap_count; 4043 snap_count = le32_to_cpu(ondisk->snap_count); 4044 } while (snap_count != want_count); 4045 4046 ret = rbd_header_from_disk(rbd_dev, ondisk); 4047 out: 4048 kfree(ondisk); 4049 4050 return ret; 4051 } 4052 4053 /* 4054 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to 4055 * has disappeared from the (just updated) snapshot context. 4056 */ 4057 static void rbd_exists_validate(struct rbd_device *rbd_dev) 4058 { 4059 u64 snap_id; 4060 4061 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) 4062 return; 4063 4064 snap_id = rbd_dev->spec->snap_id; 4065 if (snap_id == CEPH_NOSNAP) 4066 return; 4067 4068 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) 4069 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4070 } 4071 4072 static void rbd_dev_update_size(struct rbd_device *rbd_dev) 4073 { 4074 sector_t size; 4075 4076 /* 4077 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't 4078 * try to update its size. If REMOVING is set, updating size 4079 * is just useless work since the device can't be opened. 
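 * The block layer capacity is expressed in 512-byte sectors, so
 * mapping.size (bytes) is divided by SECTOR_SIZE below before
 * calling set_capacity().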
4080 */ 4081 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) && 4082 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { 4083 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 4084 dout("setting size to %llu sectors", (unsigned long long)size); 4085 set_capacity(rbd_dev->disk, size); 4086 revalidate_disk(rbd_dev->disk); 4087 } 4088 } 4089 4090 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 4091 { 4092 u64 mapping_size; 4093 int ret; 4094 4095 down_write(&rbd_dev->header_rwsem); 4096 mapping_size = rbd_dev->mapping.size; 4097 4098 ret = rbd_dev_header_info(rbd_dev); 4099 if (ret) 4100 goto out; 4101 4102 /* 4103 * If there is a parent, see if it has disappeared due to the 4104 * mapped image getting flattened. 4105 */ 4106 if (rbd_dev->parent) { 4107 ret = rbd_dev_v2_parent_info(rbd_dev); 4108 if (ret) 4109 goto out; 4110 } 4111 4112 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { 4113 rbd_dev->mapping.size = rbd_dev->header.image_size; 4114 } else { 4115 /* validate mapped snapshot's EXISTS flag */ 4116 rbd_exists_validate(rbd_dev); 4117 } 4118 4119 out: 4120 up_write(&rbd_dev->header_rwsem); 4121 if (!ret && mapping_size != rbd_dev->mapping.size) 4122 rbd_dev_update_size(rbd_dev); 4123 4124 return ret; 4125 } 4126 4127 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, 4128 unsigned int hctx_idx, unsigned int numa_node) 4129 { 4130 struct work_struct *work = blk_mq_rq_to_pdu(rq); 4131 4132 INIT_WORK(work, rbd_queue_workfn); 4133 return 0; 4134 } 4135 4136 static const struct blk_mq_ops rbd_mq_ops = { 4137 .queue_rq = rbd_queue_rq, 4138 .init_request = rbd_init_request, 4139 }; 4140 4141 static int rbd_init_disk(struct rbd_device *rbd_dev) 4142 { 4143 struct gendisk *disk; 4144 struct request_queue *q; 4145 unsigned int objset_bytes = 4146 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count; 4147 int err; 4148 4149 /* create gendisk info */ 4150 disk = alloc_disk(single_major ? 
4151 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : 4152 RBD_MINORS_PER_MAJOR); 4153 if (!disk) 4154 return -ENOMEM; 4155 4156 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 4157 rbd_dev->dev_id); 4158 disk->major = rbd_dev->major; 4159 disk->first_minor = rbd_dev->minor; 4160 if (single_major) 4161 disk->flags |= GENHD_FL_EXT_DEVT; 4162 disk->fops = &rbd_bd_ops; 4163 disk->private_data = rbd_dev; 4164 4165 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); 4166 rbd_dev->tag_set.ops = &rbd_mq_ops; 4167 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; 4168 rbd_dev->tag_set.numa_node = NUMA_NO_NODE; 4169 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE; 4170 rbd_dev->tag_set.nr_hw_queues = 1; 4171 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); 4172 4173 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); 4174 if (err) 4175 goto out_disk; 4176 4177 q = blk_mq_init_queue(&rbd_dev->tag_set); 4178 if (IS_ERR(q)) { 4179 err = PTR_ERR(q); 4180 goto out_tag_set; 4181 } 4182 4183 blk_queue_flag_set(QUEUE_FLAG_NONROT, q); 4184 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ 4185 4186 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT); 4187 q->limits.max_sectors = queue_max_hw_sectors(q); 4188 blk_queue_max_segments(q, USHRT_MAX); 4189 blk_queue_max_segment_size(q, UINT_MAX); 4190 blk_queue_io_min(q, rbd_dev->opts->alloc_size); 4191 blk_queue_io_opt(q, rbd_dev->opts->alloc_size); 4192 4193 if (rbd_dev->opts->trim) { 4194 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q); 4195 q->limits.discard_granularity = rbd_dev->opts->alloc_size; 4196 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT); 4197 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT); 4198 } 4199 4200 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) 4201 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; 4202 4203 /* 4204 * disk_release() expects a queue ref from add_disk() and will 4205 * put it. Hold an extra ref until add_disk() is called. 4206 */ 4207 WARN_ON(!blk_get_queue(q)); 4208 disk->queue = q; 4209 q->queuedata = rbd_dev; 4210 4211 rbd_dev->disk = disk; 4212 4213 return 0; 4214 out_tag_set: 4215 blk_mq_free_tag_set(&rbd_dev->tag_set); 4216 out_disk: 4217 put_disk(disk); 4218 return err; 4219 } 4220 4221 /* 4222 sysfs 4223 */ 4224 4225 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 4226 { 4227 return container_of(dev, struct rbd_device, dev); 4228 } 4229 4230 static ssize_t rbd_size_show(struct device *dev, 4231 struct device_attribute *attr, char *buf) 4232 { 4233 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4234 4235 return sprintf(buf, "%llu\n", 4236 (unsigned long long)rbd_dev->mapping.size); 4237 } 4238 4239 /* 4240 * Note this shows the features for whatever's mapped, which is not 4241 * necessarily the base image. 
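 * The value is printed as a 64-bit hexadecimal mask of RBD_FEATURE_* bits.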
4242 */ 4243 static ssize_t rbd_features_show(struct device *dev, 4244 struct device_attribute *attr, char *buf) 4245 { 4246 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4247 4248 return sprintf(buf, "0x%016llx\n", 4249 (unsigned long long)rbd_dev->mapping.features); 4250 } 4251 4252 static ssize_t rbd_major_show(struct device *dev, 4253 struct device_attribute *attr, char *buf) 4254 { 4255 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4256 4257 if (rbd_dev->major) 4258 return sprintf(buf, "%d\n", rbd_dev->major); 4259 4260 return sprintf(buf, "(none)\n"); 4261 } 4262 4263 static ssize_t rbd_minor_show(struct device *dev, 4264 struct device_attribute *attr, char *buf) 4265 { 4266 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4267 4268 return sprintf(buf, "%d\n", rbd_dev->minor); 4269 } 4270 4271 static ssize_t rbd_client_addr_show(struct device *dev, 4272 struct device_attribute *attr, char *buf) 4273 { 4274 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4275 struct ceph_entity_addr *client_addr = 4276 ceph_client_addr(rbd_dev->rbd_client->client); 4277 4278 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, 4279 le32_to_cpu(client_addr->nonce)); 4280 } 4281 4282 static ssize_t rbd_client_id_show(struct device *dev, 4283 struct device_attribute *attr, char *buf) 4284 { 4285 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4286 4287 return sprintf(buf, "client%lld\n", 4288 ceph_client_gid(rbd_dev->rbd_client->client)); 4289 } 4290 4291 static ssize_t rbd_cluster_fsid_show(struct device *dev, 4292 struct device_attribute *attr, char *buf) 4293 { 4294 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4295 4296 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); 4297 } 4298 4299 static ssize_t rbd_config_info_show(struct device *dev, 4300 struct device_attribute *attr, char *buf) 4301 { 4302 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4303 4304 return sprintf(buf, "%s\n", rbd_dev->config_info); 4305 } 4306 4307 static ssize_t rbd_pool_show(struct device *dev, 4308 struct device_attribute *attr, char *buf) 4309 { 4310 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4311 4312 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 4313 } 4314 4315 static ssize_t rbd_pool_id_show(struct device *dev, 4316 struct device_attribute *attr, char *buf) 4317 { 4318 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4319 4320 return sprintf(buf, "%llu\n", 4321 (unsigned long long) rbd_dev->spec->pool_id); 4322 } 4323 4324 static ssize_t rbd_pool_ns_show(struct device *dev, 4325 struct device_attribute *attr, char *buf) 4326 { 4327 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4328 4329 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: ""); 4330 } 4331 4332 static ssize_t rbd_name_show(struct device *dev, 4333 struct device_attribute *attr, char *buf) 4334 { 4335 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4336 4337 if (rbd_dev->spec->image_name) 4338 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 4339 4340 return sprintf(buf, "(unknown)\n"); 4341 } 4342 4343 static ssize_t rbd_image_id_show(struct device *dev, 4344 struct device_attribute *attr, char *buf) 4345 { 4346 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4347 4348 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 4349 } 4350 4351 /* 4352 * Shows the name of the currently-mapped snapshot (or 4353 * RBD_SNAP_HEAD_NAME for the base image). 
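 * The name comes from the image spec recorded when the mapping was created.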
4354 */ 4355 static ssize_t rbd_snap_show(struct device *dev, 4356 struct device_attribute *attr, 4357 char *buf) 4358 { 4359 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4360 4361 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 4362 } 4363 4364 static ssize_t rbd_snap_id_show(struct device *dev, 4365 struct device_attribute *attr, char *buf) 4366 { 4367 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4368 4369 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); 4370 } 4371 4372 /* 4373 * For a v2 image, shows the chain of parent images, separated by empty 4374 * lines. For v1 images or if there is no parent, shows "(no parent 4375 * image)". 4376 */ 4377 static ssize_t rbd_parent_show(struct device *dev, 4378 struct device_attribute *attr, 4379 char *buf) 4380 { 4381 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4382 ssize_t count = 0; 4383 4384 if (!rbd_dev->parent) 4385 return sprintf(buf, "(no parent image)\n"); 4386 4387 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { 4388 struct rbd_spec *spec = rbd_dev->parent_spec; 4389 4390 count += sprintf(&buf[count], "%s" 4391 "pool_id %llu\npool_name %s\n" 4392 "pool_ns %s\n" 4393 "image_id %s\nimage_name %s\n" 4394 "snap_id %llu\nsnap_name %s\n" 4395 "overlap %llu\n", 4396 !count ? "" : "\n", /* first? */ 4397 spec->pool_id, spec->pool_name, 4398 spec->pool_ns ?: "", 4399 spec->image_id, spec->image_name ?: "(unknown)", 4400 spec->snap_id, spec->snap_name, 4401 rbd_dev->parent_overlap); 4402 } 4403 4404 return count; 4405 } 4406 4407 static ssize_t rbd_image_refresh(struct device *dev, 4408 struct device_attribute *attr, 4409 const char *buf, 4410 size_t size) 4411 { 4412 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4413 int ret; 4414 4415 ret = rbd_dev_refresh(rbd_dev); 4416 if (ret) 4417 return ret; 4418 4419 return size; 4420 } 4421 4422 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL); 4423 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL); 4424 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL); 4425 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL); 4426 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL); 4427 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL); 4428 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL); 4429 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL); 4430 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL); 4431 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL); 4432 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL); 4433 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL); 4434 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL); 4435 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh); 4436 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL); 4437 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL); 4438 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL); 4439 4440 static struct attribute *rbd_attrs[] = { 4441 &dev_attr_size.attr, 4442 &dev_attr_features.attr, 4443 &dev_attr_major.attr, 4444 &dev_attr_minor.attr, 4445 &dev_attr_client_addr.attr, 4446 &dev_attr_client_id.attr, 4447 &dev_attr_cluster_fsid.attr, 4448 &dev_attr_config_info.attr, 4449 &dev_attr_pool.attr, 4450 &dev_attr_pool_id.attr, 4451 &dev_attr_pool_ns.attr, 4452 &dev_attr_name.attr, 4453 &dev_attr_image_id.attr, 4454 &dev_attr_current_snap.attr, 4455 &dev_attr_snap_id.attr, 4456 &dev_attr_parent.attr, 4457 &dev_attr_refresh.attr, 4458 NULL 4459 }; 4460 4461 static struct attribute_group 
rbd_attr_group = { 4462 .attrs = rbd_attrs, 4463 }; 4464 4465 static const struct attribute_group *rbd_attr_groups[] = { 4466 &rbd_attr_group, 4467 NULL 4468 }; 4469 4470 static void rbd_dev_release(struct device *dev); 4471 4472 static const struct device_type rbd_device_type = { 4473 .name = "rbd", 4474 .groups = rbd_attr_groups, 4475 .release = rbd_dev_release, 4476 }; 4477 4478 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 4479 { 4480 kref_get(&spec->kref); 4481 4482 return spec; 4483 } 4484 4485 static void rbd_spec_free(struct kref *kref); 4486 static void rbd_spec_put(struct rbd_spec *spec) 4487 { 4488 if (spec) 4489 kref_put(&spec->kref, rbd_spec_free); 4490 } 4491 4492 static struct rbd_spec *rbd_spec_alloc(void) 4493 { 4494 struct rbd_spec *spec; 4495 4496 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 4497 if (!spec) 4498 return NULL; 4499 4500 spec->pool_id = CEPH_NOPOOL; 4501 spec->snap_id = CEPH_NOSNAP; 4502 kref_init(&spec->kref); 4503 4504 return spec; 4505 } 4506 4507 static void rbd_spec_free(struct kref *kref) 4508 { 4509 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 4510 4511 kfree(spec->pool_name); 4512 kfree(spec->pool_ns); 4513 kfree(spec->image_id); 4514 kfree(spec->image_name); 4515 kfree(spec->snap_name); 4516 kfree(spec); 4517 } 4518 4519 static void rbd_dev_free(struct rbd_device *rbd_dev) 4520 { 4521 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); 4522 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); 4523 4524 ceph_oid_destroy(&rbd_dev->header_oid); 4525 ceph_oloc_destroy(&rbd_dev->header_oloc); 4526 kfree(rbd_dev->config_info); 4527 4528 rbd_put_client(rbd_dev->rbd_client); 4529 rbd_spec_put(rbd_dev->spec); 4530 kfree(rbd_dev->opts); 4531 kfree(rbd_dev); 4532 } 4533 4534 static void rbd_dev_release(struct device *dev) 4535 { 4536 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4537 bool need_put = !!rbd_dev->opts; 4538 4539 if (need_put) { 4540 destroy_workqueue(rbd_dev->task_wq); 4541 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4542 } 4543 4544 rbd_dev_free(rbd_dev); 4545 4546 /* 4547 * This is racy, but way better than putting module outside of 4548 * the release callback. The race window is pretty small, so 4549 * doing something similar to dm (dm-builtin.c) is overkill. 
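 * The reference dropped here pairs with the __module_get() in
 * rbd_dev_create().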
4550 */ 4551 if (need_put) 4552 module_put(THIS_MODULE); 4553 } 4554 4555 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, 4556 struct rbd_spec *spec) 4557 { 4558 struct rbd_device *rbd_dev; 4559 4560 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 4561 if (!rbd_dev) 4562 return NULL; 4563 4564 spin_lock_init(&rbd_dev->lock); 4565 INIT_LIST_HEAD(&rbd_dev->node); 4566 init_rwsem(&rbd_dev->header_rwsem); 4567 4568 rbd_dev->header.data_pool_id = CEPH_NOPOOL; 4569 ceph_oid_init(&rbd_dev->header_oid); 4570 rbd_dev->header_oloc.pool = spec->pool_id; 4571 if (spec->pool_ns) { 4572 WARN_ON(!*spec->pool_ns); 4573 rbd_dev->header_oloc.pool_ns = 4574 ceph_find_or_create_string(spec->pool_ns, 4575 strlen(spec->pool_ns)); 4576 } 4577 4578 mutex_init(&rbd_dev->watch_mutex); 4579 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 4580 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); 4581 4582 init_rwsem(&rbd_dev->lock_rwsem); 4583 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 4584 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); 4585 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); 4586 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); 4587 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); 4588 init_waitqueue_head(&rbd_dev->lock_waitq); 4589 4590 rbd_dev->dev.bus = &rbd_bus_type; 4591 rbd_dev->dev.type = &rbd_device_type; 4592 rbd_dev->dev.parent = &rbd_root_dev; 4593 device_initialize(&rbd_dev->dev); 4594 4595 rbd_dev->rbd_client = rbdc; 4596 rbd_dev->spec = spec; 4597 4598 return rbd_dev; 4599 } 4600 4601 /* 4602 * Create a mapping rbd_dev. 4603 */ 4604 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4605 struct rbd_spec *spec, 4606 struct rbd_options *opts) 4607 { 4608 struct rbd_device *rbd_dev; 4609 4610 rbd_dev = __rbd_dev_create(rbdc, spec); 4611 if (!rbd_dev) 4612 return NULL; 4613 4614 rbd_dev->opts = opts; 4615 4616 /* get an id and fill in device name */ 4617 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 4618 minor_to_rbd_dev_id(1 << MINORBITS), 4619 GFP_KERNEL); 4620 if (rbd_dev->dev_id < 0) 4621 goto fail_rbd_dev; 4622 4623 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); 4624 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, 4625 rbd_dev->name); 4626 if (!rbd_dev->task_wq) 4627 goto fail_dev_id; 4628 4629 /* we have a ref from do_rbd_add() */ 4630 __module_get(THIS_MODULE); 4631 4632 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); 4633 return rbd_dev; 4634 4635 fail_dev_id: 4636 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4637 fail_rbd_dev: 4638 rbd_dev_free(rbd_dev); 4639 return NULL; 4640 } 4641 4642 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4643 { 4644 if (rbd_dev) 4645 put_device(&rbd_dev->dev); 4646 } 4647 4648 /* 4649 * Get the size and object order for an image snapshot, or if 4650 * snap_id is CEPH_NOSNAP, gets this information for the base 4651 * image. 
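 * The "get_size" class method replies with a packed one-byte object
 * order followed by a little-endian 64-bit size; both are decoded below.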
4652 */ 4653 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 4654 u8 *order, u64 *snap_size) 4655 { 4656 __le64 snapid = cpu_to_le64(snap_id); 4657 int ret; 4658 struct { 4659 u8 order; 4660 __le64 size; 4661 } __attribute__ ((packed)) size_buf = { 0 }; 4662 4663 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4664 &rbd_dev->header_oloc, "get_size", 4665 &snapid, sizeof(snapid), 4666 &size_buf, sizeof(size_buf)); 4667 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4668 if (ret < 0) 4669 return ret; 4670 if (ret < sizeof (size_buf)) 4671 return -ERANGE; 4672 4673 if (order) { 4674 *order = size_buf.order; 4675 dout(" order %u", (unsigned int)*order); 4676 } 4677 *snap_size = le64_to_cpu(size_buf.size); 4678 4679 dout(" snap_id 0x%016llx snap_size = %llu\n", 4680 (unsigned long long)snap_id, 4681 (unsigned long long)*snap_size); 4682 4683 return 0; 4684 } 4685 4686 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 4687 { 4688 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 4689 &rbd_dev->header.obj_order, 4690 &rbd_dev->header.image_size); 4691 } 4692 4693 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 4694 { 4695 void *reply_buf; 4696 int ret; 4697 void *p; 4698 4699 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 4700 if (!reply_buf) 4701 return -ENOMEM; 4702 4703 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4704 &rbd_dev->header_oloc, "get_object_prefix", 4705 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); 4706 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4707 if (ret < 0) 4708 goto out; 4709 4710 p = reply_buf; 4711 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 4712 p + ret, NULL, GFP_NOIO); 4713 ret = 0; 4714 4715 if (IS_ERR(rbd_dev->header.object_prefix)) { 4716 ret = PTR_ERR(rbd_dev->header.object_prefix); 4717 rbd_dev->header.object_prefix = NULL; 4718 } else { 4719 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 4720 } 4721 out: 4722 kfree(reply_buf); 4723 4724 return ret; 4725 } 4726 4727 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 4728 u64 *snap_features) 4729 { 4730 __le64 snapid = cpu_to_le64(snap_id); 4731 struct { 4732 __le64 features; 4733 __le64 incompat; 4734 } __attribute__ ((packed)) features_buf = { 0 }; 4735 u64 unsup; 4736 int ret; 4737 4738 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4739 &rbd_dev->header_oloc, "get_features", 4740 &snapid, sizeof(snapid), 4741 &features_buf, sizeof(features_buf)); 4742 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4743 if (ret < 0) 4744 return ret; 4745 if (ret < sizeof (features_buf)) 4746 return -ERANGE; 4747 4748 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED; 4749 if (unsup) { 4750 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx", 4751 unsup); 4752 return -ENXIO; 4753 } 4754 4755 *snap_features = le64_to_cpu(features_buf.features); 4756 4757 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 4758 (unsigned long long)snap_id, 4759 (unsigned long long)*snap_features, 4760 (unsigned long long)le64_to_cpu(features_buf.incompat)); 4761 4762 return 0; 4763 } 4764 4765 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 4766 { 4767 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 4768 &rbd_dev->header.features); 4769 } 4770 4771 struct parent_image_info { 4772 u64 pool_id; 4773 const char *pool_ns; 4774 const char *image_id; 4775 u64 snap_id; 4776 4777 bool 
has_overlap; 4778 u64 overlap; 4779 }; 4780 4781 /* 4782 * The caller is responsible for @pii. 4783 */ 4784 static int decode_parent_image_spec(void **p, void *end, 4785 struct parent_image_info *pii) 4786 { 4787 u8 struct_v; 4788 u32 struct_len; 4789 int ret; 4790 4791 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec", 4792 &struct_v, &struct_len); 4793 if (ret) 4794 return ret; 4795 4796 ceph_decode_64_safe(p, end, pii->pool_id, e_inval); 4797 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 4798 if (IS_ERR(pii->pool_ns)) { 4799 ret = PTR_ERR(pii->pool_ns); 4800 pii->pool_ns = NULL; 4801 return ret; 4802 } 4803 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 4804 if (IS_ERR(pii->image_id)) { 4805 ret = PTR_ERR(pii->image_id); 4806 pii->image_id = NULL; 4807 return ret; 4808 } 4809 ceph_decode_64_safe(p, end, pii->snap_id, e_inval); 4810 return 0; 4811 4812 e_inval: 4813 return -EINVAL; 4814 } 4815 4816 static int __get_parent_info(struct rbd_device *rbd_dev, 4817 struct page *req_page, 4818 struct page *reply_page, 4819 struct parent_image_info *pii) 4820 { 4821 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4822 size_t reply_len = PAGE_SIZE; 4823 void *p, *end; 4824 int ret; 4825 4826 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4827 "rbd", "parent_get", CEPH_OSD_FLAG_READ, 4828 req_page, sizeof(u64), reply_page, &reply_len); 4829 if (ret) 4830 return ret == -EOPNOTSUPP ? 1 : ret; 4831 4832 p = page_address(reply_page); 4833 end = p + reply_len; 4834 ret = decode_parent_image_spec(&p, end, pii); 4835 if (ret) 4836 return ret; 4837 4838 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4839 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, 4840 req_page, sizeof(u64), reply_page, &reply_len); 4841 if (ret) 4842 return ret; 4843 4844 p = page_address(reply_page); 4845 end = p + reply_len; 4846 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); 4847 if (pii->has_overlap) 4848 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 4849 4850 return 0; 4851 4852 e_inval: 4853 return -EINVAL; 4854 } 4855 4856 /* 4857 * The caller is responsible for @pii. 
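 * This variant uses the older "get_parent" class method, which does not
 * return a pool namespace and always reports an overlap.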
4858 */ 4859 static int __get_parent_info_legacy(struct rbd_device *rbd_dev, 4860 struct page *req_page, 4861 struct page *reply_page, 4862 struct parent_image_info *pii) 4863 { 4864 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4865 size_t reply_len = PAGE_SIZE; 4866 void *p, *end; 4867 int ret; 4868 4869 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 4870 "rbd", "get_parent", CEPH_OSD_FLAG_READ, 4871 req_page, sizeof(u64), reply_page, &reply_len); 4872 if (ret) 4873 return ret; 4874 4875 p = page_address(reply_page); 4876 end = p + reply_len; 4877 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval); 4878 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4879 if (IS_ERR(pii->image_id)) { 4880 ret = PTR_ERR(pii->image_id); 4881 pii->image_id = NULL; 4882 return ret; 4883 } 4884 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval); 4885 pii->has_overlap = true; 4886 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 4887 4888 return 0; 4889 4890 e_inval: 4891 return -EINVAL; 4892 } 4893 4894 static int get_parent_info(struct rbd_device *rbd_dev, 4895 struct parent_image_info *pii) 4896 { 4897 struct page *req_page, *reply_page; 4898 void *p; 4899 int ret; 4900 4901 req_page = alloc_page(GFP_KERNEL); 4902 if (!req_page) 4903 return -ENOMEM; 4904 4905 reply_page = alloc_page(GFP_KERNEL); 4906 if (!reply_page) { 4907 __free_page(req_page); 4908 return -ENOMEM; 4909 } 4910 4911 p = page_address(req_page); 4912 ceph_encode_64(&p, rbd_dev->spec->snap_id); 4913 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii); 4914 if (ret > 0) 4915 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page, 4916 pii); 4917 4918 __free_page(req_page); 4919 __free_page(reply_page); 4920 return ret; 4921 } 4922 4923 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 4924 { 4925 struct rbd_spec *parent_spec; 4926 struct parent_image_info pii = { 0 }; 4927 int ret; 4928 4929 parent_spec = rbd_spec_alloc(); 4930 if (!parent_spec) 4931 return -ENOMEM; 4932 4933 ret = get_parent_info(rbd_dev, &pii); 4934 if (ret) 4935 goto out_err; 4936 4937 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", 4938 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id, 4939 pii.has_overlap, pii.overlap); 4940 4941 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) { 4942 /* 4943 * Either the parent never existed, or we have 4944 * record of it but the image got flattened so it no 4945 * longer has a parent. When the parent of a 4946 * layered image disappears we immediately set the 4947 * overlap to 0. The effect of this is that all new 4948 * requests will be treated as if the image had no 4949 * parent. 4950 * 4951 * If !pii.has_overlap, the parent image spec is not 4952 * applicable. It's there to avoid duplication in each 4953 * snapshot record. 4954 */ 4955 if (rbd_dev->parent_overlap) { 4956 rbd_dev->parent_overlap = 0; 4957 rbd_dev_parent_put(rbd_dev); 4958 pr_info("%s: clone image has been flattened\n", 4959 rbd_dev->disk->disk_name); 4960 } 4961 4962 goto out; /* No parent? No problem. */ 4963 } 4964 4965 /* The ceph file layout needs to fit pool id in 32 bits */ 4966 4967 ret = -EIO; 4968 if (pii.pool_id > (u64)U32_MAX) { 4969 rbd_warn(NULL, "parent pool id too large (%llu > %u)", 4970 (unsigned long long)pii.pool_id, U32_MAX); 4971 goto out_err; 4972 } 4973 4974 /* 4975 * The parent won't change (except when the clone is 4976 * flattened, already handled that). 
So we only need to 4977 * record the parent spec we have not already done so. 4978 */ 4979 if (!rbd_dev->parent_spec) { 4980 parent_spec->pool_id = pii.pool_id; 4981 if (pii.pool_ns && *pii.pool_ns) { 4982 parent_spec->pool_ns = pii.pool_ns; 4983 pii.pool_ns = NULL; 4984 } 4985 parent_spec->image_id = pii.image_id; 4986 pii.image_id = NULL; 4987 parent_spec->snap_id = pii.snap_id; 4988 4989 rbd_dev->parent_spec = parent_spec; 4990 parent_spec = NULL; /* rbd_dev now owns this */ 4991 } 4992 4993 /* 4994 * We always update the parent overlap. If it's zero we issue 4995 * a warning, as we will proceed as if there was no parent. 4996 */ 4997 if (!pii.overlap) { 4998 if (parent_spec) { 4999 /* refresh, careful to warn just once */ 5000 if (rbd_dev->parent_overlap) 5001 rbd_warn(rbd_dev, 5002 "clone now standalone (overlap became 0)"); 5003 } else { 5004 /* initial probe */ 5005 rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); 5006 } 5007 } 5008 rbd_dev->parent_overlap = pii.overlap; 5009 5010 out: 5011 ret = 0; 5012 out_err: 5013 kfree(pii.pool_ns); 5014 kfree(pii.image_id); 5015 rbd_spec_put(parent_spec); 5016 return ret; 5017 } 5018 5019 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 5020 { 5021 struct { 5022 __le64 stripe_unit; 5023 __le64 stripe_count; 5024 } __attribute__ ((packed)) striping_info_buf = { 0 }; 5025 size_t size = sizeof (striping_info_buf); 5026 void *p; 5027 int ret; 5028 5029 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5030 &rbd_dev->header_oloc, "get_stripe_unit_count", 5031 NULL, 0, &striping_info_buf, size); 5032 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5033 if (ret < 0) 5034 return ret; 5035 if (ret < size) 5036 return -ERANGE; 5037 5038 p = &striping_info_buf; 5039 rbd_dev->header.stripe_unit = ceph_decode_64(&p); 5040 rbd_dev->header.stripe_count = ceph_decode_64(&p); 5041 return 0; 5042 } 5043 5044 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) 5045 { 5046 __le64 data_pool_id; 5047 int ret; 5048 5049 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5050 &rbd_dev->header_oloc, "get_data_pool", 5051 NULL, 0, &data_pool_id, sizeof(data_pool_id)); 5052 if (ret < 0) 5053 return ret; 5054 if (ret < sizeof(data_pool_id)) 5055 return -EBADMSG; 5056 5057 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); 5058 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); 5059 return 0; 5060 } 5061 5062 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 5063 { 5064 CEPH_DEFINE_OID_ONSTACK(oid); 5065 size_t image_id_size; 5066 char *image_id; 5067 void *p; 5068 void *end; 5069 size_t size; 5070 void *reply_buf = NULL; 5071 size_t len = 0; 5072 char *image_name = NULL; 5073 int ret; 5074 5075 rbd_assert(!rbd_dev->spec->image_name); 5076 5077 len = strlen(rbd_dev->spec->image_id); 5078 image_id_size = sizeof (__le32) + len; 5079 image_id = kmalloc(image_id_size, GFP_KERNEL); 5080 if (!image_id) 5081 return NULL; 5082 5083 p = image_id; 5084 end = image_id + image_id_size; 5085 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 5086 5087 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 5088 reply_buf = kmalloc(size, GFP_KERNEL); 5089 if (!reply_buf) 5090 goto out; 5091 5092 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); 5093 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5094 "dir_get_name", image_id, image_id_size, 5095 reply_buf, size); 5096 if (ret < 0) 5097 goto out; 5098 p = reply_buf; 5099 end = reply_buf + ret; 5100 5101 image_name = 
ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 5102 if (IS_ERR(image_name)) 5103 image_name = NULL; 5104 else 5105 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 5106 out: 5107 kfree(reply_buf); 5108 kfree(image_id); 5109 5110 return image_name; 5111 } 5112 5113 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5114 { 5115 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5116 const char *snap_name; 5117 u32 which = 0; 5118 5119 /* Skip over names until we find the one we are looking for */ 5120 5121 snap_name = rbd_dev->header.snap_names; 5122 while (which < snapc->num_snaps) { 5123 if (!strcmp(name, snap_name)) 5124 return snapc->snaps[which]; 5125 snap_name += strlen(snap_name) + 1; 5126 which++; 5127 } 5128 return CEPH_NOSNAP; 5129 } 5130 5131 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5132 { 5133 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5134 u32 which; 5135 bool found = false; 5136 u64 snap_id; 5137 5138 for (which = 0; !found && which < snapc->num_snaps; which++) { 5139 const char *snap_name; 5140 5141 snap_id = snapc->snaps[which]; 5142 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 5143 if (IS_ERR(snap_name)) { 5144 /* ignore no-longer existing snapshots */ 5145 if (PTR_ERR(snap_name) == -ENOENT) 5146 continue; 5147 else 5148 break; 5149 } 5150 found = !strcmp(name, snap_name); 5151 kfree(snap_name); 5152 } 5153 return found ? snap_id : CEPH_NOSNAP; 5154 } 5155 5156 /* 5157 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 5158 * no snapshot by that name is found, or if an error occurs. 5159 */ 5160 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5161 { 5162 if (rbd_dev->image_format == 1) 5163 return rbd_v1_snap_id_by_name(rbd_dev, name); 5164 5165 return rbd_v2_snap_id_by_name(rbd_dev, name); 5166 } 5167 5168 /* 5169 * An image being mapped will have everything but the snap id. 5170 */ 5171 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) 5172 { 5173 struct rbd_spec *spec = rbd_dev->spec; 5174 5175 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); 5176 rbd_assert(spec->image_id && spec->image_name); 5177 rbd_assert(spec->snap_name); 5178 5179 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 5180 u64 snap_id; 5181 5182 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 5183 if (snap_id == CEPH_NOSNAP) 5184 return -ENOENT; 5185 5186 spec->snap_id = snap_id; 5187 } else { 5188 spec->snap_id = CEPH_NOSNAP; 5189 } 5190 5191 return 0; 5192 } 5193 5194 /* 5195 * A parent image will have all ids but none of the names. 5196 * 5197 * All names in an rbd spec are dynamically allocated. It's OK if we 5198 * can't figure out the name for an image id. 
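 * Only a missing image name is tolerated below; failure to resolve the
 * pool or snapshot name is returned as an error.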
5199 */ 5200 static int rbd_spec_fill_names(struct rbd_device *rbd_dev) 5201 { 5202 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 5203 struct rbd_spec *spec = rbd_dev->spec; 5204 const char *pool_name; 5205 const char *image_name; 5206 const char *snap_name; 5207 int ret; 5208 5209 rbd_assert(spec->pool_id != CEPH_NOPOOL); 5210 rbd_assert(spec->image_id); 5211 rbd_assert(spec->snap_id != CEPH_NOSNAP); 5212 5213 /* Get the pool name; we have to make our own copy of this */ 5214 5215 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 5216 if (!pool_name) { 5217 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 5218 return -EIO; 5219 } 5220 pool_name = kstrdup(pool_name, GFP_KERNEL); 5221 if (!pool_name) 5222 return -ENOMEM; 5223 5224 /* Fetch the image name; tolerate failure here */ 5225 5226 image_name = rbd_dev_image_name(rbd_dev); 5227 if (!image_name) 5228 rbd_warn(rbd_dev, "unable to get image name"); 5229 5230 /* Fetch the snapshot name */ 5231 5232 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 5233 if (IS_ERR(snap_name)) { 5234 ret = PTR_ERR(snap_name); 5235 goto out_err; 5236 } 5237 5238 spec->pool_name = pool_name; 5239 spec->image_name = image_name; 5240 spec->snap_name = snap_name; 5241 5242 return 0; 5243 5244 out_err: 5245 kfree(image_name); 5246 kfree(pool_name); 5247 return ret; 5248 } 5249 5250 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 5251 { 5252 size_t size; 5253 int ret; 5254 void *reply_buf; 5255 void *p; 5256 void *end; 5257 u64 seq; 5258 u32 snap_count; 5259 struct ceph_snap_context *snapc; 5260 u32 i; 5261 5262 /* 5263 * We'll need room for the seq value (maximum snapshot id), 5264 * snapshot count, and array of that many snapshot ids. 5265 * For now we have a fixed upper limit on the number we're 5266 * prepared to receive. 5267 */ 5268 size = sizeof (__le64) + sizeof (__le32) + 5269 RBD_MAX_SNAP_COUNT * sizeof (__le64); 5270 reply_buf = kzalloc(size, GFP_KERNEL); 5271 if (!reply_buf) 5272 return -ENOMEM; 5273 5274 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5275 &rbd_dev->header_oloc, "get_snapcontext", 5276 NULL, 0, reply_buf, size); 5277 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5278 if (ret < 0) 5279 goto out; 5280 5281 p = reply_buf; 5282 end = reply_buf + ret; 5283 ret = -ERANGE; 5284 ceph_decode_64_safe(&p, end, seq, out); 5285 ceph_decode_32_safe(&p, end, snap_count, out); 5286 5287 /* 5288 * Make sure the reported number of snapshot ids wouldn't go 5289 * beyond the end of our buffer. But before checking that, 5290 * make sure the computed size of the snapshot context we 5291 * allocate is representable in a size_t. 
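 * The overflow check mirrors the allocation done by
 * ceph_create_snap_context(), which needs room for the context struct
 * plus snap_count 64-bit snapshot ids.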
5292 */ 5293 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 5294 / sizeof (u64)) { 5295 ret = -EINVAL; 5296 goto out; 5297 } 5298 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 5299 goto out; 5300 ret = 0; 5301 5302 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 5303 if (!snapc) { 5304 ret = -ENOMEM; 5305 goto out; 5306 } 5307 snapc->seq = seq; 5308 for (i = 0; i < snap_count; i++) 5309 snapc->snaps[i] = ceph_decode_64(&p); 5310 5311 ceph_put_snap_context(rbd_dev->header.snapc); 5312 rbd_dev->header.snapc = snapc; 5313 5314 dout(" snap context seq = %llu, snap_count = %u\n", 5315 (unsigned long long)seq, (unsigned int)snap_count); 5316 out: 5317 kfree(reply_buf); 5318 5319 return ret; 5320 } 5321 5322 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 5323 u64 snap_id) 5324 { 5325 size_t size; 5326 void *reply_buf; 5327 __le64 snapid; 5328 int ret; 5329 void *p; 5330 void *end; 5331 char *snap_name; 5332 5333 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 5334 reply_buf = kmalloc(size, GFP_KERNEL); 5335 if (!reply_buf) 5336 return ERR_PTR(-ENOMEM); 5337 5338 snapid = cpu_to_le64(snap_id); 5339 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5340 &rbd_dev->header_oloc, "get_snapshot_name", 5341 &snapid, sizeof(snapid), reply_buf, size); 5342 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5343 if (ret < 0) { 5344 snap_name = ERR_PTR(ret); 5345 goto out; 5346 } 5347 5348 p = reply_buf; 5349 end = reply_buf + ret; 5350 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 5351 if (IS_ERR(snap_name)) 5352 goto out; 5353 5354 dout(" snap_id 0x%016llx snap_name = %s\n", 5355 (unsigned long long)snap_id, snap_name); 5356 out: 5357 kfree(reply_buf); 5358 5359 return snap_name; 5360 } 5361 5362 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) 5363 { 5364 bool first_time = rbd_dev->header.object_prefix == NULL; 5365 int ret; 5366 5367 ret = rbd_dev_v2_image_size(rbd_dev); 5368 if (ret) 5369 return ret; 5370 5371 if (first_time) { 5372 ret = rbd_dev_v2_header_onetime(rbd_dev); 5373 if (ret) 5374 return ret; 5375 } 5376 5377 ret = rbd_dev_v2_snap_context(rbd_dev); 5378 if (ret && first_time) { 5379 kfree(rbd_dev->header.object_prefix); 5380 rbd_dev->header.object_prefix = NULL; 5381 } 5382 5383 return ret; 5384 } 5385 5386 static int rbd_dev_header_info(struct rbd_device *rbd_dev) 5387 { 5388 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 5389 5390 if (rbd_dev->image_format == 1) 5391 return rbd_dev_v1_header_info(rbd_dev); 5392 5393 return rbd_dev_v2_header_info(rbd_dev); 5394 } 5395 5396 /* 5397 * Skips over white space at *buf, and updates *buf to point to the 5398 * first found non-space character (if any). Returns the length of 5399 * the token (string of non-white space characters) found. Note 5400 * that *buf must be terminated with '\0'. 5401 */ 5402 static inline size_t next_token(const char **buf) 5403 { 5404 /* 5405 * These are the characters that produce nonzero for 5406 * isspace() in the "C" and "POSIX" locales. 5407 */ 5408 const char *spaces = " \f\n\r\t\v"; 5409 5410 *buf += strspn(*buf, spaces); /* Find start of token */ 5411 5412 return strcspn(*buf, spaces); /* Return token length */ 5413 } 5414 5415 /* 5416 * Finds the next token in *buf, dynamically allocates a buffer big 5417 * enough to hold a copy of it, and copies the token into the new 5418 * buffer. The copy is guaranteed to be terminated with '\0'. 
Note 5419 * that a duplicate buffer is created even for a zero-length token. 5420 * 5421 * Returns a pointer to the newly-allocated duplicate, or a null 5422 * pointer if memory for the duplicate was not available. If 5423 * the lenp argument is a non-null pointer, the length of the token 5424 * (not including the '\0') is returned in *lenp. 5425 * 5426 * If successful, the *buf pointer will be updated to point beyond 5427 * the end of the found token. 5428 * 5429 * Note: uses GFP_KERNEL for allocation. 5430 */ 5431 static inline char *dup_token(const char **buf, size_t *lenp) 5432 { 5433 char *dup; 5434 size_t len; 5435 5436 len = next_token(buf); 5437 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 5438 if (!dup) 5439 return NULL; 5440 *(dup + len) = '\0'; 5441 *buf += len; 5442 5443 if (lenp) 5444 *lenp = len; 5445 5446 return dup; 5447 } 5448 5449 /* 5450 * Parse the options provided for an "rbd add" (i.e., rbd image 5451 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 5452 * and the data written is passed here via a NUL-terminated buffer. 5453 * Returns 0 if successful or an error code otherwise. 5454 * 5455 * The information extracted from these options is recorded in 5456 * the other parameters which return dynamically-allocated 5457 * structures: 5458 * ceph_opts 5459 * The address of a pointer that will refer to a ceph options 5460 * structure. Caller must release the returned pointer using 5461 * ceph_destroy_options() when it is no longer needed. 5462 * rbd_opts 5463 * Address of an rbd options pointer. Fully initialized by 5464 * this function; caller must release with kfree(). 5465 * spec 5466 * Address of an rbd image specification pointer. Fully 5467 * initialized by this function based on parsed options. 5468 * Caller must release with rbd_spec_put(). 5469 * 5470 * The options passed take this form: 5471 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 5472 * where: 5473 * <mon_addrs> 5474 * A comma-separated list of one or more monitor addresses. 5475 * A monitor address is an ip address, optionally followed 5476 * by a port number (separated by a colon). 5477 * I.e.: ip1[:port1][,ip2[:port2]...] 5478 * <options> 5479 * A comma-separated list of ceph and/or rbd options. 5480 * <pool_name> 5481 * The name of the rados pool containing the rbd image. 5482 * <image_name> 5483 * The name of the image in that pool to map. 5484 * <snap_id> 5485 * An optional snapshot id. If provided, the mapping will 5486 * present data from the image at the time that snapshot was 5487 * created. The image head is used if no snapshot id is 5488 * provided. Snapshot mappings are always read-only. 
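 *
 * A purely illustrative example of a buffer written to /sys/bus/rbd/add
 * (all values below are hypothetical):
 *
 *   1.2.3.4:6789 name=admin,secret=<base64 key> rbd myimage mysnap
 *
 * This would map the snapshot "mysnap" of image "myimage" in pool "rbd",
 * using the single listed monitor address and the given ceph options.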
5489 */ 5490 static int rbd_add_parse_args(const char *buf, 5491 struct ceph_options **ceph_opts, 5492 struct rbd_options **opts, 5493 struct rbd_spec **rbd_spec) 5494 { 5495 size_t len; 5496 char *options; 5497 const char *mon_addrs; 5498 char *snap_name; 5499 size_t mon_addrs_size; 5500 struct parse_rbd_opts_ctx pctx = { 0 }; 5501 struct ceph_options *copts; 5502 int ret; 5503 5504 /* The first four tokens are required */ 5505 5506 len = next_token(&buf); 5507 if (!len) { 5508 rbd_warn(NULL, "no monitor address(es) provided"); 5509 return -EINVAL; 5510 } 5511 mon_addrs = buf; 5512 mon_addrs_size = len + 1; 5513 buf += len; 5514 5515 ret = -EINVAL; 5516 options = dup_token(&buf, NULL); 5517 if (!options) 5518 return -ENOMEM; 5519 if (!*options) { 5520 rbd_warn(NULL, "no options provided"); 5521 goto out_err; 5522 } 5523 5524 pctx.spec = rbd_spec_alloc(); 5525 if (!pctx.spec) 5526 goto out_mem; 5527 5528 pctx.spec->pool_name = dup_token(&buf, NULL); 5529 if (!pctx.spec->pool_name) 5530 goto out_mem; 5531 if (!*pctx.spec->pool_name) { 5532 rbd_warn(NULL, "no pool name provided"); 5533 goto out_err; 5534 } 5535 5536 pctx.spec->image_name = dup_token(&buf, NULL); 5537 if (!pctx.spec->image_name) 5538 goto out_mem; 5539 if (!*pctx.spec->image_name) { 5540 rbd_warn(NULL, "no image name provided"); 5541 goto out_err; 5542 } 5543 5544 /* 5545 * Snapshot name is optional; default is to use "-" 5546 * (indicating the head/no snapshot). 5547 */ 5548 len = next_token(&buf); 5549 if (!len) { 5550 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 5551 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 5552 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 5553 ret = -ENAMETOOLONG; 5554 goto out_err; 5555 } 5556 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 5557 if (!snap_name) 5558 goto out_mem; 5559 *(snap_name + len) = '\0'; 5560 pctx.spec->snap_name = snap_name; 5561 5562 /* Initialize all rbd options to the defaults */ 5563 5564 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); 5565 if (!pctx.opts) 5566 goto out_mem; 5567 5568 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT; 5569 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 5570 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT; 5571 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT; 5572 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; 5573 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT; 5574 pctx.opts->trim = RBD_TRIM_DEFAULT; 5575 5576 copts = ceph_parse_options(options, mon_addrs, 5577 mon_addrs + mon_addrs_size - 1, 5578 parse_rbd_opts_token, &pctx); 5579 if (IS_ERR(copts)) { 5580 ret = PTR_ERR(copts); 5581 goto out_err; 5582 } 5583 kfree(options); 5584 5585 *ceph_opts = copts; 5586 *opts = pctx.opts; 5587 *rbd_spec = pctx.spec; 5588 5589 return 0; 5590 out_mem: 5591 ret = -ENOMEM; 5592 out_err: 5593 kfree(pctx.opts); 5594 rbd_spec_put(pctx.spec); 5595 kfree(options); 5596 5597 return ret; 5598 } 5599 5600 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 5601 { 5602 down_write(&rbd_dev->lock_rwsem); 5603 if (__rbd_is_lock_owner(rbd_dev)) 5604 rbd_unlock(rbd_dev); 5605 up_write(&rbd_dev->lock_rwsem); 5606 } 5607 5608 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) 5609 { 5610 int ret; 5611 5612 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { 5613 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); 5614 return -EINVAL; 5615 } 5616 5617 /* FIXME: "rbd map --exclusive" should be in interruptible */ 5618 down_read(&rbd_dev->lock_rwsem); 5619 ret = rbd_wait_state_locked(rbd_dev, true); 5620 
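	/* rbd_wait_state_locked() returns with lock_rwsem still held for read */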
up_read(&rbd_dev->lock_rwsem); 5621 if (ret) { 5622 rbd_warn(rbd_dev, "failed to acquire exclusive lock"); 5623 return -EROFS; 5624 } 5625 5626 return 0; 5627 } 5628 5629 /* 5630 * An rbd format 2 image has a unique identifier, distinct from the 5631 * name given to it by the user. Internally, that identifier is 5632 * what's used to specify the names of objects related to the image. 5633 * 5634 * A special "rbd id" object is used to map an rbd image name to its 5635 * id. If that object doesn't exist, then there is no v2 rbd image 5636 * with the supplied name. 5637 * 5638 * This function will record the given rbd_dev's image_id field if 5639 * it can be determined, and in that case will return 0. If any 5640 * errors occur a negative errno will be returned and the rbd_dev's 5641 * image_id field will be unchanged (and should be NULL). 5642 */ 5643 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 5644 { 5645 int ret; 5646 size_t size; 5647 CEPH_DEFINE_OID_ONSTACK(oid); 5648 void *response; 5649 char *image_id; 5650 5651 /* 5652 * When probing a parent image, the image id is already 5653 * known (and the image name likely is not). There's no 5654 * need to fetch the image id again in this case. We 5655 * do still need to set the image format though. 5656 */ 5657 if (rbd_dev->spec->image_id) { 5658 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 5659 5660 return 0; 5661 } 5662 5663 /* 5664 * First, see if the format 2 image id file exists, and if 5665 * so, get the image's persistent id from it. 5666 */ 5667 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, 5668 rbd_dev->spec->image_name); 5669 if (ret) 5670 return ret; 5671 5672 dout("rbd id object name is %s\n", oid.name); 5673 5674 /* Response will be an encoded string, which includes a length */ 5675 5676 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 5677 response = kzalloc(size, GFP_NOIO); 5678 if (!response) { 5679 ret = -ENOMEM; 5680 goto out; 5681 } 5682 5683 /* If it doesn't exist we'll assume it's a format 1 image */ 5684 5685 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5686 "get_id", NULL, 0, 5687 response, RBD_IMAGE_ID_LEN_MAX); 5688 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5689 if (ret == -ENOENT) { 5690 image_id = kstrdup("", GFP_KERNEL); 5691 ret = image_id ? 0 : -ENOMEM; 5692 if (!ret) 5693 rbd_dev->image_format = 1; 5694 } else if (ret >= 0) { 5695 void *p = response; 5696 5697 image_id = ceph_extract_encoded_string(&p, p + ret, 5698 NULL, GFP_NOIO); 5699 ret = PTR_ERR_OR_ZERO(image_id); 5700 if (!ret) 5701 rbd_dev->image_format = 2; 5702 } 5703 5704 if (!ret) { 5705 rbd_dev->spec->image_id = image_id; 5706 dout("image_id is %s\n", image_id); 5707 } 5708 out: 5709 kfree(response); 5710 ceph_oid_destroy(&oid); 5711 return ret; 5712 } 5713 5714 /* 5715 * Undo whatever state changes are made by v1 or v2 header info 5716 * call. 
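 * This drops the parent reference and frees the dynamically allocated
 * header fields before zeroing the header.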
5717 */ 5718 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 5719 { 5720 struct rbd_image_header *header; 5721 5722 rbd_dev_parent_put(rbd_dev); 5723 5724 /* Free dynamic fields from the header, then zero it out */ 5725 5726 header = &rbd_dev->header; 5727 ceph_put_snap_context(header->snapc); 5728 kfree(header->snap_sizes); 5729 kfree(header->snap_names); 5730 kfree(header->object_prefix); 5731 memset(header, 0, sizeof (*header)); 5732 } 5733 5734 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) 5735 { 5736 int ret; 5737 5738 ret = rbd_dev_v2_object_prefix(rbd_dev); 5739 if (ret) 5740 goto out_err; 5741 5742 /* 5743 * Get the and check features for the image. Currently the 5744 * features are assumed to never change. 5745 */ 5746 ret = rbd_dev_v2_features(rbd_dev); 5747 if (ret) 5748 goto out_err; 5749 5750 /* If the image supports fancy striping, get its parameters */ 5751 5752 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 5753 ret = rbd_dev_v2_striping_info(rbd_dev); 5754 if (ret < 0) 5755 goto out_err; 5756 } 5757 5758 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { 5759 ret = rbd_dev_v2_data_pool(rbd_dev); 5760 if (ret) 5761 goto out_err; 5762 } 5763 5764 rbd_init_layout(rbd_dev); 5765 return 0; 5766 5767 out_err: 5768 rbd_dev->header.features = 0; 5769 kfree(rbd_dev->header.object_prefix); 5770 rbd_dev->header.object_prefix = NULL; 5771 return ret; 5772 } 5773 5774 /* 5775 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> 5776 * rbd_dev_image_probe() recursion depth, which means it's also the 5777 * length of the already discovered part of the parent chain. 5778 */ 5779 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) 5780 { 5781 struct rbd_device *parent = NULL; 5782 int ret; 5783 5784 if (!rbd_dev->parent_spec) 5785 return 0; 5786 5787 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) { 5788 pr_info("parent chain is too long (%d)\n", depth); 5789 ret = -EINVAL; 5790 goto out_err; 5791 } 5792 5793 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); 5794 if (!parent) { 5795 ret = -ENOMEM; 5796 goto out_err; 5797 } 5798 5799 /* 5800 * Images related by parent/child relationships always share 5801 * rbd_client and spec/parent_spec, so bump their refcounts. 5802 */ 5803 __rbd_get_client(rbd_dev->rbd_client); 5804 rbd_spec_get(rbd_dev->parent_spec); 5805 5806 ret = rbd_dev_image_probe(parent, depth); 5807 if (ret < 0) 5808 goto out_err; 5809 5810 rbd_dev->parent = parent; 5811 atomic_set(&rbd_dev->parent_ref, 1); 5812 return 0; 5813 5814 out_err: 5815 rbd_dev_unparent(rbd_dev); 5816 rbd_dev_destroy(parent); 5817 return ret; 5818 } 5819 5820 static void rbd_dev_device_release(struct rbd_device *rbd_dev) 5821 { 5822 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5823 rbd_dev_mapping_clear(rbd_dev); 5824 rbd_free_disk(rbd_dev); 5825 if (!single_major) 5826 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5827 } 5828 5829 /* 5830 * rbd_dev->header_rwsem must be locked for write and will be unlocked 5831 * upon return. 5832 */ 5833 static int rbd_dev_device_setup(struct rbd_device *rbd_dev) 5834 { 5835 int ret; 5836 5837 /* Record our major and minor device numbers. */ 5838 5839 if (!single_major) { 5840 ret = register_blkdev(0, rbd_dev->name); 5841 if (ret < 0) 5842 goto err_out_unlock; 5843 5844 rbd_dev->major = ret; 5845 rbd_dev->minor = 0; 5846 } else { 5847 rbd_dev->major = rbd_major; 5848 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); 5849 } 5850 5851 /* Set up the blkdev mapping. 
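 * (allocate the gendisk and blk-mq queue, record the mapping size and
 * set the device name)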
static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	/* If we are mapping a snapshot it must be marked read-only */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		rbd_dev->opts->read_only = true;

	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
			 rbd_dev->layout.object_size);
		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
	}

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
		goto err_out_image_probe;

	if (rbd_dev->opts->exclusive) {
		rc = rbd_add_acquire_lock(rbd_dev);
		if (rc)
			goto err_out_device_setup;
	}

	/* Everything's ready.  Announce the disk to the world. */

	rc = device_add(&rbd_dev->dev);
	if (rc)
		goto err_out_image_lock;

	add_disk(rbd_dev->disk);
	/* see rbd_init_disk() */
	blk_put_queue(rbd_dev->disk->queue);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);
	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_image_lock:
	rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
	rbd_dev_device_release(rbd_dev);
err_out_image_probe:
	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
				      size_t count)
{
	return do_rbd_add(bus, buf, count);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		rbd_dev_destroy(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
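
/*
 * Unmap a device in response to a write to /sys/bus/rbd/remove (or
 * /sys/bus/rbd/remove_single_major).  The buffer holds the device id,
 * optionally followed by "force", e.g. (illustrative only):
 *
 *   $ echo "0" > /sys/bus/rbd/remove
 *   $ echo "0 force" > /sys/bus/rbd/remove
 *
 * Without "force" the request is rejected with -EBUSY while the device
 * is still open.
 */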
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool force = false;
	int ret;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
					  &rbd_dev->flags))
			ret = -EINPROGRESS;
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret)
		return ret;

	if (force) {
		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	del_gendisk(rbd_dev->disk);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);
	device_del(&rbd_dev->dev);

	rbd_dev_image_unlock(rbd_dev);
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	rbd_dev_destroy(rbd_dev);
	return count;
}

static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
					 size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int __init rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void __exit rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	return 0;

out_err:
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}
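
/*
 * Module setup: allocate the slab caches, then the request workqueue,
 * then (for single_major) a shared block major, and finally register
 * the sysfs interface that makes mapping possible.  rbd_exit() undoes
 * this in the reverse order.
 */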
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");