/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/fs_parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}
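/*
 * Illustrative sketch (not part of the original driver): this helper and
 * atomic_dec_return_safe() just below are the saturating counters that
 * rbd_dev_parent_get()/rbd_dev_parent_put() later in this file use for
 * rbd_dev->parent_ref, so the reference count can never wrap.  A
 * hypothetical caller would look roughly like:
 *
 *        if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *                // ... issue I/O that touches the parent image ...
 *                if (atomic_dec_return_safe(&rbd_dev->parent_ref) < 0)
 *                        rbd_warn(rbd_dev, "parent reference underflow");
 *        }
 *
 * A counter that is already 0 stays at 0 (the parent is being torn down),
 * and a saturated counter reports -EINVAL instead of overflowing.
 */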
/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        u64 stripe_unit;
        u64 stripe_count;
        s64 data_pool_id;
        u64 features;		/* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;	/* format 1 only */
        u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64		pool_id;
        const char	*pool_name;
        const char	*pool_ns;	/* NULL if default, never "" */

        const char	*image_id;
        const char	*image_name;

        u64		snap_id;
        const char	*snap_name;

        struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client	*client;
        struct kref		kref;
        struct list_head	node;
};

struct pending_result {
        int			result;		/* first nonzero result */
        int			num_pending;
};

struct rbd_img_request;

enum obj_request_type {
        OBJ_REQUEST_NODATA = 1,
        OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
        OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
        OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
        OBJ_OP_READ = 1,
        OBJ_OP_WRITE,
        OBJ_OP_DISCARD,
        OBJ_OP_ZEROOUT,
};

#define RBD_OBJ_FLAG_DELETION			(1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)

enum rbd_obj_read_state {
        RBD_OBJ_READ_START = 1,
        RBD_OBJ_READ_OBJECT,
        RBD_OBJ_READ_PARENT,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
 *            .                 |                                    .
 *            .                 v                                    .
 *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
 *            .                 |                    .               .
 *            .                 v                    v (deep-copyup  .
 *  (image    .  RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC .    not needed)   .
 * flattened) v                 |                    .               .
 *            .                 v                    .               .
 *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
 *                              |                        not needed) v
 *                              v                                    .
 *                            done . . . . . . . . . . . . . . . . . .
 *                              ^
 *                              |
 *                     RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * assert_exists guard is needed or not (in some cases it's not needed
 * even if there is a parent).
 */
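/*
 * Illustrative example (not from the original source): a small write to
 * an object that does not yet exist in a clone typically walks the
 * diagram above as
 *
 *   RBD_OBJ_WRITE_GUARD -> RBD_OBJ_WRITE_READ_FROM_PARENT ->
 *   RBD_OBJ_WRITE_COPYUP_OPS -> done
 *
 * i.e. the guarded write fails because the object is missing, the
 * overlapping parent data is read back, and a copyup request carrying
 * that data plus the original write is sent.  A write that needs no
 * assert_exists guard (for example, no parent overlap for this object)
 * goes straight through RBD_OBJ_WRITE_FLAT instead.
 */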
enum rbd_obj_write_state {
        RBD_OBJ_WRITE_START = 1,
        RBD_OBJ_WRITE_PRE_OBJECT_MAP,
        RBD_OBJ_WRITE_OBJECT,
        __RBD_OBJ_WRITE_COPYUP,
        RBD_OBJ_WRITE_COPYUP,
        RBD_OBJ_WRITE_POST_OBJECT_MAP,
};

enum rbd_obj_copyup_state {
        RBD_OBJ_COPYUP_START = 1,
        RBD_OBJ_COPYUP_READ_PARENT,
        __RBD_OBJ_COPYUP_OBJECT_MAPS,
        RBD_OBJ_COPYUP_OBJECT_MAPS,
        __RBD_OBJ_COPYUP_WRITE_OBJECT,
        RBD_OBJ_COPYUP_WRITE_OBJECT,
};

struct rbd_obj_request {
        struct ceph_object_extent ex;
        unsigned int		flags;	/* RBD_OBJ_FLAG_* */
        union {
                enum rbd_obj_read_state	 read_state;	/* for reads */
                enum rbd_obj_write_state write_state;	/* for writes */
        };

        struct rbd_img_request	*img_request;
        struct ceph_file_extent	*img_extents;
        u32			num_img_extents;

        union {
                struct ceph_bio_iter	bio_pos;
                struct {
                        struct ceph_bvec_iter	bvec_pos;
                        u32			bvec_count;
                        u32			bvec_idx;
                };
        };

        enum rbd_obj_copyup_state copyup_state;
        struct bio_vec		*copyup_bvecs;
        u32			copyup_bvec_count;

        struct list_head	osd_reqs;	/* w/ r_private_item */

        struct mutex		state_mutex;
        struct pending_result	pending;
        struct kref		kref;
};

enum img_req_flags {
        IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

enum rbd_img_state {
        RBD_IMG_START = 1,
        RBD_IMG_EXCLUSIVE_LOCK,
        __RBD_IMG_OBJECT_REQUESTS,
        RBD_IMG_OBJECT_REQUESTS,
};

struct rbd_img_request {
        struct rbd_device	*rbd_dev;
        enum obj_operation_type	op_type;
        enum obj_request_type	data_type;
        unsigned long		flags;
        enum rbd_img_state	state;
        union {
                u64			snap_id;	/* for reads */
                struct ceph_snap_context *snapc;	/* for writes */
        };
        struct rbd_obj_request	*obj_request;	/* obj req initiator */

        struct list_head	lock_item;
        struct list_head	object_extents;	/* obj_req.ex structs */

        struct mutex		state_mutex;
        struct pending_result	pending;
        struct work_struct	work;
        int			work_result;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
        RBD_WATCH_STATE_UNREGISTERED,
        RBD_WATCH_STATE_REGISTERED,
        RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
        RBD_LOCK_STATE_UNLOCKED,
        RBD_LOCK_STATE_LOCKED,
        RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
        u64 gid;
        u64 handle;
};

struct rbd_mapping {
        u64                     size;
};

/*
 * a single device
 */
struct rbd_device {
        int			dev_id;		/* blkdev unique id */

        int			major;		/* blkdev assigned major */
        int			minor;
        struct gendisk		*disk;		/* blkdev's gendisk and rq */

        u32			image_format;	/* Either 1 or 2 */
        struct rbd_client	*rbd_client;

        char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t		lock;		/* queue, flags, open_count */

        struct rbd_image_header	header;
        unsigned long		flags;		/* possibly lock protected */
        struct rbd_spec		*spec;
        struct rbd_options	*opts;
        char			*config_info;	/* add{,_single_major} string */

        struct ceph_object_id	header_oid;
        struct ceph_object_locator header_oloc;

        struct ceph_file_layout	layout;		/* used for all rbd requests */

        struct mutex		watch_mutex;
        enum rbd_watch_state	watch_state;
        struct ceph_osd_linger_request *watch_handle;
        u64			watch_cookie;
        struct delayed_work	watch_dwork;

        struct rw_semaphore	lock_rwsem;
        enum rbd_lock_state	lock_state;
        char			lock_cookie[32];
        struct rbd_client_id	owner_cid;
        struct work_struct	acquired_lock_work;
        struct work_struct	released_lock_work;
        struct delayed_work	lock_dwork;
        struct work_struct	unlock_work;
        spinlock_t		lock_lists_lock;
        struct list_head	acquiring_list;
        struct list_head	running_list;
        struct completion	acquire_wait;
        int			acquire_err;
        struct completion	releasing_wait;

        spinlock_t		object_map_lock;
        u8			*object_map;
        u64			object_map_size;	/* in objects */
        u64			object_map_flags;

        struct workqueue_struct	*task_wq;

        struct rbd_spec		*parent_spec;
        u64			parent_overlap;
        atomic_t		parent_ref;
        struct rbd_device	*parent;

        /* Block layer tags. */
        struct blk_mq_tag_set	tag_set;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping	mapping;

        struct list_head	node;

        /* sysfs related */
        struct device		dev;
        unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
        RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
        RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

static struct ceph_snap_context rbd_empty_snapc = {
        .nref = REFCOUNT_INIT(1),
};

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
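/*
 * Illustrative note (not from the original source): with single_major,
 * all images share one major number and the minor space is carved up
 * using RBD_SINGLE_MAJOR_PART_SHIFT.  With the shift of 4 defined above,
 * the helpers below give, for example:
 *
 *        rbd_dev_id_to_minor(3)  == 3 << 4  == 48
 *        minor_to_rbd_dev_id(50) == 50 >> 4 == 3
 *
 * so each device owns 16 consecutive minors (the whole-disk node plus
 * room for its partitions).
 */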
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(struct bus_type *bus, const char *buf,
                            size_t count);
static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
                                      size_t count);
static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
                                         size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

static bool rbd_is_ro(struct rbd_device *rbd_dev)
{
        return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
}

static bool rbd_is_snap(struct rbd_device *rbd_dev)
{
        return rbd_dev->spec->snap_id != CEPH_NOSNAP;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        lockdep_assert_held(&rbd_dev->lock_rwsem);

        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        bool is_lock_owner;

        down_read(&rbd_dev->lock_rwsem);
        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
        up_read(&rbd_dev->lock_rwsem);
        return is_lock_owner;
}

static ssize_t supported_features_show(struct bus_type *bus, char *buf)
{
        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        &bus_attr_supported_features.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name		= "rbd",
        .bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =	"rbd",
        .release =	rbd_root_dev_release,
};
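/*
 * Illustrative note (not from the original source): rbd_warn() below uses
 * the most specific identifier available as the message prefix, so the
 * same warning can appear as, e.g.
 *
 *        rbd: rbd3: <message>              (mapped device with a gendisk)
 *        rbd: image <name>: <message>      (image name known, no disk yet)
 *        rbd: id <image id>: <message>     (only the image id is known)
 *        rbd: <message>                    (no rbd_dev at all)
 */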
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else	/* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
                if (unlikely(!(expr))) {				\
                        printk(KERN_ERR "\nAssertion failure in %s() "	\
                                                "at line %d:\n\n"	\
                                        "\trbd_assert(%s);\n\n",	\
                                        __func__, __LINE__, #expr);	\
                        BUG();						\
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);

/*
 * Return true if nothing else is pending.
 */
static bool pending_result_dec(struct pending_result *pending, int *result)
{
        rbd_assert(pending->num_pending > 0);

        if (*result && !pending->result)
                pending->result = *result;
        if (--pending->num_pending)
                return false;

        *result = pending->result;
        return true;
}

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
        int ro;

        if (get_user(ro, (int __user *)arg))
                return -EFAULT;

        /*
         * Both images mapped read-only and snapshots can't be marked
         * read-write.
705 */ 706 if (!ro) { 707 if (rbd_is_ro(rbd_dev)) 708 return -EROFS; 709 710 rbd_assert(!rbd_is_snap(rbd_dev)); 711 } 712 713 /* Let blkdev_roset() handle it */ 714 return -ENOTTY; 715 } 716 717 static int rbd_ioctl(struct block_device *bdev, fmode_t mode, 718 unsigned int cmd, unsigned long arg) 719 { 720 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 721 int ret; 722 723 switch (cmd) { 724 case BLKROSET: 725 ret = rbd_ioctl_set_ro(rbd_dev, arg); 726 break; 727 default: 728 ret = -ENOTTY; 729 } 730 731 return ret; 732 } 733 734 #ifdef CONFIG_COMPAT 735 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, 736 unsigned int cmd, unsigned long arg) 737 { 738 return rbd_ioctl(bdev, mode, cmd, arg); 739 } 740 #endif /* CONFIG_COMPAT */ 741 742 static const struct block_device_operations rbd_bd_ops = { 743 .owner = THIS_MODULE, 744 .open = rbd_open, 745 .release = rbd_release, 746 .ioctl = rbd_ioctl, 747 #ifdef CONFIG_COMPAT 748 .compat_ioctl = rbd_compat_ioctl, 749 #endif 750 }; 751 752 /* 753 * Initialize an rbd client instance. Success or not, this function 754 * consumes ceph_opts. Caller holds client_mutex. 755 */ 756 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 757 { 758 struct rbd_client *rbdc; 759 int ret = -ENOMEM; 760 761 dout("%s:\n", __func__); 762 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 763 if (!rbdc) 764 goto out_opt; 765 766 kref_init(&rbdc->kref); 767 INIT_LIST_HEAD(&rbdc->node); 768 769 rbdc->client = ceph_create_client(ceph_opts, rbdc); 770 if (IS_ERR(rbdc->client)) 771 goto out_rbdc; 772 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 773 774 ret = ceph_open_session(rbdc->client); 775 if (ret < 0) 776 goto out_client; 777 778 spin_lock(&rbd_client_list_lock); 779 list_add_tail(&rbdc->node, &rbd_client_list); 780 spin_unlock(&rbd_client_list_lock); 781 782 dout("%s: rbdc %p\n", __func__, rbdc); 783 784 return rbdc; 785 out_client: 786 ceph_destroy_client(rbdc->client); 787 out_rbdc: 788 kfree(rbdc); 789 out_opt: 790 if (ceph_opts) 791 ceph_destroy_options(ceph_opts); 792 dout("%s: error %d\n", __func__, ret); 793 794 return ERR_PTR(ret); 795 } 796 797 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) 798 { 799 kref_get(&rbdc->kref); 800 801 return rbdc; 802 } 803 804 /* 805 * Find a ceph client with specific addr and configuration. If 806 * found, bump its reference count. 807 */ 808 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 809 { 810 struct rbd_client *client_node; 811 bool found = false; 812 813 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 814 return NULL; 815 816 spin_lock(&rbd_client_list_lock); 817 list_for_each_entry(client_node, &rbd_client_list, node) { 818 if (!ceph_compare_options(ceph_opts, client_node->client)) { 819 __rbd_get_client(client_node); 820 821 found = true; 822 break; 823 } 824 } 825 spin_unlock(&rbd_client_list_lock); 826 827 return found ? 
client_node : NULL; 828 } 829 830 /* 831 * (Per device) rbd map options 832 */ 833 enum { 834 Opt_queue_depth, 835 Opt_alloc_size, 836 Opt_lock_timeout, 837 /* int args above */ 838 Opt_pool_ns, 839 Opt_compression_hint, 840 /* string args above */ 841 Opt_read_only, 842 Opt_read_write, 843 Opt_lock_on_read, 844 Opt_exclusive, 845 Opt_notrim, 846 }; 847 848 enum { 849 Opt_compression_hint_none, 850 Opt_compression_hint_compressible, 851 Opt_compression_hint_incompressible, 852 }; 853 854 static const struct constant_table rbd_param_compression_hint[] = { 855 {"none", Opt_compression_hint_none}, 856 {"compressible", Opt_compression_hint_compressible}, 857 {"incompressible", Opt_compression_hint_incompressible}, 858 {} 859 }; 860 861 static const struct fs_parameter_spec rbd_parameters[] = { 862 fsparam_u32 ("alloc_size", Opt_alloc_size), 863 fsparam_enum ("compression_hint", Opt_compression_hint, 864 rbd_param_compression_hint), 865 fsparam_flag ("exclusive", Opt_exclusive), 866 fsparam_flag ("lock_on_read", Opt_lock_on_read), 867 fsparam_u32 ("lock_timeout", Opt_lock_timeout), 868 fsparam_flag ("notrim", Opt_notrim), 869 fsparam_string ("_pool_ns", Opt_pool_ns), 870 fsparam_u32 ("queue_depth", Opt_queue_depth), 871 fsparam_flag ("read_only", Opt_read_only), 872 fsparam_flag ("read_write", Opt_read_write), 873 fsparam_flag ("ro", Opt_read_only), 874 fsparam_flag ("rw", Opt_read_write), 875 {} 876 }; 877 878 struct rbd_options { 879 int queue_depth; 880 int alloc_size; 881 unsigned long lock_timeout; 882 bool read_only; 883 bool lock_on_read; 884 bool exclusive; 885 bool trim; 886 887 u32 alloc_hint_flags; /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */ 888 }; 889 890 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ 891 #define RBD_ALLOC_SIZE_DEFAULT (64 * 1024) 892 #define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */ 893 #define RBD_READ_ONLY_DEFAULT false 894 #define RBD_LOCK_ON_READ_DEFAULT false 895 #define RBD_EXCLUSIVE_DEFAULT false 896 #define RBD_TRIM_DEFAULT true 897 898 struct rbd_parse_opts_ctx { 899 struct rbd_spec *spec; 900 struct ceph_options *copts; 901 struct rbd_options *opts; 902 }; 903 904 static char* obj_op_name(enum obj_operation_type op_type) 905 { 906 switch (op_type) { 907 case OBJ_OP_READ: 908 return "read"; 909 case OBJ_OP_WRITE: 910 return "write"; 911 case OBJ_OP_DISCARD: 912 return "discard"; 913 case OBJ_OP_ZEROOUT: 914 return "zeroout"; 915 default: 916 return "???"; 917 } 918 } 919 920 /* 921 * Destroy ceph client 922 * 923 * Caller must hold rbd_client_list_lock. 924 */ 925 static void rbd_client_release(struct kref *kref) 926 { 927 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 928 929 dout("%s: rbdc %p\n", __func__, rbdc); 930 spin_lock(&rbd_client_list_lock); 931 list_del(&rbdc->node); 932 spin_unlock(&rbd_client_list_lock); 933 934 ceph_destroy_client(rbdc->client); 935 kfree(rbdc); 936 } 937 938 /* 939 * Drop reference to ceph client node. If it's not referenced anymore, release 940 * it. 941 */ 942 static void rbd_put_client(struct rbd_client *rbdc) 943 { 944 if (rbdc) 945 kref_put(&rbdc->kref, rbd_client_release); 946 } 947 948 /* 949 * Get a ceph client with specific addr and configuration, if one does 950 * not exist create it. Either way, ceph_opts is consumed by this 951 * function. 
952 */ 953 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 954 { 955 struct rbd_client *rbdc; 956 int ret; 957 958 mutex_lock(&client_mutex); 959 rbdc = rbd_client_find(ceph_opts); 960 if (rbdc) { 961 ceph_destroy_options(ceph_opts); 962 963 /* 964 * Using an existing client. Make sure ->pg_pools is up to 965 * date before we look up the pool id in do_rbd_add(). 966 */ 967 ret = ceph_wait_for_latest_osdmap(rbdc->client, 968 rbdc->client->options->mount_timeout); 969 if (ret) { 970 rbd_warn(NULL, "failed to get latest osdmap: %d", ret); 971 rbd_put_client(rbdc); 972 rbdc = ERR_PTR(ret); 973 } 974 } else { 975 rbdc = rbd_client_create(ceph_opts); 976 } 977 mutex_unlock(&client_mutex); 978 979 return rbdc; 980 } 981 982 static bool rbd_image_format_valid(u32 image_format) 983 { 984 return image_format == 1 || image_format == 2; 985 } 986 987 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 988 { 989 size_t size; 990 u32 snap_count; 991 992 /* The header has to start with the magic rbd header text */ 993 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 994 return false; 995 996 /* The bio layer requires at least sector-sized I/O */ 997 998 if (ondisk->options.order < SECTOR_SHIFT) 999 return false; 1000 1001 /* If we use u64 in a few spots we may be able to loosen this */ 1002 1003 if (ondisk->options.order > 8 * sizeof (int) - 1) 1004 return false; 1005 1006 /* 1007 * The size of a snapshot header has to fit in a size_t, and 1008 * that limits the number of snapshots. 1009 */ 1010 snap_count = le32_to_cpu(ondisk->snap_count); 1011 size = SIZE_MAX - sizeof (struct ceph_snap_context); 1012 if (snap_count > size / sizeof (__le64)) 1013 return false; 1014 1015 /* 1016 * Not only that, but the size of the entire the snapshot 1017 * header must also be representable in a size_t. 1018 */ 1019 size -= snap_count * sizeof (__le64); 1020 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 1021 return false; 1022 1023 return true; 1024 } 1025 1026 /* 1027 * returns the size of an object in the image 1028 */ 1029 static u32 rbd_obj_bytes(struct rbd_image_header *header) 1030 { 1031 return 1U << header->obj_order; 1032 } 1033 1034 static void rbd_init_layout(struct rbd_device *rbd_dev) 1035 { 1036 if (rbd_dev->header.stripe_unit == 0 || 1037 rbd_dev->header.stripe_count == 0) { 1038 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); 1039 rbd_dev->header.stripe_count = 1; 1040 } 1041 1042 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; 1043 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; 1044 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); 1045 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? 1046 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; 1047 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 1048 } 1049 1050 /* 1051 * Fill an rbd image header with information from the given format 1 1052 * on-disk header. 
1053 */ 1054 static int rbd_header_from_disk(struct rbd_device *rbd_dev, 1055 struct rbd_image_header_ondisk *ondisk) 1056 { 1057 struct rbd_image_header *header = &rbd_dev->header; 1058 bool first_time = header->object_prefix == NULL; 1059 struct ceph_snap_context *snapc; 1060 char *object_prefix = NULL; 1061 char *snap_names = NULL; 1062 u64 *snap_sizes = NULL; 1063 u32 snap_count; 1064 int ret = -ENOMEM; 1065 u32 i; 1066 1067 /* Allocate this now to avoid having to handle failure below */ 1068 1069 if (first_time) { 1070 object_prefix = kstrndup(ondisk->object_prefix, 1071 sizeof(ondisk->object_prefix), 1072 GFP_KERNEL); 1073 if (!object_prefix) 1074 return -ENOMEM; 1075 } 1076 1077 /* Allocate the snapshot context and fill it in */ 1078 1079 snap_count = le32_to_cpu(ondisk->snap_count); 1080 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 1081 if (!snapc) 1082 goto out_err; 1083 snapc->seq = le64_to_cpu(ondisk->snap_seq); 1084 if (snap_count) { 1085 struct rbd_image_snap_ondisk *snaps; 1086 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 1087 1088 /* We'll keep a copy of the snapshot names... */ 1089 1090 if (snap_names_len > (u64)SIZE_MAX) 1091 goto out_2big; 1092 snap_names = kmalloc(snap_names_len, GFP_KERNEL); 1093 if (!snap_names) 1094 goto out_err; 1095 1096 /* ...as well as the array of their sizes. */ 1097 snap_sizes = kmalloc_array(snap_count, 1098 sizeof(*header->snap_sizes), 1099 GFP_KERNEL); 1100 if (!snap_sizes) 1101 goto out_err; 1102 1103 /* 1104 * Copy the names, and fill in each snapshot's id 1105 * and size. 1106 * 1107 * Note that rbd_dev_v1_header_info() guarantees the 1108 * ondisk buffer we're working with has 1109 * snap_names_len bytes beyond the end of the 1110 * snapshot id array, this memcpy() is safe. 1111 */ 1112 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len); 1113 snaps = ondisk->snaps; 1114 for (i = 0; i < snap_count; i++) { 1115 snapc->snaps[i] = le64_to_cpu(snaps[i].id); 1116 snap_sizes[i] = le64_to_cpu(snaps[i].image_size); 1117 } 1118 } 1119 1120 /* We won't fail any more, fill in the header */ 1121 1122 if (first_time) { 1123 header->object_prefix = object_prefix; 1124 header->obj_order = ondisk->options.order; 1125 rbd_init_layout(rbd_dev); 1126 } else { 1127 ceph_put_snap_context(header->snapc); 1128 kfree(header->snap_names); 1129 kfree(header->snap_sizes); 1130 } 1131 1132 /* The remaining fields always get updated (when we refresh) */ 1133 1134 header->image_size = le64_to_cpu(ondisk->image_size); 1135 header->snapc = snapc; 1136 header->snap_names = snap_names; 1137 header->snap_sizes = snap_sizes; 1138 1139 return 0; 1140 out_2big: 1141 ret = -EIO; 1142 out_err: 1143 kfree(snap_sizes); 1144 kfree(snap_names); 1145 ceph_put_snap_context(snapc); 1146 kfree(object_prefix); 1147 1148 return ret; 1149 } 1150 1151 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) 1152 { 1153 const char *snap_name; 1154 1155 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 1156 1157 /* Skip over names until we find the one we are looking for */ 1158 1159 snap_name = rbd_dev->header.snap_names; 1160 while (which--) 1161 snap_name += strlen(snap_name) + 1; 1162 1163 return kstrdup(snap_name, GFP_KERNEL); 1164 } 1165 1166 /* 1167 * Snapshot id comparison function for use with qsort()/bsearch(). 1168 * Note that result is for snapshots in *descending* order. 
1169 */ 1170 static int snapid_compare_reverse(const void *s1, const void *s2) 1171 { 1172 u64 snap_id1 = *(u64 *)s1; 1173 u64 snap_id2 = *(u64 *)s2; 1174 1175 if (snap_id1 < snap_id2) 1176 return 1; 1177 return snap_id1 == snap_id2 ? 0 : -1; 1178 } 1179 1180 /* 1181 * Search a snapshot context to see if the given snapshot id is 1182 * present. 1183 * 1184 * Returns the position of the snapshot id in the array if it's found, 1185 * or BAD_SNAP_INDEX otherwise. 1186 * 1187 * Note: The snapshot array is in kept sorted (by the osd) in 1188 * reverse order, highest snapshot id first. 1189 */ 1190 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) 1191 { 1192 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 1193 u64 *found; 1194 1195 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, 1196 sizeof (snap_id), snapid_compare_reverse); 1197 1198 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; 1199 } 1200 1201 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, 1202 u64 snap_id) 1203 { 1204 u32 which; 1205 const char *snap_name; 1206 1207 which = rbd_dev_snap_index(rbd_dev, snap_id); 1208 if (which == BAD_SNAP_INDEX) 1209 return ERR_PTR(-ENOENT); 1210 1211 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which); 1212 return snap_name ? snap_name : ERR_PTR(-ENOMEM); 1213 } 1214 1215 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 1216 { 1217 if (snap_id == CEPH_NOSNAP) 1218 return RBD_SNAP_HEAD_NAME; 1219 1220 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1221 if (rbd_dev->image_format == 1) 1222 return rbd_dev_v1_snap_name(rbd_dev, snap_id); 1223 1224 return rbd_dev_v2_snap_name(rbd_dev, snap_id); 1225 } 1226 1227 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 1228 u64 *snap_size) 1229 { 1230 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1231 if (snap_id == CEPH_NOSNAP) { 1232 *snap_size = rbd_dev->header.image_size; 1233 } else if (rbd_dev->image_format == 1) { 1234 u32 which; 1235 1236 which = rbd_dev_snap_index(rbd_dev, snap_id); 1237 if (which == BAD_SNAP_INDEX) 1238 return -ENOENT; 1239 1240 *snap_size = rbd_dev->header.snap_sizes[which]; 1241 } else { 1242 u64 size = 0; 1243 int ret; 1244 1245 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 1246 if (ret) 1247 return ret; 1248 1249 *snap_size = size; 1250 } 1251 return 0; 1252 } 1253 1254 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) 1255 { 1256 u64 snap_id = rbd_dev->spec->snap_id; 1257 u64 size = 0; 1258 int ret; 1259 1260 ret = rbd_snap_size(rbd_dev, snap_id, &size); 1261 if (ret) 1262 return ret; 1263 1264 rbd_dev->mapping.size = size; 1265 return 0; 1266 } 1267 1268 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) 1269 { 1270 rbd_dev->mapping.size = 0; 1271 } 1272 1273 static void zero_bvec(struct bio_vec *bv) 1274 { 1275 void *buf; 1276 unsigned long flags; 1277 1278 buf = bvec_kmap_irq(bv, &flags); 1279 memset(buf, 0, bv->bv_len); 1280 flush_dcache_page(bv->bv_page); 1281 bvec_kunmap_irq(buf, &flags); 1282 } 1283 1284 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) 1285 { 1286 struct ceph_bio_iter it = *bio_pos; 1287 1288 ceph_bio_iter_advance(&it, off); 1289 ceph_bio_iter_advance_step(&it, bytes, ({ 1290 zero_bvec(&bv); 1291 })); 1292 } 1293 1294 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) 1295 { 1296 struct ceph_bvec_iter it = *bvec_pos; 1297 1298 ceph_bvec_iter_advance(&it, off); 1299 
ceph_bvec_iter_advance_step(&it, bytes, ({ 1300 zero_bvec(&bv); 1301 })); 1302 } 1303 1304 /* 1305 * Zero a range in @obj_req data buffer defined by a bio (list) or 1306 * (private) bio_vec array. 1307 * 1308 * @off is relative to the start of the data buffer. 1309 */ 1310 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, 1311 u32 bytes) 1312 { 1313 dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes); 1314 1315 switch (obj_req->img_request->data_type) { 1316 case OBJ_REQUEST_BIO: 1317 zero_bios(&obj_req->bio_pos, off, bytes); 1318 break; 1319 case OBJ_REQUEST_BVECS: 1320 case OBJ_REQUEST_OWN_BVECS: 1321 zero_bvecs(&obj_req->bvec_pos, off, bytes); 1322 break; 1323 default: 1324 BUG(); 1325 } 1326 } 1327 1328 static void rbd_obj_request_destroy(struct kref *kref); 1329 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1330 { 1331 rbd_assert(obj_request != NULL); 1332 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1333 kref_read(&obj_request->kref)); 1334 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1335 } 1336 1337 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1338 struct rbd_obj_request *obj_request) 1339 { 1340 rbd_assert(obj_request->img_request == NULL); 1341 1342 /* Image request now owns object's original reference */ 1343 obj_request->img_request = img_request; 1344 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1345 } 1346 1347 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1348 struct rbd_obj_request *obj_request) 1349 { 1350 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1351 list_del(&obj_request->ex.oe_item); 1352 rbd_assert(obj_request->img_request == img_request); 1353 rbd_obj_request_put(obj_request); 1354 } 1355 1356 static void rbd_osd_submit(struct ceph_osd_request *osd_req) 1357 { 1358 struct rbd_obj_request *obj_req = osd_req->r_priv; 1359 1360 dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n", 1361 __func__, osd_req, obj_req, obj_req->ex.oe_objno, 1362 obj_req->ex.oe_off, obj_req->ex.oe_len); 1363 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); 1364 } 1365 1366 /* 1367 * The default/initial value for all image request flags is 0. Each 1368 * is conditionally set to 1 at image request initialization time 1369 * and currently never change thereafter. 1370 */ 1371 static void img_request_layered_set(struct rbd_img_request *img_request) 1372 { 1373 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1374 } 1375 1376 static bool img_request_layered_test(struct rbd_img_request *img_request) 1377 { 1378 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1379 } 1380 1381 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) 1382 { 1383 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1384 1385 return !obj_req->ex.oe_off && 1386 obj_req->ex.oe_len == rbd_dev->layout.object_size; 1387 } 1388 1389 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) 1390 { 1391 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1392 1393 return obj_req->ex.oe_off + obj_req->ex.oe_len == 1394 rbd_dev->layout.object_size; 1395 } 1396 1397 /* 1398 * Must be called after rbd_obj_calc_img_extents(). 
1399 */ 1400 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req) 1401 { 1402 if (!obj_req->num_img_extents || 1403 (rbd_obj_is_entire(obj_req) && 1404 !obj_req->img_request->snapc->num_snaps)) 1405 return false; 1406 1407 return true; 1408 } 1409 1410 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) 1411 { 1412 return ceph_file_extents_bytes(obj_req->img_extents, 1413 obj_req->num_img_extents); 1414 } 1415 1416 static bool rbd_img_is_write(struct rbd_img_request *img_req) 1417 { 1418 switch (img_req->op_type) { 1419 case OBJ_OP_READ: 1420 return false; 1421 case OBJ_OP_WRITE: 1422 case OBJ_OP_DISCARD: 1423 case OBJ_OP_ZEROOUT: 1424 return true; 1425 default: 1426 BUG(); 1427 } 1428 } 1429 1430 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) 1431 { 1432 struct rbd_obj_request *obj_req = osd_req->r_priv; 1433 int result; 1434 1435 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, 1436 osd_req->r_result, obj_req); 1437 1438 /* 1439 * Writes aren't allowed to return a data payload. In some 1440 * guarded write cases (e.g. stat + zero on an empty object) 1441 * a stat response makes it through, but we don't care. 1442 */ 1443 if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request)) 1444 result = 0; 1445 else 1446 result = osd_req->r_result; 1447 1448 rbd_obj_handle_request(obj_req, result); 1449 } 1450 1451 static void rbd_osd_format_read(struct ceph_osd_request *osd_req) 1452 { 1453 struct rbd_obj_request *obj_request = osd_req->r_priv; 1454 1455 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1456 osd_req->r_snapid = obj_request->img_request->snap_id; 1457 } 1458 1459 static void rbd_osd_format_write(struct ceph_osd_request *osd_req) 1460 { 1461 struct rbd_obj_request *obj_request = osd_req->r_priv; 1462 1463 osd_req->r_flags = CEPH_OSD_FLAG_WRITE; 1464 ktime_get_real_ts64(&osd_req->r_mtime); 1465 osd_req->r_data_offset = obj_request->ex.oe_off; 1466 } 1467 1468 static struct ceph_osd_request * 1469 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, 1470 struct ceph_snap_context *snapc, int num_ops) 1471 { 1472 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1473 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1474 struct ceph_osd_request *req; 1475 const char *name_format = rbd_dev->image_format == 1 ? 1476 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; 1477 int ret; 1478 1479 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); 1480 if (!req) 1481 return ERR_PTR(-ENOMEM); 1482 1483 list_add_tail(&req->r_private_item, &obj_req->osd_reqs); 1484 req->r_callback = rbd_osd_req_callback; 1485 req->r_priv = obj_req; 1486 1487 /* 1488 * Data objects may be stored in a separate pool, but always in 1489 * the same namespace in that pool as the header in its pool. 
1490 */ 1491 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); 1492 req->r_base_oloc.pool = rbd_dev->layout.pool_id; 1493 1494 ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, 1495 rbd_dev->header.object_prefix, 1496 obj_req->ex.oe_objno); 1497 if (ret) 1498 return ERR_PTR(ret); 1499 1500 return req; 1501 } 1502 1503 static struct ceph_osd_request * 1504 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops) 1505 { 1506 return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc, 1507 num_ops); 1508 } 1509 1510 static struct rbd_obj_request *rbd_obj_request_create(void) 1511 { 1512 struct rbd_obj_request *obj_request; 1513 1514 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); 1515 if (!obj_request) 1516 return NULL; 1517 1518 ceph_object_extent_init(&obj_request->ex); 1519 INIT_LIST_HEAD(&obj_request->osd_reqs); 1520 mutex_init(&obj_request->state_mutex); 1521 kref_init(&obj_request->kref); 1522 1523 dout("%s %p\n", __func__, obj_request); 1524 return obj_request; 1525 } 1526 1527 static void rbd_obj_request_destroy(struct kref *kref) 1528 { 1529 struct rbd_obj_request *obj_request; 1530 struct ceph_osd_request *osd_req; 1531 u32 i; 1532 1533 obj_request = container_of(kref, struct rbd_obj_request, kref); 1534 1535 dout("%s: obj %p\n", __func__, obj_request); 1536 1537 while (!list_empty(&obj_request->osd_reqs)) { 1538 osd_req = list_first_entry(&obj_request->osd_reqs, 1539 struct ceph_osd_request, r_private_item); 1540 list_del_init(&osd_req->r_private_item); 1541 ceph_osdc_put_request(osd_req); 1542 } 1543 1544 switch (obj_request->img_request->data_type) { 1545 case OBJ_REQUEST_NODATA: 1546 case OBJ_REQUEST_BIO: 1547 case OBJ_REQUEST_BVECS: 1548 break; /* Nothing to do */ 1549 case OBJ_REQUEST_OWN_BVECS: 1550 kfree(obj_request->bvec_pos.bvecs); 1551 break; 1552 default: 1553 BUG(); 1554 } 1555 1556 kfree(obj_request->img_extents); 1557 if (obj_request->copyup_bvecs) { 1558 for (i = 0; i < obj_request->copyup_bvec_count; i++) { 1559 if (obj_request->copyup_bvecs[i].bv_page) 1560 __free_page(obj_request->copyup_bvecs[i].bv_page); 1561 } 1562 kfree(obj_request->copyup_bvecs); 1563 } 1564 1565 kmem_cache_free(rbd_obj_request_cache, obj_request); 1566 } 1567 1568 /* It's OK to call this for a device with no parent */ 1569 1570 static void rbd_spec_put(struct rbd_spec *spec); 1571 static void rbd_dev_unparent(struct rbd_device *rbd_dev) 1572 { 1573 rbd_dev_remove_parent(rbd_dev); 1574 rbd_spec_put(rbd_dev->parent_spec); 1575 rbd_dev->parent_spec = NULL; 1576 rbd_dev->parent_overlap = 0; 1577 } 1578 1579 /* 1580 * Parent image reference counting is used to determine when an 1581 * image's parent fields can be safely torn down--after there are no 1582 * more in-flight requests to the parent image. When the last 1583 * reference is dropped, cleaning them up is safe. 1584 */ 1585 static void rbd_dev_parent_put(struct rbd_device *rbd_dev) 1586 { 1587 int counter; 1588 1589 if (!rbd_dev->parent_spec) 1590 return; 1591 1592 counter = atomic_dec_return_safe(&rbd_dev->parent_ref); 1593 if (counter > 0) 1594 return; 1595 1596 /* Last reference; clean up parent data structures */ 1597 1598 if (!counter) 1599 rbd_dev_unparent(rbd_dev); 1600 else 1601 rbd_warn(rbd_dev, "parent reference underflow"); 1602 } 1603 1604 /* 1605 * If an image has a non-zero parent overlap, get a reference to its 1606 * parent. 
1607 * 1608 * Returns true if the rbd device has a parent with a non-zero 1609 * overlap and a reference for it was successfully taken, or 1610 * false otherwise. 1611 */ 1612 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) 1613 { 1614 int counter = 0; 1615 1616 if (!rbd_dev->parent_spec) 1617 return false; 1618 1619 if (rbd_dev->parent_overlap) 1620 counter = atomic_inc_return_safe(&rbd_dev->parent_ref); 1621 1622 if (counter < 0) 1623 rbd_warn(rbd_dev, "parent reference overflow"); 1624 1625 return counter > 0; 1626 } 1627 1628 static void rbd_img_request_init(struct rbd_img_request *img_request, 1629 struct rbd_device *rbd_dev, 1630 enum obj_operation_type op_type) 1631 { 1632 memset(img_request, 0, sizeof(*img_request)); 1633 1634 img_request->rbd_dev = rbd_dev; 1635 img_request->op_type = op_type; 1636 1637 INIT_LIST_HEAD(&img_request->lock_item); 1638 INIT_LIST_HEAD(&img_request->object_extents); 1639 mutex_init(&img_request->state_mutex); 1640 } 1641 1642 static void rbd_img_capture_header(struct rbd_img_request *img_req) 1643 { 1644 struct rbd_device *rbd_dev = img_req->rbd_dev; 1645 1646 lockdep_assert_held(&rbd_dev->header_rwsem); 1647 1648 if (rbd_img_is_write(img_req)) 1649 img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1650 else 1651 img_req->snap_id = rbd_dev->spec->snap_id; 1652 1653 if (rbd_dev_parent_get(rbd_dev)) 1654 img_request_layered_set(img_req); 1655 } 1656 1657 static void rbd_img_request_destroy(struct rbd_img_request *img_request) 1658 { 1659 struct rbd_obj_request *obj_request; 1660 struct rbd_obj_request *next_obj_request; 1661 1662 dout("%s: img %p\n", __func__, img_request); 1663 1664 WARN_ON(!list_empty(&img_request->lock_item)); 1665 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1666 rbd_img_obj_request_del(img_request, obj_request); 1667 1668 if (img_request_layered_test(img_request)) 1669 rbd_dev_parent_put(img_request->rbd_dev); 1670 1671 if (rbd_img_is_write(img_request)) 1672 ceph_put_snap_context(img_request->snapc); 1673 1674 if (test_bit(IMG_REQ_CHILD, &img_request->flags)) 1675 kmem_cache_free(rbd_img_request_cache, img_request); 1676 } 1677 1678 #define BITS_PER_OBJ 2 1679 #define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) 1680 #define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) 1681 1682 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, 1683 u64 *index, u8 *shift) 1684 { 1685 u32 off; 1686 1687 rbd_assert(objno < rbd_dev->object_map_size); 1688 *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); 1689 *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; 1690 } 1691 1692 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) 1693 { 1694 u64 index; 1695 u8 shift; 1696 1697 lockdep_assert_held(&rbd_dev->object_map_lock); 1698 __rbd_object_map_index(rbd_dev, objno, &index, &shift); 1699 return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; 1700 } 1701 1702 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val) 1703 { 1704 u64 index; 1705 u8 shift; 1706 u8 *p; 1707 1708 lockdep_assert_held(&rbd_dev->object_map_lock); 1709 rbd_assert(!(val & ~OBJ_MASK)); 1710 1711 __rbd_object_map_index(rbd_dev, objno, &index, &shift); 1712 p = &rbd_dev->object_map[index]; 1713 *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); 1714 } 1715 1716 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) 1717 { 1718 u8 state; 1719 1720 spin_lock(&rbd_dev->object_map_lock); 1721 state = __rbd_object_map_get(rbd_dev, objno); 1722 
spin_unlock(&rbd_dev->object_map_lock); 1723 return state; 1724 } 1725 1726 static bool use_object_map(struct rbd_device *rbd_dev) 1727 { 1728 /* 1729 * An image mapped read-only can't use the object map -- it isn't 1730 * loaded because the header lock isn't acquired. Someone else can 1731 * write to the image and update the object map behind our back. 1732 * 1733 * A snapshot can't be written to, so using the object map is always 1734 * safe. 1735 */ 1736 if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev)) 1737 return false; 1738 1739 return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && 1740 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); 1741 } 1742 1743 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) 1744 { 1745 u8 state; 1746 1747 /* fall back to default logic if object map is disabled or invalid */ 1748 if (!use_object_map(rbd_dev)) 1749 return true; 1750 1751 state = rbd_object_map_get(rbd_dev, objno); 1752 return state != OBJECT_NONEXISTENT; 1753 } 1754 1755 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, 1756 struct ceph_object_id *oid) 1757 { 1758 if (snap_id == CEPH_NOSNAP) 1759 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, 1760 rbd_dev->spec->image_id); 1761 else 1762 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, 1763 rbd_dev->spec->image_id, snap_id); 1764 } 1765 1766 static int rbd_object_map_lock(struct rbd_device *rbd_dev) 1767 { 1768 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1769 CEPH_DEFINE_OID_ONSTACK(oid); 1770 u8 lock_type; 1771 char *lock_tag; 1772 struct ceph_locker *lockers; 1773 u32 num_lockers; 1774 bool broke_lock = false; 1775 int ret; 1776 1777 rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); 1778 1779 again: 1780 ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, 1781 CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); 1782 if (ret != -EBUSY || broke_lock) { 1783 if (ret == -EEXIST) 1784 ret = 0; /* already locked by myself */ 1785 if (ret) 1786 rbd_warn(rbd_dev, "failed to lock object map: %d", ret); 1787 return ret; 1788 } 1789 1790 ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, 1791 RBD_LOCK_NAME, &lock_type, &lock_tag, 1792 &lockers, &num_lockers); 1793 if (ret) { 1794 if (ret == -ENOENT) 1795 goto again; 1796 1797 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); 1798 return ret; 1799 } 1800 1801 kfree(lock_tag); 1802 if (num_lockers == 0) 1803 goto again; 1804 1805 rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", 1806 ENTITY_NAME(lockers[0].id.name)); 1807 1808 ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, 1809 RBD_LOCK_NAME, lockers[0].id.cookie, 1810 &lockers[0].id.name); 1811 ceph_free_lockers(lockers, num_lockers); 1812 if (ret) { 1813 if (ret == -ENOENT) 1814 goto again; 1815 1816 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); 1817 return ret; 1818 } 1819 1820 broke_lock = true; 1821 goto again; 1822 } 1823 1824 static void rbd_object_map_unlock(struct rbd_device *rbd_dev) 1825 { 1826 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1827 CEPH_DEFINE_OID_ONSTACK(oid); 1828 int ret; 1829 1830 rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); 1831 1832 ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, 1833 ""); 1834 if (ret && ret != -ENOENT) 1835 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); 1836 } 1837 1838 static int decode_object_map_header(void **p, void *end, u64 *object_map_size) 1839 { 1840 u8 
struct_v; 1841 u32 struct_len; 1842 u32 header_len; 1843 void *header_end; 1844 int ret; 1845 1846 ceph_decode_32_safe(p, end, header_len, e_inval); 1847 header_end = *p + header_len; 1848 1849 ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, 1850 &struct_len); 1851 if (ret) 1852 return ret; 1853 1854 ceph_decode_64_safe(p, end, *object_map_size, e_inval); 1855 1856 *p = header_end; 1857 return 0; 1858 1859 e_inval: 1860 return -EINVAL; 1861 } 1862 1863 static int __rbd_object_map_load(struct rbd_device *rbd_dev) 1864 { 1865 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1866 CEPH_DEFINE_OID_ONSTACK(oid); 1867 struct page **pages; 1868 void *p, *end; 1869 size_t reply_len; 1870 u64 num_objects; 1871 u64 object_map_bytes; 1872 u64 object_map_size; 1873 int num_pages; 1874 int ret; 1875 1876 rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); 1877 1878 num_objects = ceph_get_num_objects(&rbd_dev->layout, 1879 rbd_dev->mapping.size); 1880 object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, 1881 BITS_PER_BYTE); 1882 num_pages = calc_pages_for(0, object_map_bytes) + 1; 1883 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1884 if (IS_ERR(pages)) 1885 return PTR_ERR(pages); 1886 1887 reply_len = num_pages * PAGE_SIZE; 1888 rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); 1889 ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, 1890 "rbd", "object_map_load", CEPH_OSD_FLAG_READ, 1891 NULL, 0, pages, &reply_len); 1892 if (ret) 1893 goto out; 1894 1895 p = page_address(pages[0]); 1896 end = p + min(reply_len, (size_t)PAGE_SIZE); 1897 ret = decode_object_map_header(&p, end, &object_map_size); 1898 if (ret) 1899 goto out; 1900 1901 if (object_map_size != num_objects) { 1902 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", 1903 object_map_size, num_objects); 1904 ret = -EINVAL; 1905 goto out; 1906 } 1907 1908 if (offset_in_page(p) + object_map_bytes > reply_len) { 1909 ret = -EINVAL; 1910 goto out; 1911 } 1912 1913 rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); 1914 if (!rbd_dev->object_map) { 1915 ret = -ENOMEM; 1916 goto out; 1917 } 1918 1919 rbd_dev->object_map_size = object_map_size; 1920 ceph_copy_from_page_vector(pages, rbd_dev->object_map, 1921 offset_in_page(p), object_map_bytes); 1922 1923 out: 1924 ceph_release_page_vector(pages, num_pages); 1925 return ret; 1926 } 1927 1928 static void rbd_object_map_free(struct rbd_device *rbd_dev) 1929 { 1930 kvfree(rbd_dev->object_map); 1931 rbd_dev->object_map = NULL; 1932 rbd_dev->object_map_size = 0; 1933 } 1934 1935 static int rbd_object_map_load(struct rbd_device *rbd_dev) 1936 { 1937 int ret; 1938 1939 ret = __rbd_object_map_load(rbd_dev); 1940 if (ret) 1941 return ret; 1942 1943 ret = rbd_dev_v2_get_flags(rbd_dev); 1944 if (ret) { 1945 rbd_object_map_free(rbd_dev); 1946 return ret; 1947 } 1948 1949 if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) 1950 rbd_warn(rbd_dev, "object map is invalid"); 1951 1952 return 0; 1953 } 1954 1955 static int rbd_object_map_open(struct rbd_device *rbd_dev) 1956 { 1957 int ret; 1958 1959 ret = rbd_object_map_lock(rbd_dev); 1960 if (ret) 1961 return ret; 1962 1963 ret = rbd_object_map_load(rbd_dev); 1964 if (ret) { 1965 rbd_object_map_unlock(rbd_dev); 1966 return ret; 1967 } 1968 1969 return 0; 1970 } 1971 1972 static void rbd_object_map_close(struct rbd_device *rbd_dev) 1973 { 1974 rbd_object_map_free(rbd_dev); 1975 rbd_object_map_unlock(rbd_dev); 1976 } 1977 1978 /* 1979 * This function needs 
snap_id (or more precisely just something to 1980 * distinguish between HEAD and snapshot object maps), new_state and 1981 * current_state that were passed to rbd_object_map_update(). 1982 * 1983 * To avoid allocating and stashing a context we piggyback on the OSD 1984 * request. A HEAD update has two ops (assert_locked). For new_state 1985 * and current_state we decode our own object_map_update op, encoded in 1986 * rbd_cls_object_map_update(). 1987 */ 1988 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, 1989 struct ceph_osd_request *osd_req) 1990 { 1991 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 1992 struct ceph_osd_data *osd_data; 1993 u64 objno; 1994 u8 state, new_state, uninitialized_var(current_state); 1995 bool has_current_state; 1996 void *p; 1997 1998 if (osd_req->r_result) 1999 return osd_req->r_result; 2000 2001 /* 2002 * Nothing to do for a snapshot object map. 2003 */ 2004 if (osd_req->r_num_ops == 1) 2005 return 0; 2006 2007 /* 2008 * Update in-memory HEAD object map. 2009 */ 2010 rbd_assert(osd_req->r_num_ops == 2); 2011 osd_data = osd_req_op_data(osd_req, 1, cls, request_data); 2012 rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); 2013 2014 p = page_address(osd_data->pages[0]); 2015 objno = ceph_decode_64(&p); 2016 rbd_assert(objno == obj_req->ex.oe_objno); 2017 rbd_assert(ceph_decode_64(&p) == objno + 1); 2018 new_state = ceph_decode_8(&p); 2019 has_current_state = ceph_decode_8(&p); 2020 if (has_current_state) 2021 current_state = ceph_decode_8(&p); 2022 2023 spin_lock(&rbd_dev->object_map_lock); 2024 state = __rbd_object_map_get(rbd_dev, objno); 2025 if (!has_current_state || current_state == state || 2026 (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) 2027 __rbd_object_map_set(rbd_dev, objno, new_state); 2028 spin_unlock(&rbd_dev->object_map_lock); 2029 2030 return 0; 2031 } 2032 2033 static void rbd_object_map_callback(struct ceph_osd_request *osd_req) 2034 { 2035 struct rbd_obj_request *obj_req = osd_req->r_priv; 2036 int result; 2037 2038 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, 2039 osd_req->r_result, obj_req); 2040 2041 result = rbd_object_map_update_finish(obj_req, osd_req); 2042 rbd_obj_handle_request(obj_req, result); 2043 } 2044 2045 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) 2046 { 2047 u8 state = rbd_object_map_get(rbd_dev, objno); 2048 2049 if (state == new_state || 2050 (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || 2051 (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) 2052 return false; 2053 2054 return true; 2055 } 2056 2057 static int rbd_cls_object_map_update(struct ceph_osd_request *req, 2058 int which, u64 objno, u8 new_state, 2059 const u8 *current_state) 2060 { 2061 struct page **pages; 2062 void *p, *start; 2063 int ret; 2064 2065 ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); 2066 if (ret) 2067 return ret; 2068 2069 pages = ceph_alloc_page_vector(1, GFP_NOIO); 2070 if (IS_ERR(pages)) 2071 return PTR_ERR(pages); 2072 2073 p = start = page_address(pages[0]); 2074 ceph_encode_64(&p, objno); 2075 ceph_encode_64(&p, objno + 1); 2076 ceph_encode_8(&p, new_state); 2077 if (current_state) { 2078 ceph_encode_8(&p, 1); 2079 ceph_encode_8(&p, *current_state); 2080 } else { 2081 ceph_encode_8(&p, 0); 2082 } 2083 2084 osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, 2085 false, true); 2086 return 0; 2087 } 2088 2089 /* 2090 * Return: 2091 * 0 - object map update 
sent 2092 * 1 - object map update isn't needed 2093 * <0 - error 2094 */ 2095 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, 2096 u8 new_state, const u8 *current_state) 2097 { 2098 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2099 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2100 struct ceph_osd_request *req; 2101 int num_ops = 1; 2102 int which = 0; 2103 int ret; 2104 2105 if (snap_id == CEPH_NOSNAP) { 2106 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) 2107 return 1; 2108 2109 num_ops++; /* assert_locked */ 2110 } 2111 2112 req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); 2113 if (!req) 2114 return -ENOMEM; 2115 2116 list_add_tail(&req->r_private_item, &obj_req->osd_reqs); 2117 req->r_callback = rbd_object_map_callback; 2118 req->r_priv = obj_req; 2119 2120 rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); 2121 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); 2122 req->r_flags = CEPH_OSD_FLAG_WRITE; 2123 ktime_get_real_ts64(&req->r_mtime); 2124 2125 if (snap_id == CEPH_NOSNAP) { 2126 /* 2127 * Protect against possible race conditions during lock 2128 * ownership transitions. 2129 */ 2130 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, 2131 CEPH_CLS_LOCK_EXCLUSIVE, "", ""); 2132 if (ret) 2133 return ret; 2134 } 2135 2136 ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, 2137 new_state, current_state); 2138 if (ret) 2139 return ret; 2140 2141 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 2142 if (ret) 2143 return ret; 2144 2145 ceph_osdc_start_request(osdc, req, false); 2146 return 0; 2147 } 2148 2149 static void prune_extents(struct ceph_file_extent *img_extents, 2150 u32 *num_img_extents, u64 overlap) 2151 { 2152 u32 cnt = *num_img_extents; 2153 2154 /* drop extents completely beyond the overlap */ 2155 while (cnt && img_extents[cnt - 1].fe_off >= overlap) 2156 cnt--; 2157 2158 if (cnt) { 2159 struct ceph_file_extent *ex = &img_extents[cnt - 1]; 2160 2161 /* trim final overlapping extent */ 2162 if (ex->fe_off + ex->fe_len > overlap) 2163 ex->fe_len = overlap - ex->fe_off; 2164 } 2165 2166 *num_img_extents = cnt; 2167 } 2168 2169 /* 2170 * Determine the byte range(s) covered by either just the object extent 2171 * or the entire object in the parent image. 2172 */ 2173 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, 2174 bool entire) 2175 { 2176 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2177 int ret; 2178 2179 if (!rbd_dev->parent_overlap) 2180 return 0; 2181 2182 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, 2183 entire ? 0 : obj_req->ex.oe_off, 2184 entire ? 
rbd_dev->layout.object_size : 2185 obj_req->ex.oe_len, 2186 &obj_req->img_extents, 2187 &obj_req->num_img_extents); 2188 if (ret) 2189 return ret; 2190 2191 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 2192 rbd_dev->parent_overlap); 2193 return 0; 2194 } 2195 2196 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which) 2197 { 2198 struct rbd_obj_request *obj_req = osd_req->r_priv; 2199 2200 switch (obj_req->img_request->data_type) { 2201 case OBJ_REQUEST_BIO: 2202 osd_req_op_extent_osd_data_bio(osd_req, which, 2203 &obj_req->bio_pos, 2204 obj_req->ex.oe_len); 2205 break; 2206 case OBJ_REQUEST_BVECS: 2207 case OBJ_REQUEST_OWN_BVECS: 2208 rbd_assert(obj_req->bvec_pos.iter.bi_size == 2209 obj_req->ex.oe_len); 2210 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); 2211 osd_req_op_extent_osd_data_bvec_pos(osd_req, which, 2212 &obj_req->bvec_pos); 2213 break; 2214 default: 2215 BUG(); 2216 } 2217 } 2218 2219 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which) 2220 { 2221 struct page **pages; 2222 2223 /* 2224 * The response data for a STAT call consists of: 2225 * le64 length; 2226 * struct { 2227 * le32 tv_sec; 2228 * le32 tv_nsec; 2229 * } mtime; 2230 */ 2231 pages = ceph_alloc_page_vector(1, GFP_NOIO); 2232 if (IS_ERR(pages)) 2233 return PTR_ERR(pages); 2234 2235 osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0); 2236 osd_req_op_raw_data_in_pages(osd_req, which, pages, 2237 8 + sizeof(struct ceph_timespec), 2238 0, false, true); 2239 return 0; 2240 } 2241 2242 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which, 2243 u32 bytes) 2244 { 2245 struct rbd_obj_request *obj_req = osd_req->r_priv; 2246 int ret; 2247 2248 ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup"); 2249 if (ret) 2250 return ret; 2251 2252 osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs, 2253 obj_req->copyup_bvec_count, bytes); 2254 return 0; 2255 } 2256 2257 static int rbd_obj_init_read(struct rbd_obj_request *obj_req) 2258 { 2259 obj_req->read_state = RBD_OBJ_READ_START; 2260 return 0; 2261 } 2262 2263 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, 2264 int which) 2265 { 2266 struct rbd_obj_request *obj_req = osd_req->r_priv; 2267 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2268 u16 opcode; 2269 2270 if (!use_object_map(rbd_dev) || 2271 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) { 2272 osd_req_op_alloc_hint_init(osd_req, which++, 2273 rbd_dev->layout.object_size, 2274 rbd_dev->layout.object_size, 2275 rbd_dev->opts->alloc_hint_flags); 2276 } 2277 2278 if (rbd_obj_is_entire(obj_req)) 2279 opcode = CEPH_OSD_OP_WRITEFULL; 2280 else 2281 opcode = CEPH_OSD_OP_WRITE; 2282 2283 osd_req_op_extent_init(osd_req, which, opcode, 2284 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 2285 rbd_osd_setup_data(osd_req, which); 2286 } 2287 2288 static int rbd_obj_init_write(struct rbd_obj_request *obj_req) 2289 { 2290 int ret; 2291 2292 /* reverse map the entire object onto the parent */ 2293 ret = rbd_obj_calc_img_extents(obj_req, true); 2294 if (ret) 2295 return ret; 2296 2297 if (rbd_obj_copyup_enabled(obj_req)) 2298 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; 2299 2300 obj_req->write_state = RBD_OBJ_WRITE_START; 2301 return 0; 2302 } 2303 2304 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) 2305 { 2306 return rbd_obj_is_tail(obj_req) ? 
CEPH_OSD_OP_TRUNCATE : 2307 CEPH_OSD_OP_ZERO; 2308 } 2309 2310 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req, 2311 int which) 2312 { 2313 struct rbd_obj_request *obj_req = osd_req->r_priv; 2314 2315 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { 2316 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); 2317 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0); 2318 } else { 2319 osd_req_op_extent_init(osd_req, which, 2320 truncate_or_zero_opcode(obj_req), 2321 obj_req->ex.oe_off, obj_req->ex.oe_len, 2322 0, 0); 2323 } 2324 } 2325 2326 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) 2327 { 2328 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2329 u64 off, next_off; 2330 int ret; 2331 2332 /* 2333 * Align the range to alloc_size boundary and punt on discards 2334 * that are too small to free up any space. 2335 * 2336 * alloc_size == object_size && is_tail() is a special case for 2337 * filestore with filestore_punch_hole = false, needed to allow 2338 * truncate (in addition to delete). 2339 */ 2340 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || 2341 !rbd_obj_is_tail(obj_req)) { 2342 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size); 2343 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len, 2344 rbd_dev->opts->alloc_size); 2345 if (off >= next_off) 2346 return 1; 2347 2348 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, 2349 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, 2350 off, next_off - off); 2351 obj_req->ex.oe_off = off; 2352 obj_req->ex.oe_len = next_off - off; 2353 } 2354 2355 /* reverse map the entire object onto the parent */ 2356 ret = rbd_obj_calc_img_extents(obj_req, true); 2357 if (ret) 2358 return ret; 2359 2360 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; 2361 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) 2362 obj_req->flags |= RBD_OBJ_FLAG_DELETION; 2363 2364 obj_req->write_state = RBD_OBJ_WRITE_START; 2365 return 0; 2366 } 2367 2368 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req, 2369 int which) 2370 { 2371 struct rbd_obj_request *obj_req = osd_req->r_priv; 2372 u16 opcode; 2373 2374 if (rbd_obj_is_entire(obj_req)) { 2375 if (obj_req->num_img_extents) { 2376 if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) 2377 osd_req_op_init(osd_req, which++, 2378 CEPH_OSD_OP_CREATE, 0); 2379 opcode = CEPH_OSD_OP_TRUNCATE; 2380 } else { 2381 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); 2382 osd_req_op_init(osd_req, which++, 2383 CEPH_OSD_OP_DELETE, 0); 2384 opcode = 0; 2385 } 2386 } else { 2387 opcode = truncate_or_zero_opcode(obj_req); 2388 } 2389 2390 if (opcode) 2391 osd_req_op_extent_init(osd_req, which, opcode, 2392 obj_req->ex.oe_off, obj_req->ex.oe_len, 2393 0, 0); 2394 } 2395 2396 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) 2397 { 2398 int ret; 2399 2400 /* reverse map the entire object onto the parent */ 2401 ret = rbd_obj_calc_img_extents(obj_req, true); 2402 if (ret) 2403 return ret; 2404 2405 if (rbd_obj_copyup_enabled(obj_req)) 2406 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; 2407 if (!obj_req->num_img_extents) { 2408 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; 2409 if (rbd_obj_is_entire(obj_req)) 2410 obj_req->flags |= RBD_OBJ_FLAG_DELETION; 2411 } 2412 2413 obj_req->write_state = RBD_OBJ_WRITE_START; 2414 return 0; 2415 } 2416 2417 static int count_write_ops(struct rbd_obj_request *obj_req) 2418 { 2419 struct rbd_img_request *img_req = obj_req->img_request; 2420 2421 
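	/*
	 * The count returned here must match the ops set up by
	 * rbd_osd_setup_write_ops() for the same request.  Roughly:
	 *
	 *	num_ops = count_write_ops(obj_req);
	 *	osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
	 *	rbd_osd_setup_write_ops(osd_req, 0);
	 *
	 * See rbd_obj_write_object() for the real call sequence, which may
	 * also prepend a stat op when copyup is enabled.
	 */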
switch (img_req->op_type) { 2422 case OBJ_OP_WRITE: 2423 if (!use_object_map(img_req->rbd_dev) || 2424 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) 2425 return 2; /* setallochint + write/writefull */ 2426 2427 return 1; /* write/writefull */ 2428 case OBJ_OP_DISCARD: 2429 return 1; /* delete/truncate/zero */ 2430 case OBJ_OP_ZEROOUT: 2431 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && 2432 !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) 2433 return 2; /* create + truncate */ 2434 2435 return 1; /* delete/truncate/zero */ 2436 default: 2437 BUG(); 2438 } 2439 } 2440 2441 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, 2442 int which) 2443 { 2444 struct rbd_obj_request *obj_req = osd_req->r_priv; 2445 2446 switch (obj_req->img_request->op_type) { 2447 case OBJ_OP_WRITE: 2448 __rbd_osd_setup_write_ops(osd_req, which); 2449 break; 2450 case OBJ_OP_DISCARD: 2451 __rbd_osd_setup_discard_ops(osd_req, which); 2452 break; 2453 case OBJ_OP_ZEROOUT: 2454 __rbd_osd_setup_zeroout_ops(osd_req, which); 2455 break; 2456 default: 2457 BUG(); 2458 } 2459 } 2460 2461 /* 2462 * Prune the list of object requests (adjust offset and/or length, drop 2463 * redundant requests). Prepare object request state machines and image 2464 * request state machine for execution. 2465 */ 2466 static int __rbd_img_fill_request(struct rbd_img_request *img_req) 2467 { 2468 struct rbd_obj_request *obj_req, *next_obj_req; 2469 int ret; 2470 2471 for_each_obj_request_safe(img_req, obj_req, next_obj_req) { 2472 switch (img_req->op_type) { 2473 case OBJ_OP_READ: 2474 ret = rbd_obj_init_read(obj_req); 2475 break; 2476 case OBJ_OP_WRITE: 2477 ret = rbd_obj_init_write(obj_req); 2478 break; 2479 case OBJ_OP_DISCARD: 2480 ret = rbd_obj_init_discard(obj_req); 2481 break; 2482 case OBJ_OP_ZEROOUT: 2483 ret = rbd_obj_init_zeroout(obj_req); 2484 break; 2485 default: 2486 BUG(); 2487 } 2488 if (ret < 0) 2489 return ret; 2490 if (ret > 0) { 2491 rbd_img_obj_request_del(img_req, obj_req); 2492 continue; 2493 } 2494 } 2495 2496 img_req->state = RBD_IMG_START; 2497 return 0; 2498 } 2499 2500 union rbd_img_fill_iter { 2501 struct ceph_bio_iter bio_iter; 2502 struct ceph_bvec_iter bvec_iter; 2503 }; 2504 2505 struct rbd_img_fill_ctx { 2506 enum obj_request_type pos_type; 2507 union rbd_img_fill_iter *pos; 2508 union rbd_img_fill_iter iter; 2509 ceph_object_extent_fn_t set_pos_fn; 2510 ceph_object_extent_fn_t count_fn; 2511 ceph_object_extent_fn_t copy_fn; 2512 }; 2513 2514 static struct ceph_object_extent *alloc_object_extent(void *arg) 2515 { 2516 struct rbd_img_request *img_req = arg; 2517 struct rbd_obj_request *obj_req; 2518 2519 obj_req = rbd_obj_request_create(); 2520 if (!obj_req) 2521 return NULL; 2522 2523 rbd_img_obj_request_add(img_req, obj_req); 2524 return &obj_req->ex; 2525 } 2526 2527 /* 2528 * While su != os && sc == 1 is technically not fancy (it's the same 2529 * layout as su == os && sc == 1), we can't use the nocopy path for it 2530 * because ->set_pos_fn() should be called only once per object. 2531 * ceph_file_to_extents() invokes action_fn once per stripe unit, so 2532 * treat su != os && sc == 1 as fancy. 
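 *
 * For example (numbers are only illustrative): su == os == 4M, sc == 1
 * is the plain, non-fancy layout; su == 1M, os == 4M, sc == 4 stripes
 * consecutive 1M units across four objects and is fancy; su == 1M,
 * os == 4M, sc == 1 keeps all data of an object together, but the data
 * is still delivered to action_fn in 1M chunks, so it is treated as
 * fancy here as well.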
2533 */ 2534 static bool rbd_layout_is_fancy(struct ceph_file_layout *l) 2535 { 2536 return l->stripe_unit != l->object_size; 2537 } 2538 2539 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, 2540 struct ceph_file_extent *img_extents, 2541 u32 num_img_extents, 2542 struct rbd_img_fill_ctx *fctx) 2543 { 2544 u32 i; 2545 int ret; 2546 2547 img_req->data_type = fctx->pos_type; 2548 2549 /* 2550 * Create object requests and set each object request's starting 2551 * position in the provided bio (list) or bio_vec array. 2552 */ 2553 fctx->iter = *fctx->pos; 2554 for (i = 0; i < num_img_extents; i++) { 2555 ret = ceph_file_to_extents(&img_req->rbd_dev->layout, 2556 img_extents[i].fe_off, 2557 img_extents[i].fe_len, 2558 &img_req->object_extents, 2559 alloc_object_extent, img_req, 2560 fctx->set_pos_fn, &fctx->iter); 2561 if (ret) 2562 return ret; 2563 } 2564 2565 return __rbd_img_fill_request(img_req); 2566 } 2567 2568 /* 2569 * Map a list of image extents to a list of object extents, create the 2570 * corresponding object requests (normally each to a different object, 2571 * but not always) and add them to @img_req. For each object request, 2572 * set up its data descriptor to point to the corresponding chunk(s) of 2573 * @fctx->pos data buffer. 2574 * 2575 * Because ceph_file_to_extents() will merge adjacent object extents 2576 * together, each object request's data descriptor may point to multiple 2577 * different chunks of @fctx->pos data buffer. 2578 * 2579 * @fctx->pos data buffer is assumed to be large enough. 2580 */ 2581 static int rbd_img_fill_request(struct rbd_img_request *img_req, 2582 struct ceph_file_extent *img_extents, 2583 u32 num_img_extents, 2584 struct rbd_img_fill_ctx *fctx) 2585 { 2586 struct rbd_device *rbd_dev = img_req->rbd_dev; 2587 struct rbd_obj_request *obj_req; 2588 u32 i; 2589 int ret; 2590 2591 if (fctx->pos_type == OBJ_REQUEST_NODATA || 2592 !rbd_layout_is_fancy(&rbd_dev->layout)) 2593 return rbd_img_fill_request_nocopy(img_req, img_extents, 2594 num_img_extents, fctx); 2595 2596 img_req->data_type = OBJ_REQUEST_OWN_BVECS; 2597 2598 /* 2599 * Create object requests and determine ->bvec_count for each object 2600 * request. Note that ->bvec_count sum over all object requests may 2601 * be greater than the number of bio_vecs in the provided bio (list) 2602 * or bio_vec array because when mapped, those bio_vecs can straddle 2603 * stripe unit boundaries. 2604 */ 2605 fctx->iter = *fctx->pos; 2606 for (i = 0; i < num_img_extents; i++) { 2607 ret = ceph_file_to_extents(&rbd_dev->layout, 2608 img_extents[i].fe_off, 2609 img_extents[i].fe_len, 2610 &img_req->object_extents, 2611 alloc_object_extent, img_req, 2612 fctx->count_fn, &fctx->iter); 2613 if (ret) 2614 return ret; 2615 } 2616 2617 for_each_obj_request(img_req, obj_req) { 2618 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, 2619 sizeof(*obj_req->bvec_pos.bvecs), 2620 GFP_NOIO); 2621 if (!obj_req->bvec_pos.bvecs) 2622 return -ENOMEM; 2623 } 2624 2625 /* 2626 * Fill in each object request's private bio_vec array, splitting and 2627 * rearranging the provided bio_vecs in stripe unit chunks as needed. 
2628 */ 2629 fctx->iter = *fctx->pos; 2630 for (i = 0; i < num_img_extents; i++) { 2631 ret = ceph_iterate_extents(&rbd_dev->layout, 2632 img_extents[i].fe_off, 2633 img_extents[i].fe_len, 2634 &img_req->object_extents, 2635 fctx->copy_fn, &fctx->iter); 2636 if (ret) 2637 return ret; 2638 } 2639 2640 return __rbd_img_fill_request(img_req); 2641 } 2642 2643 static int rbd_img_fill_nodata(struct rbd_img_request *img_req, 2644 u64 off, u64 len) 2645 { 2646 struct ceph_file_extent ex = { off, len }; 2647 union rbd_img_fill_iter dummy = {}; 2648 struct rbd_img_fill_ctx fctx = { 2649 .pos_type = OBJ_REQUEST_NODATA, 2650 .pos = &dummy, 2651 }; 2652 2653 return rbd_img_fill_request(img_req, &ex, 1, &fctx); 2654 } 2655 2656 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2657 { 2658 struct rbd_obj_request *obj_req = 2659 container_of(ex, struct rbd_obj_request, ex); 2660 struct ceph_bio_iter *it = arg; 2661 2662 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2663 obj_req->bio_pos = *it; 2664 ceph_bio_iter_advance(it, bytes); 2665 } 2666 2667 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2668 { 2669 struct rbd_obj_request *obj_req = 2670 container_of(ex, struct rbd_obj_request, ex); 2671 struct ceph_bio_iter *it = arg; 2672 2673 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2674 ceph_bio_iter_advance_step(it, bytes, ({ 2675 obj_req->bvec_count++; 2676 })); 2677 2678 } 2679 2680 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2681 { 2682 struct rbd_obj_request *obj_req = 2683 container_of(ex, struct rbd_obj_request, ex); 2684 struct ceph_bio_iter *it = arg; 2685 2686 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); 2687 ceph_bio_iter_advance_step(it, bytes, ({ 2688 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2689 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2690 })); 2691 } 2692 2693 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2694 struct ceph_file_extent *img_extents, 2695 u32 num_img_extents, 2696 struct ceph_bio_iter *bio_pos) 2697 { 2698 struct rbd_img_fill_ctx fctx = { 2699 .pos_type = OBJ_REQUEST_BIO, 2700 .pos = (union rbd_img_fill_iter *)bio_pos, 2701 .set_pos_fn = set_bio_pos, 2702 .count_fn = count_bio_bvecs, 2703 .copy_fn = copy_bio_bvecs, 2704 }; 2705 2706 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2707 &fctx); 2708 } 2709 2710 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, 2711 u64 off, u64 len, struct bio *bio) 2712 { 2713 struct ceph_file_extent ex = { off, len }; 2714 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; 2715 2716 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); 2717 } 2718 2719 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) 2720 { 2721 struct rbd_obj_request *obj_req = 2722 container_of(ex, struct rbd_obj_request, ex); 2723 struct ceph_bvec_iter *it = arg; 2724 2725 obj_req->bvec_pos = *it; 2726 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); 2727 ceph_bvec_iter_advance(it, bytes); 2728 } 2729 2730 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2731 { 2732 struct rbd_obj_request *obj_req = 2733 container_of(ex, struct rbd_obj_request, ex); 2734 struct ceph_bvec_iter *it = arg; 2735 2736 ceph_bvec_iter_advance_step(it, bytes, ({ 2737 obj_req->bvec_count++; 2738 })); 2739 } 2740 2741 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) 2742 { 2743 
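	/*
	 * Copy pass for the bvec case: move the bio_vecs that make up this
	 * object extent into the object request's own array (sized by
	 * count_bvecs()) and advance the shared iterator, mirroring
	 * copy_bio_bvecs() above.
	 */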
struct rbd_obj_request *obj_req = 2744 container_of(ex, struct rbd_obj_request, ex); 2745 struct ceph_bvec_iter *it = arg; 2746 2747 ceph_bvec_iter_advance_step(it, bytes, ({ 2748 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; 2749 obj_req->bvec_pos.iter.bi_size += bv.bv_len; 2750 })); 2751 } 2752 2753 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2754 struct ceph_file_extent *img_extents, 2755 u32 num_img_extents, 2756 struct ceph_bvec_iter *bvec_pos) 2757 { 2758 struct rbd_img_fill_ctx fctx = { 2759 .pos_type = OBJ_REQUEST_BVECS, 2760 .pos = (union rbd_img_fill_iter *)bvec_pos, 2761 .set_pos_fn = set_bvec_pos, 2762 .count_fn = count_bvecs, 2763 .copy_fn = copy_bvecs, 2764 }; 2765 2766 return rbd_img_fill_request(img_req, img_extents, num_img_extents, 2767 &fctx); 2768 } 2769 2770 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, 2771 struct ceph_file_extent *img_extents, 2772 u32 num_img_extents, 2773 struct bio_vec *bvecs) 2774 { 2775 struct ceph_bvec_iter it = { 2776 .bvecs = bvecs, 2777 .iter = { .bi_size = ceph_file_extents_bytes(img_extents, 2778 num_img_extents) }, 2779 }; 2780 2781 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, 2782 &it); 2783 } 2784 2785 static void rbd_img_handle_request_work(struct work_struct *work) 2786 { 2787 struct rbd_img_request *img_req = 2788 container_of(work, struct rbd_img_request, work); 2789 2790 rbd_img_handle_request(img_req, img_req->work_result); 2791 } 2792 2793 static void rbd_img_schedule(struct rbd_img_request *img_req, int result) 2794 { 2795 INIT_WORK(&img_req->work, rbd_img_handle_request_work); 2796 img_req->work_result = result; 2797 queue_work(rbd_wq, &img_req->work); 2798 } 2799 2800 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) 2801 { 2802 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2803 2804 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { 2805 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; 2806 return true; 2807 } 2808 2809 dout("%s %p objno %llu assuming dne\n", __func__, obj_req, 2810 obj_req->ex.oe_objno); 2811 return false; 2812 } 2813 2814 static int rbd_obj_read_object(struct rbd_obj_request *obj_req) 2815 { 2816 struct ceph_osd_request *osd_req; 2817 int ret; 2818 2819 osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1); 2820 if (IS_ERR(osd_req)) 2821 return PTR_ERR(osd_req); 2822 2823 osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ, 2824 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); 2825 rbd_osd_setup_data(osd_req, 0); 2826 rbd_osd_format_read(osd_req); 2827 2828 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); 2829 if (ret) 2830 return ret; 2831 2832 rbd_osd_submit(osd_req); 2833 return 0; 2834 } 2835 2836 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) 2837 { 2838 struct rbd_img_request *img_req = obj_req->img_request; 2839 struct rbd_device *parent = img_req->rbd_dev->parent; 2840 struct rbd_img_request *child_img_req; 2841 int ret; 2842 2843 child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); 2844 if (!child_img_req) 2845 return -ENOMEM; 2846 2847 rbd_img_request_init(child_img_req, parent, OBJ_OP_READ); 2848 __set_bit(IMG_REQ_CHILD, &child_img_req->flags); 2849 child_img_req->obj_request = obj_req; 2850 2851 down_read(&parent->header_rwsem); 2852 rbd_img_capture_header(child_img_req); 2853 up_read(&parent->header_rwsem); 2854 2855 dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req, 2856 obj_req); 2857 2858 if (!rbd_img_is_write(img_req)) { 2859 
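		/*
		 * Plain read: point the child image request at the same
		 * data buffer positions as the original object request.
		 * The write (copyup) case below reads into the separately
		 * allocated copyup_bvecs instead.
		 */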
switch (img_req->data_type) { 2860 case OBJ_REQUEST_BIO: 2861 ret = __rbd_img_fill_from_bio(child_img_req, 2862 obj_req->img_extents, 2863 obj_req->num_img_extents, 2864 &obj_req->bio_pos); 2865 break; 2866 case OBJ_REQUEST_BVECS: 2867 case OBJ_REQUEST_OWN_BVECS: 2868 ret = __rbd_img_fill_from_bvecs(child_img_req, 2869 obj_req->img_extents, 2870 obj_req->num_img_extents, 2871 &obj_req->bvec_pos); 2872 break; 2873 default: 2874 BUG(); 2875 } 2876 } else { 2877 ret = rbd_img_fill_from_bvecs(child_img_req, 2878 obj_req->img_extents, 2879 obj_req->num_img_extents, 2880 obj_req->copyup_bvecs); 2881 } 2882 if (ret) { 2883 rbd_img_request_destroy(child_img_req); 2884 return ret; 2885 } 2886 2887 /* avoid parent chain recursion */ 2888 rbd_img_schedule(child_img_req, 0); 2889 return 0; 2890 } 2891 2892 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) 2893 { 2894 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2895 int ret; 2896 2897 again: 2898 switch (obj_req->read_state) { 2899 case RBD_OBJ_READ_START: 2900 rbd_assert(!*result); 2901 2902 if (!rbd_obj_may_exist(obj_req)) { 2903 *result = -ENOENT; 2904 obj_req->read_state = RBD_OBJ_READ_OBJECT; 2905 goto again; 2906 } 2907 2908 ret = rbd_obj_read_object(obj_req); 2909 if (ret) { 2910 *result = ret; 2911 return true; 2912 } 2913 obj_req->read_state = RBD_OBJ_READ_OBJECT; 2914 return false; 2915 case RBD_OBJ_READ_OBJECT: 2916 if (*result == -ENOENT && rbd_dev->parent_overlap) { 2917 /* reverse map this object extent onto the parent */ 2918 ret = rbd_obj_calc_img_extents(obj_req, false); 2919 if (ret) { 2920 *result = ret; 2921 return true; 2922 } 2923 if (obj_req->num_img_extents) { 2924 ret = rbd_obj_read_from_parent(obj_req); 2925 if (ret) { 2926 *result = ret; 2927 return true; 2928 } 2929 obj_req->read_state = RBD_OBJ_READ_PARENT; 2930 return false; 2931 } 2932 } 2933 2934 /* 2935 * -ENOENT means a hole in the image -- zero-fill the entire 2936 * length of the request. A short read also implies zero-fill 2937 * to the end of the request. 2938 */ 2939 if (*result == -ENOENT) { 2940 rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len); 2941 *result = 0; 2942 } else if (*result >= 0) { 2943 if (*result < obj_req->ex.oe_len) 2944 rbd_obj_zero_range(obj_req, *result, 2945 obj_req->ex.oe_len - *result); 2946 else 2947 rbd_assert(*result == obj_req->ex.oe_len); 2948 *result = 0; 2949 } 2950 return true; 2951 case RBD_OBJ_READ_PARENT: 2952 /* 2953 * The parent image is read only up to the overlap -- zero-fill 2954 * from the overlap to the end of the request. 
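		 *
		 * For example (sizes only illustrative): if just the first
		 * 1M of a 4M object extent lies below the parent overlap,
		 * rbd_obj_img_extents_bytes() is 1M and the remaining 3M
		 * of the request is zero-filled here.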
2955 */ 2956 if (!*result) { 2957 u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req); 2958 2959 if (obj_overlap < obj_req->ex.oe_len) 2960 rbd_obj_zero_range(obj_req, obj_overlap, 2961 obj_req->ex.oe_len - obj_overlap); 2962 } 2963 return true; 2964 default: 2965 BUG(); 2966 } 2967 } 2968 2969 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) 2970 { 2971 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2972 2973 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) 2974 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; 2975 2976 if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && 2977 (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { 2978 dout("%s %p noop for nonexistent\n", __func__, obj_req); 2979 return true; 2980 } 2981 2982 return false; 2983 } 2984 2985 /* 2986 * Return: 2987 * 0 - object map update sent 2988 * 1 - object map update isn't needed 2989 * <0 - error 2990 */ 2991 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) 2992 { 2993 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 2994 u8 new_state; 2995 2996 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) 2997 return 1; 2998 2999 if (obj_req->flags & RBD_OBJ_FLAG_DELETION) 3000 new_state = OBJECT_PENDING; 3001 else 3002 new_state = OBJECT_EXISTS; 3003 3004 return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); 3005 } 3006 3007 static int rbd_obj_write_object(struct rbd_obj_request *obj_req) 3008 { 3009 struct ceph_osd_request *osd_req; 3010 int num_ops = count_write_ops(obj_req); 3011 int which = 0; 3012 int ret; 3013 3014 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) 3015 num_ops++; /* stat */ 3016 3017 osd_req = rbd_obj_add_osd_request(obj_req, num_ops); 3018 if (IS_ERR(osd_req)) 3019 return PTR_ERR(osd_req); 3020 3021 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { 3022 ret = rbd_osd_setup_stat(osd_req, which++); 3023 if (ret) 3024 return ret; 3025 } 3026 3027 rbd_osd_setup_write_ops(osd_req, which); 3028 rbd_osd_format_write(osd_req); 3029 3030 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); 3031 if (ret) 3032 return ret; 3033 3034 rbd_osd_submit(osd_req); 3035 return 0; 3036 } 3037 3038 /* 3039 * copyup_bvecs pages are never highmem pages 3040 */ 3041 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) 3042 { 3043 struct ceph_bvec_iter it = { 3044 .bvecs = bvecs, 3045 .iter = { .bi_size = bytes }, 3046 }; 3047 3048 ceph_bvec_iter_advance_step(&it, bytes, ({ 3049 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, 3050 bv.bv_len)) 3051 return false; 3052 })); 3053 return true; 3054 } 3055 3056 #define MODS_ONLY U32_MAX 3057 3058 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req, 3059 u32 bytes) 3060 { 3061 struct ceph_osd_request *osd_req; 3062 int ret; 3063 3064 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 3065 rbd_assert(bytes > 0 && bytes != MODS_ONLY); 3066 3067 osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); 3068 if (IS_ERR(osd_req)) 3069 return PTR_ERR(osd_req); 3070 3071 ret = rbd_osd_setup_copyup(osd_req, 0, bytes); 3072 if (ret) 3073 return ret; 3074 3075 rbd_osd_format_write(osd_req); 3076 3077 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); 3078 if (ret) 3079 return ret; 3080 3081 rbd_osd_submit(osd_req); 3082 return 0; 3083 } 3084 3085 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req, 3086 u32 bytes) 3087 { 3088 struct ceph_osd_request *osd_req; 3089 int num_ops = count_write_ops(obj_req); 3090 int which = 0; 3091 int 
ret; 3092 3093 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 3094 3095 if (bytes != MODS_ONLY) 3096 num_ops++; /* copyup */ 3097 3098 osd_req = rbd_obj_add_osd_request(obj_req, num_ops); 3099 if (IS_ERR(osd_req)) 3100 return PTR_ERR(osd_req); 3101 3102 if (bytes != MODS_ONLY) { 3103 ret = rbd_osd_setup_copyup(osd_req, which++, bytes); 3104 if (ret) 3105 return ret; 3106 } 3107 3108 rbd_osd_setup_write_ops(osd_req, which); 3109 rbd_osd_format_write(osd_req); 3110 3111 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); 3112 if (ret) 3113 return ret; 3114 3115 rbd_osd_submit(osd_req); 3116 return 0; 3117 } 3118 3119 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) 3120 { 3121 u32 i; 3122 3123 rbd_assert(!obj_req->copyup_bvecs); 3124 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); 3125 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, 3126 sizeof(*obj_req->copyup_bvecs), 3127 GFP_NOIO); 3128 if (!obj_req->copyup_bvecs) 3129 return -ENOMEM; 3130 3131 for (i = 0; i < obj_req->copyup_bvec_count; i++) { 3132 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); 3133 3134 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); 3135 if (!obj_req->copyup_bvecs[i].bv_page) 3136 return -ENOMEM; 3137 3138 obj_req->copyup_bvecs[i].bv_offset = 0; 3139 obj_req->copyup_bvecs[i].bv_len = len; 3140 obj_overlap -= len; 3141 } 3142 3143 rbd_assert(!obj_overlap); 3144 return 0; 3145 } 3146 3147 /* 3148 * The target object doesn't exist. Read the data for the entire 3149 * target object up to the overlap point (if any) from the parent, 3150 * so we can use it for a copyup. 3151 */ 3152 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) 3153 { 3154 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 3155 int ret; 3156 3157 rbd_assert(obj_req->num_img_extents); 3158 prune_extents(obj_req->img_extents, &obj_req->num_img_extents, 3159 rbd_dev->parent_overlap); 3160 if (!obj_req->num_img_extents) { 3161 /* 3162 * The overlap has become 0 (most likely because the 3163 * image has been flattened). Re-submit the original write 3164 * request -- pass MODS_ONLY since the copyup isn't needed 3165 * anymore. 
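		 *
		 * MODS_ONLY makes rbd_obj_copyup_current_snapc() skip the
		 * copyup op and submit only the original write ops.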
3166 */ 3167 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY); 3168 } 3169 3170 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); 3171 if (ret) 3172 return ret; 3173 3174 return rbd_obj_read_from_parent(obj_req); 3175 } 3176 3177 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) 3178 { 3179 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 3180 struct ceph_snap_context *snapc = obj_req->img_request->snapc; 3181 u8 new_state; 3182 u32 i; 3183 int ret; 3184 3185 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); 3186 3187 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) 3188 return; 3189 3190 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) 3191 return; 3192 3193 for (i = 0; i < snapc->num_snaps; i++) { 3194 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && 3195 i + 1 < snapc->num_snaps) 3196 new_state = OBJECT_EXISTS_CLEAN; 3197 else 3198 new_state = OBJECT_EXISTS; 3199 3200 ret = rbd_object_map_update(obj_req, snapc->snaps[i], 3201 new_state, NULL); 3202 if (ret < 0) { 3203 obj_req->pending.result = ret; 3204 return; 3205 } 3206 3207 rbd_assert(!ret); 3208 obj_req->pending.num_pending++; 3209 } 3210 } 3211 3212 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) 3213 { 3214 u32 bytes = rbd_obj_img_extents_bytes(obj_req); 3215 int ret; 3216 3217 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); 3218 3219 /* 3220 * Only send non-zero copyup data to save some I/O and network 3221 * bandwidth -- zero copyup data is equivalent to the object not 3222 * existing. 3223 */ 3224 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) 3225 bytes = 0; 3226 3227 if (obj_req->img_request->snapc->num_snaps && bytes > 0) { 3228 /* 3229 * Send a copyup request with an empty snapshot context to 3230 * deep-copyup the object through all existing snapshots. 3231 * A second request with the current snapshot context will be 3232 * sent for the actual modification. 
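		 *
		 * Roughly:
		 *
		 *	1) copyup(data) with an empty snapc   - deep-copyup
		 *	2) write/zero ops with current snapc  - modification
		 *
		 * Both requests are counted in pending.num_pending and are
		 * waited for in the __RBD_OBJ_COPYUP_WRITE_OBJECT state.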
		 */
		ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
		if (ret) {
			obj_req->pending.result = ret;
			return;
		}

		obj_req->pending.num_pending++;
		bytes = MODS_ONLY;
	}

	ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
	if (ret) {
		obj_req->pending.result = ret;
		return;
	}

	obj_req->pending.num_pending++;
}

static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

again:
	switch (obj_req->copyup_state) {
	case RBD_OBJ_COPYUP_START:
		rbd_assert(!*result);

		ret = rbd_obj_copyup_read_parent(obj_req);
		if (ret) {
			*result = ret;
			return true;
		}
		if (obj_req->num_img_extents)
			obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
		else
			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
		return false;
	case RBD_OBJ_COPYUP_READ_PARENT:
		if (*result)
			return true;

		if (is_zero_bvecs(obj_req->copyup_bvecs,
				  rbd_obj_img_extents_bytes(obj_req))) {
			dout("%s %p detected zeros\n", __func__, obj_req);
			obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
		}

		rbd_obj_copyup_object_maps(obj_req);
		if (!obj_req->pending.num_pending) {
			*result = obj_req->pending.result;
			obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
			goto again;
		}
		obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
		return false;
	case __RBD_OBJ_COPYUP_OBJECT_MAPS:
		if (!pending_result_dec(&obj_req->pending, result))
			return false;
		/* fall through */
	case RBD_OBJ_COPYUP_OBJECT_MAPS:
		if (*result) {
			rbd_warn(rbd_dev, "snap object map update failed: %d",
				 *result);
			return true;
		}

		rbd_obj_copyup_write_object(obj_req);
		if (!obj_req->pending.num_pending) {
			*result = obj_req->pending.result;
			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
			goto again;
		}
		obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
		return false;
	case __RBD_OBJ_COPYUP_WRITE_OBJECT:
		if (!pending_result_dec(&obj_req->pending, result))
			return false;
		/* fall through */
	case RBD_OBJ_COPYUP_WRITE_OBJECT:
		return true;
	default:
		BUG();
	}
}

/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 * <0 - error
 */
static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u8 current_state = OBJECT_PENDING;

	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
		return 1;

	if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
		return 1;

	return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
				     &current_state);
}

static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

again:
	switch (obj_req->write_state) {
	case RBD_OBJ_WRITE_START:
		rbd_assert(!*result);

		if (rbd_obj_write_is_noop(obj_req))
			return true;

		ret = rbd_obj_write_pre_object_map(obj_req);
		if (ret < 0) {
			*result = ret;
			return true;
		}
		obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
		if (ret > 0)
			goto again;
		return false;
	case
RBD_OBJ_WRITE_PRE_OBJECT_MAP: 3365 if (*result) { 3366 rbd_warn(rbd_dev, "pre object map update failed: %d", 3367 *result); 3368 return true; 3369 } 3370 ret = rbd_obj_write_object(obj_req); 3371 if (ret) { 3372 *result = ret; 3373 return true; 3374 } 3375 obj_req->write_state = RBD_OBJ_WRITE_OBJECT; 3376 return false; 3377 case RBD_OBJ_WRITE_OBJECT: 3378 if (*result == -ENOENT) { 3379 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { 3380 *result = 0; 3381 obj_req->copyup_state = RBD_OBJ_COPYUP_START; 3382 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP; 3383 goto again; 3384 } 3385 /* 3386 * On a non-existent object: 3387 * delete - -ENOENT, truncate/zero - 0 3388 */ 3389 if (obj_req->flags & RBD_OBJ_FLAG_DELETION) 3390 *result = 0; 3391 } 3392 if (*result) 3393 return true; 3394 3395 obj_req->write_state = RBD_OBJ_WRITE_COPYUP; 3396 goto again; 3397 case __RBD_OBJ_WRITE_COPYUP: 3398 if (!rbd_obj_advance_copyup(obj_req, result)) 3399 return false; 3400 /* fall through */ 3401 case RBD_OBJ_WRITE_COPYUP: 3402 if (*result) { 3403 rbd_warn(rbd_dev, "copyup failed: %d", *result); 3404 return true; 3405 } 3406 ret = rbd_obj_write_post_object_map(obj_req); 3407 if (ret < 0) { 3408 *result = ret; 3409 return true; 3410 } 3411 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; 3412 if (ret > 0) 3413 goto again; 3414 return false; 3415 case RBD_OBJ_WRITE_POST_OBJECT_MAP: 3416 if (*result) 3417 rbd_warn(rbd_dev, "post object map update failed: %d", 3418 *result); 3419 return true; 3420 default: 3421 BUG(); 3422 } 3423 } 3424 3425 /* 3426 * Return true if @obj_req is completed. 3427 */ 3428 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req, 3429 int *result) 3430 { 3431 struct rbd_img_request *img_req = obj_req->img_request; 3432 struct rbd_device *rbd_dev = img_req->rbd_dev; 3433 bool done; 3434 3435 mutex_lock(&obj_req->state_mutex); 3436 if (!rbd_img_is_write(img_req)) 3437 done = rbd_obj_advance_read(obj_req, result); 3438 else 3439 done = rbd_obj_advance_write(obj_req, result); 3440 mutex_unlock(&obj_req->state_mutex); 3441 3442 if (done && *result) { 3443 rbd_assert(*result < 0); 3444 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d", 3445 obj_op_name(img_req->op_type), obj_req->ex.oe_objno, 3446 obj_req->ex.oe_off, obj_req->ex.oe_len, *result); 3447 } 3448 return done; 3449 } 3450 3451 /* 3452 * This is open-coded in rbd_img_handle_request() to avoid parent chain 3453 * recursion. 
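 *
 * Completing a child image request would otherwise call back into the
 * parent object request's handler on the same stack, once per level of
 * the clone chain; rbd_img_handle_request() turns that into a loop.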
3454 */ 3455 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result) 3456 { 3457 if (__rbd_obj_handle_request(obj_req, &result)) 3458 rbd_img_handle_request(obj_req->img_request, result); 3459 } 3460 3461 static bool need_exclusive_lock(struct rbd_img_request *img_req) 3462 { 3463 struct rbd_device *rbd_dev = img_req->rbd_dev; 3464 3465 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) 3466 return false; 3467 3468 if (rbd_is_ro(rbd_dev)) 3469 return false; 3470 3471 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); 3472 if (rbd_dev->opts->lock_on_read || 3473 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) 3474 return true; 3475 3476 return rbd_img_is_write(img_req); 3477 } 3478 3479 static bool rbd_lock_add_request(struct rbd_img_request *img_req) 3480 { 3481 struct rbd_device *rbd_dev = img_req->rbd_dev; 3482 bool locked; 3483 3484 lockdep_assert_held(&rbd_dev->lock_rwsem); 3485 locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; 3486 spin_lock(&rbd_dev->lock_lists_lock); 3487 rbd_assert(list_empty(&img_req->lock_item)); 3488 if (!locked) 3489 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); 3490 else 3491 list_add_tail(&img_req->lock_item, &rbd_dev->running_list); 3492 spin_unlock(&rbd_dev->lock_lists_lock); 3493 return locked; 3494 } 3495 3496 static void rbd_lock_del_request(struct rbd_img_request *img_req) 3497 { 3498 struct rbd_device *rbd_dev = img_req->rbd_dev; 3499 bool need_wakeup; 3500 3501 lockdep_assert_held(&rbd_dev->lock_rwsem); 3502 spin_lock(&rbd_dev->lock_lists_lock); 3503 rbd_assert(!list_empty(&img_req->lock_item)); 3504 list_del_init(&img_req->lock_item); 3505 need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING && 3506 list_empty(&rbd_dev->running_list)); 3507 spin_unlock(&rbd_dev->lock_lists_lock); 3508 if (need_wakeup) 3509 complete(&rbd_dev->releasing_wait); 3510 } 3511 3512 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) 3513 { 3514 struct rbd_device *rbd_dev = img_req->rbd_dev; 3515 3516 if (!need_exclusive_lock(img_req)) 3517 return 1; 3518 3519 if (rbd_lock_add_request(img_req)) 3520 return 1; 3521 3522 if (rbd_dev->opts->exclusive) { 3523 WARN_ON(1); /* lock got released? */ 3524 return -EROFS; 3525 } 3526 3527 /* 3528 * Note the use of mod_delayed_work() in rbd_acquire_lock() 3529 * and cancel_delayed_work() in wake_lock_waiters(). 
3530 */ 3531 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); 3532 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 3533 return 0; 3534 } 3535 3536 static void rbd_img_object_requests(struct rbd_img_request *img_req) 3537 { 3538 struct rbd_obj_request *obj_req; 3539 3540 rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); 3541 3542 for_each_obj_request(img_req, obj_req) { 3543 int result = 0; 3544 3545 if (__rbd_obj_handle_request(obj_req, &result)) { 3546 if (result) { 3547 img_req->pending.result = result; 3548 return; 3549 } 3550 } else { 3551 img_req->pending.num_pending++; 3552 } 3553 } 3554 } 3555 3556 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) 3557 { 3558 struct rbd_device *rbd_dev = img_req->rbd_dev; 3559 int ret; 3560 3561 again: 3562 switch (img_req->state) { 3563 case RBD_IMG_START: 3564 rbd_assert(!*result); 3565 3566 ret = rbd_img_exclusive_lock(img_req); 3567 if (ret < 0) { 3568 *result = ret; 3569 return true; 3570 } 3571 img_req->state = RBD_IMG_EXCLUSIVE_LOCK; 3572 if (ret > 0) 3573 goto again; 3574 return false; 3575 case RBD_IMG_EXCLUSIVE_LOCK: 3576 if (*result) 3577 return true; 3578 3579 rbd_assert(!need_exclusive_lock(img_req) || 3580 __rbd_is_lock_owner(rbd_dev)); 3581 3582 rbd_img_object_requests(img_req); 3583 if (!img_req->pending.num_pending) { 3584 *result = img_req->pending.result; 3585 img_req->state = RBD_IMG_OBJECT_REQUESTS; 3586 goto again; 3587 } 3588 img_req->state = __RBD_IMG_OBJECT_REQUESTS; 3589 return false; 3590 case __RBD_IMG_OBJECT_REQUESTS: 3591 if (!pending_result_dec(&img_req->pending, result)) 3592 return false; 3593 /* fall through */ 3594 case RBD_IMG_OBJECT_REQUESTS: 3595 return true; 3596 default: 3597 BUG(); 3598 } 3599 } 3600 3601 /* 3602 * Return true if @img_req is completed. 3603 */ 3604 static bool __rbd_img_handle_request(struct rbd_img_request *img_req, 3605 int *result) 3606 { 3607 struct rbd_device *rbd_dev = img_req->rbd_dev; 3608 bool done; 3609 3610 if (need_exclusive_lock(img_req)) { 3611 down_read(&rbd_dev->lock_rwsem); 3612 mutex_lock(&img_req->state_mutex); 3613 done = rbd_img_advance(img_req, result); 3614 if (done) 3615 rbd_lock_del_request(img_req); 3616 mutex_unlock(&img_req->state_mutex); 3617 up_read(&rbd_dev->lock_rwsem); 3618 } else { 3619 mutex_lock(&img_req->state_mutex); 3620 done = rbd_img_advance(img_req, result); 3621 mutex_unlock(&img_req->state_mutex); 3622 } 3623 3624 if (done && *result) { 3625 rbd_assert(*result < 0); 3626 rbd_warn(rbd_dev, "%s%s result %d", 3627 test_bit(IMG_REQ_CHILD, &img_req->flags) ? 
"child " : "", 3628 obj_op_name(img_req->op_type), *result); 3629 } 3630 return done; 3631 } 3632 3633 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) 3634 { 3635 again: 3636 if (!__rbd_img_handle_request(img_req, &result)) 3637 return; 3638 3639 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { 3640 struct rbd_obj_request *obj_req = img_req->obj_request; 3641 3642 rbd_img_request_destroy(img_req); 3643 if (__rbd_obj_handle_request(obj_req, &result)) { 3644 img_req = obj_req->img_request; 3645 goto again; 3646 } 3647 } else { 3648 struct request *rq = blk_mq_rq_from_pdu(img_req); 3649 3650 rbd_img_request_destroy(img_req); 3651 blk_mq_end_request(rq, errno_to_blk_status(result)); 3652 } 3653 } 3654 3655 static const struct rbd_client_id rbd_empty_cid; 3656 3657 static bool rbd_cid_equal(const struct rbd_client_id *lhs, 3658 const struct rbd_client_id *rhs) 3659 { 3660 return lhs->gid == rhs->gid && lhs->handle == rhs->handle; 3661 } 3662 3663 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) 3664 { 3665 struct rbd_client_id cid; 3666 3667 mutex_lock(&rbd_dev->watch_mutex); 3668 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); 3669 cid.handle = rbd_dev->watch_cookie; 3670 mutex_unlock(&rbd_dev->watch_mutex); 3671 return cid; 3672 } 3673 3674 /* 3675 * lock_rwsem must be held for write 3676 */ 3677 static void rbd_set_owner_cid(struct rbd_device *rbd_dev, 3678 const struct rbd_client_id *cid) 3679 { 3680 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, 3681 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, 3682 cid->gid, cid->handle); 3683 rbd_dev->owner_cid = *cid; /* struct */ 3684 } 3685 3686 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) 3687 { 3688 mutex_lock(&rbd_dev->watch_mutex); 3689 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); 3690 mutex_unlock(&rbd_dev->watch_mutex); 3691 } 3692 3693 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) 3694 { 3695 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 3696 3697 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; 3698 strcpy(rbd_dev->lock_cookie, cookie); 3699 rbd_set_owner_cid(rbd_dev, &cid); 3700 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); 3701 } 3702 3703 /* 3704 * lock_rwsem must be held for write 3705 */ 3706 static int rbd_lock(struct rbd_device *rbd_dev) 3707 { 3708 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3709 char cookie[32]; 3710 int ret; 3711 3712 WARN_ON(__rbd_is_lock_owner(rbd_dev) || 3713 rbd_dev->lock_cookie[0] != '\0'); 3714 3715 format_lock_cookie(rbd_dev, cookie); 3716 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 3717 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, 3718 RBD_LOCK_TAG, "", 0); 3719 if (ret) 3720 return ret; 3721 3722 __rbd_lock(rbd_dev, cookie); 3723 return 0; 3724 } 3725 3726 /* 3727 * lock_rwsem must be held for write 3728 */ 3729 static void rbd_unlock(struct rbd_device *rbd_dev) 3730 { 3731 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3732 int ret; 3733 3734 WARN_ON(!__rbd_is_lock_owner(rbd_dev) || 3735 rbd_dev->lock_cookie[0] == '\0'); 3736 3737 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 3738 RBD_LOCK_NAME, rbd_dev->lock_cookie); 3739 if (ret && ret != -ENOENT) 3740 rbd_warn(rbd_dev, "failed to unlock header: %d", ret); 3741 3742 /* treat errors as the image is unlocked */ 3743 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 3744 
rbd_dev->lock_cookie[0] = '\0'; 3745 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3746 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); 3747 } 3748 3749 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, 3750 enum rbd_notify_op notify_op, 3751 struct page ***preply_pages, 3752 size_t *preply_len) 3753 { 3754 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3755 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 3756 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; 3757 int buf_size = sizeof(buf); 3758 void *p = buf; 3759 3760 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 3761 3762 /* encode *LockPayload NotifyMessage (op + ClientId) */ 3763 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); 3764 ceph_encode_32(&p, notify_op); 3765 ceph_encode_64(&p, cid.gid); 3766 ceph_encode_64(&p, cid.handle); 3767 3768 return ceph_osdc_notify(osdc, &rbd_dev->header_oid, 3769 &rbd_dev->header_oloc, buf, buf_size, 3770 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); 3771 } 3772 3773 static void rbd_notify_op_lock(struct rbd_device *rbd_dev, 3774 enum rbd_notify_op notify_op) 3775 { 3776 __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL); 3777 } 3778 3779 static void rbd_notify_acquired_lock(struct work_struct *work) 3780 { 3781 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3782 acquired_lock_work); 3783 3784 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); 3785 } 3786 3787 static void rbd_notify_released_lock(struct work_struct *work) 3788 { 3789 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3790 released_lock_work); 3791 3792 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); 3793 } 3794 3795 static int rbd_request_lock(struct rbd_device *rbd_dev) 3796 { 3797 struct page **reply_pages; 3798 size_t reply_len; 3799 bool lock_owner_responded = false; 3800 int ret; 3801 3802 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3803 3804 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, 3805 &reply_pages, &reply_len); 3806 if (ret && ret != -ETIMEDOUT) { 3807 rbd_warn(rbd_dev, "failed to request lock: %d", ret); 3808 goto out; 3809 } 3810 3811 if (reply_len > 0 && reply_len <= PAGE_SIZE) { 3812 void *p = page_address(reply_pages[0]); 3813 void *const end = p + reply_len; 3814 u32 n; 3815 3816 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ 3817 while (n--) { 3818 u8 struct_v; 3819 u32 len; 3820 3821 ceph_decode_need(&p, end, 8 + 8, e_inval); 3822 p += 8 + 8; /* skip gid and cookie */ 3823 3824 ceph_decode_32_safe(&p, end, len, e_inval); 3825 if (!len) 3826 continue; 3827 3828 if (lock_owner_responded) { 3829 rbd_warn(rbd_dev, 3830 "duplicate lock owners detected"); 3831 ret = -EIO; 3832 goto out; 3833 } 3834 3835 lock_owner_responded = true; 3836 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", 3837 &struct_v, &len); 3838 if (ret) { 3839 rbd_warn(rbd_dev, 3840 "failed to decode ResponseMessage: %d", 3841 ret); 3842 goto e_inval; 3843 } 3844 3845 ret = ceph_decode_32(&p); 3846 } 3847 } 3848 3849 if (!lock_owner_responded) { 3850 rbd_warn(rbd_dev, "no lock owners detected"); 3851 ret = -ETIMEDOUT; 3852 } 3853 3854 out: 3855 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 3856 return ret; 3857 3858 e_inval: 3859 ret = -EINVAL; 3860 goto out; 3861 } 3862 3863 /* 3864 * Either image request state machine(s) or rbd_add_acquire_lock() 3865 * (i.e. "rbd map"). 
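 *
 * If "rbd map" is still waiting, @result is reported through
 * acquire_err/acquire_wait; otherwise the image requests parked on
 * acquiring_list are rescheduled with @result and moved over to
 * running_list.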
3866 */ 3867 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) 3868 { 3869 struct rbd_img_request *img_req; 3870 3871 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 3872 lockdep_assert_held_write(&rbd_dev->lock_rwsem); 3873 3874 cancel_delayed_work(&rbd_dev->lock_dwork); 3875 if (!completion_done(&rbd_dev->acquire_wait)) { 3876 rbd_assert(list_empty(&rbd_dev->acquiring_list) && 3877 list_empty(&rbd_dev->running_list)); 3878 rbd_dev->acquire_err = result; 3879 complete_all(&rbd_dev->acquire_wait); 3880 return; 3881 } 3882 3883 list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) { 3884 mutex_lock(&img_req->state_mutex); 3885 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); 3886 rbd_img_schedule(img_req, result); 3887 mutex_unlock(&img_req->state_mutex); 3888 } 3889 3890 list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list); 3891 } 3892 3893 static int get_lock_owner_info(struct rbd_device *rbd_dev, 3894 struct ceph_locker **lockers, u32 *num_lockers) 3895 { 3896 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3897 u8 lock_type; 3898 char *lock_tag; 3899 int ret; 3900 3901 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3902 3903 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, 3904 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3905 &lock_type, &lock_tag, lockers, num_lockers); 3906 if (ret) 3907 return ret; 3908 3909 if (*num_lockers == 0) { 3910 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); 3911 goto out; 3912 } 3913 3914 if (strcmp(lock_tag, RBD_LOCK_TAG)) { 3915 rbd_warn(rbd_dev, "locked by external mechanism, tag %s", 3916 lock_tag); 3917 ret = -EBUSY; 3918 goto out; 3919 } 3920 3921 if (lock_type == CEPH_CLS_LOCK_SHARED) { 3922 rbd_warn(rbd_dev, "shared lock type detected"); 3923 ret = -EBUSY; 3924 goto out; 3925 } 3926 3927 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, 3928 strlen(RBD_LOCK_COOKIE_PREFIX))) { 3929 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", 3930 (*lockers)[0].id.cookie); 3931 ret = -EBUSY; 3932 goto out; 3933 } 3934 3935 out: 3936 kfree(lock_tag); 3937 return ret; 3938 } 3939 3940 static int find_watcher(struct rbd_device *rbd_dev, 3941 const struct ceph_locker *locker) 3942 { 3943 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3944 struct ceph_watch_item *watchers; 3945 u32 num_watchers; 3946 u64 cookie; 3947 int i; 3948 int ret; 3949 3950 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, 3951 &rbd_dev->header_oloc, &watchers, 3952 &num_watchers); 3953 if (ret) 3954 return ret; 3955 3956 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); 3957 for (i = 0; i < num_watchers; i++) { 3958 if (!memcmp(&watchers[i].addr, &locker->info.addr, 3959 sizeof(locker->info.addr)) && 3960 watchers[i].cookie == cookie) { 3961 struct rbd_client_id cid = { 3962 .gid = le64_to_cpu(watchers[i].name.num), 3963 .handle = cookie, 3964 }; 3965 3966 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, 3967 rbd_dev, cid.gid, cid.handle); 3968 rbd_set_owner_cid(rbd_dev, &cid); 3969 ret = 1; 3970 goto out; 3971 } 3972 } 3973 3974 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); 3975 ret = 0; 3976 out: 3977 kfree(watchers); 3978 return ret; 3979 } 3980 3981 /* 3982 * lock_rwsem must be held for write 3983 */ 3984 static int rbd_try_lock(struct rbd_device *rbd_dev) 3985 { 3986 struct ceph_client *client = rbd_dev->rbd_client->client; 3987 struct ceph_locker *lockers; 3988 u32 num_lockers; 3989 int ret; 3990 3991 for 
(;;) { 3992 ret = rbd_lock(rbd_dev); 3993 if (ret != -EBUSY) 3994 return ret; 3995 3996 /* determine if the current lock holder is still alive */ 3997 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); 3998 if (ret) 3999 return ret; 4000 4001 if (num_lockers == 0) 4002 goto again; 4003 4004 ret = find_watcher(rbd_dev, lockers); 4005 if (ret) 4006 goto out; /* request lock or error */ 4007 4008 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", 4009 ENTITY_NAME(lockers[0].id.name)); 4010 4011 ret = ceph_monc_blacklist_add(&client->monc, 4012 &lockers[0].info.addr); 4013 if (ret) { 4014 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", 4015 ENTITY_NAME(lockers[0].id.name), ret); 4016 goto out; 4017 } 4018 4019 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, 4020 &rbd_dev->header_oloc, RBD_LOCK_NAME, 4021 lockers[0].id.cookie, 4022 &lockers[0].id.name); 4023 if (ret && ret != -ENOENT) 4024 goto out; 4025 4026 again: 4027 ceph_free_lockers(lockers, num_lockers); 4028 } 4029 4030 out: 4031 ceph_free_lockers(lockers, num_lockers); 4032 return ret; 4033 } 4034 4035 static int rbd_post_acquire_action(struct rbd_device *rbd_dev) 4036 { 4037 int ret; 4038 4039 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { 4040 ret = rbd_object_map_open(rbd_dev); 4041 if (ret) 4042 return ret; 4043 } 4044 4045 return 0; 4046 } 4047 4048 /* 4049 * Return: 4050 * 0 - lock acquired 4051 * 1 - caller should call rbd_request_lock() 4052 * <0 - error 4053 */ 4054 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) 4055 { 4056 int ret; 4057 4058 down_read(&rbd_dev->lock_rwsem); 4059 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 4060 rbd_dev->lock_state); 4061 if (__rbd_is_lock_owner(rbd_dev)) { 4062 up_read(&rbd_dev->lock_rwsem); 4063 return 0; 4064 } 4065 4066 up_read(&rbd_dev->lock_rwsem); 4067 down_write(&rbd_dev->lock_rwsem); 4068 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 4069 rbd_dev->lock_state); 4070 if (__rbd_is_lock_owner(rbd_dev)) { 4071 up_write(&rbd_dev->lock_rwsem); 4072 return 0; 4073 } 4074 4075 ret = rbd_try_lock(rbd_dev); 4076 if (ret < 0) { 4077 rbd_warn(rbd_dev, "failed to lock header: %d", ret); 4078 if (ret == -EBLACKLISTED) 4079 goto out; 4080 4081 ret = 1; /* request lock anyway */ 4082 } 4083 if (ret > 0) { 4084 up_write(&rbd_dev->lock_rwsem); 4085 return ret; 4086 } 4087 4088 rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); 4089 rbd_assert(list_empty(&rbd_dev->running_list)); 4090 4091 ret = rbd_post_acquire_action(rbd_dev); 4092 if (ret) { 4093 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); 4094 /* 4095 * Can't stay in RBD_LOCK_STATE_LOCKED because 4096 * rbd_lock_add_request() would let the request through, 4097 * assuming that e.g. object map is locked and loaded. 
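		 * Dropping the lock means wake_lock_waiters() below reports
		 * the error to anyone waiting instead of letting their
		 * requests run against a half-initialized device.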
4098 */ 4099 rbd_unlock(rbd_dev); 4100 } 4101 4102 out: 4103 wake_lock_waiters(rbd_dev, ret); 4104 up_write(&rbd_dev->lock_rwsem); 4105 return ret; 4106 } 4107 4108 static void rbd_acquire_lock(struct work_struct *work) 4109 { 4110 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 4111 struct rbd_device, lock_dwork); 4112 int ret; 4113 4114 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4115 again: 4116 ret = rbd_try_acquire_lock(rbd_dev); 4117 if (ret <= 0) { 4118 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); 4119 return; 4120 } 4121 4122 ret = rbd_request_lock(rbd_dev); 4123 if (ret == -ETIMEDOUT) { 4124 goto again; /* treat this as a dead client */ 4125 } else if (ret == -EROFS) { 4126 rbd_warn(rbd_dev, "peer will not release lock"); 4127 down_write(&rbd_dev->lock_rwsem); 4128 wake_lock_waiters(rbd_dev, ret); 4129 up_write(&rbd_dev->lock_rwsem); 4130 } else if (ret < 0) { 4131 rbd_warn(rbd_dev, "error requesting lock: %d", ret); 4132 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 4133 RBD_RETRY_DELAY); 4134 } else { 4135 /* 4136 * lock owner acked, but resend if we don't see them 4137 * release the lock 4138 */ 4139 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__, 4140 rbd_dev); 4141 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 4142 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); 4143 } 4144 } 4145 4146 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) 4147 { 4148 bool need_wait; 4149 4150 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4151 lockdep_assert_held_write(&rbd_dev->lock_rwsem); 4152 4153 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) 4154 return false; 4155 4156 /* 4157 * Ensure that all in-flight IO is flushed. 4158 */ 4159 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; 4160 rbd_assert(!completion_done(&rbd_dev->releasing_wait)); 4161 need_wait = !list_empty(&rbd_dev->running_list); 4162 downgrade_write(&rbd_dev->lock_rwsem); 4163 if (need_wait) 4164 wait_for_completion(&rbd_dev->releasing_wait); 4165 up_read(&rbd_dev->lock_rwsem); 4166 4167 down_write(&rbd_dev->lock_rwsem); 4168 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) 4169 return false; 4170 4171 rbd_assert(list_empty(&rbd_dev->running_list)); 4172 return true; 4173 } 4174 4175 static void rbd_pre_release_action(struct rbd_device *rbd_dev) 4176 { 4177 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) 4178 rbd_object_map_close(rbd_dev); 4179 } 4180 4181 static void __rbd_release_lock(struct rbd_device *rbd_dev) 4182 { 4183 rbd_assert(list_empty(&rbd_dev->running_list)); 4184 4185 rbd_pre_release_action(rbd_dev); 4186 rbd_unlock(rbd_dev); 4187 } 4188 4189 /* 4190 * lock_rwsem must be held for write 4191 */ 4192 static void rbd_release_lock(struct rbd_device *rbd_dev) 4193 { 4194 if (!rbd_quiesce_lock(rbd_dev)) 4195 return; 4196 4197 __rbd_release_lock(rbd_dev); 4198 4199 /* 4200 * Give others a chance to grab the lock - we would re-acquire 4201 * almost immediately if we got new IO while draining the running 4202 * list otherwise. We need to ack our own notifications, so this 4203 * lock_dwork will be requeued from rbd_handle_released_lock() by 4204 * way of maybe_kick_acquire(). 
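 * Cancelling lock_dwork here does not strand queued IO: our own
 * RELEASED_LOCK notification ends up in maybe_kick_acquire(), which
 * re-queues lock_dwork if anything is waiting on acquiring_list.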
4205 */ 4206 cancel_delayed_work(&rbd_dev->lock_dwork); 4207 } 4208 4209 static void rbd_release_lock_work(struct work_struct *work) 4210 { 4211 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 4212 unlock_work); 4213 4214 down_write(&rbd_dev->lock_rwsem); 4215 rbd_release_lock(rbd_dev); 4216 up_write(&rbd_dev->lock_rwsem); 4217 } 4218 4219 static void maybe_kick_acquire(struct rbd_device *rbd_dev) 4220 { 4221 bool have_requests; 4222 4223 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4224 if (__rbd_is_lock_owner(rbd_dev)) 4225 return; 4226 4227 spin_lock(&rbd_dev->lock_lists_lock); 4228 have_requests = !list_empty(&rbd_dev->acquiring_list); 4229 spin_unlock(&rbd_dev->lock_lists_lock); 4230 if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { 4231 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); 4232 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 4233 } 4234 } 4235 4236 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, 4237 void **p) 4238 { 4239 struct rbd_client_id cid = { 0 }; 4240 4241 if (struct_v >= 2) { 4242 cid.gid = ceph_decode_64(p); 4243 cid.handle = ceph_decode_64(p); 4244 } 4245 4246 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 4247 cid.handle); 4248 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 4249 down_write(&rbd_dev->lock_rwsem); 4250 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 4251 /* 4252 * we already know that the remote client is 4253 * the owner 4254 */ 4255 up_write(&rbd_dev->lock_rwsem); 4256 return; 4257 } 4258 4259 rbd_set_owner_cid(rbd_dev, &cid); 4260 downgrade_write(&rbd_dev->lock_rwsem); 4261 } else { 4262 down_read(&rbd_dev->lock_rwsem); 4263 } 4264 4265 maybe_kick_acquire(rbd_dev); 4266 up_read(&rbd_dev->lock_rwsem); 4267 } 4268 4269 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, 4270 void **p) 4271 { 4272 struct rbd_client_id cid = { 0 }; 4273 4274 if (struct_v >= 2) { 4275 cid.gid = ceph_decode_64(p); 4276 cid.handle = ceph_decode_64(p); 4277 } 4278 4279 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 4280 cid.handle); 4281 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 4282 down_write(&rbd_dev->lock_rwsem); 4283 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 4284 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", 4285 __func__, rbd_dev, cid.gid, cid.handle, 4286 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); 4287 up_write(&rbd_dev->lock_rwsem); 4288 return; 4289 } 4290 4291 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 4292 downgrade_write(&rbd_dev->lock_rwsem); 4293 } else { 4294 down_read(&rbd_dev->lock_rwsem); 4295 } 4296 4297 maybe_kick_acquire(rbd_dev); 4298 up_read(&rbd_dev->lock_rwsem); 4299 } 4300 4301 /* 4302 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no 4303 * ResponseMessage is needed. 
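 * The caller (rbd_watch_cb) acks a result <= 0 with
 * rbd_acknowledge_notify_result() and a result of 1 with a plain
 * rbd_acknowledge_notify().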
4304 */ 4305 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, 4306 void **p) 4307 { 4308 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); 4309 struct rbd_client_id cid = { 0 }; 4310 int result = 1; 4311 4312 if (struct_v >= 2) { 4313 cid.gid = ceph_decode_64(p); 4314 cid.handle = ceph_decode_64(p); 4315 } 4316 4317 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 4318 cid.handle); 4319 if (rbd_cid_equal(&cid, &my_cid)) 4320 return result; 4321 4322 down_read(&rbd_dev->lock_rwsem); 4323 if (__rbd_is_lock_owner(rbd_dev)) { 4324 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && 4325 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) 4326 goto out_unlock; 4327 4328 /* 4329 * encode ResponseMessage(0) so the peer can detect 4330 * a missing owner 4331 */ 4332 result = 0; 4333 4334 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { 4335 if (!rbd_dev->opts->exclusive) { 4336 dout("%s rbd_dev %p queueing unlock_work\n", 4337 __func__, rbd_dev); 4338 queue_work(rbd_dev->task_wq, 4339 &rbd_dev->unlock_work); 4340 } else { 4341 /* refuse to release the lock */ 4342 result = -EROFS; 4343 } 4344 } 4345 } 4346 4347 out_unlock: 4348 up_read(&rbd_dev->lock_rwsem); 4349 return result; 4350 } 4351 4352 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, 4353 u64 notify_id, u64 cookie, s32 *result) 4354 { 4355 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4356 char buf[4 + CEPH_ENCODING_START_BLK_LEN]; 4357 int buf_size = sizeof(buf); 4358 int ret; 4359 4360 if (result) { 4361 void *p = buf; 4362 4363 /* encode ResponseMessage */ 4364 ceph_start_encoding(&p, 1, 1, 4365 buf_size - CEPH_ENCODING_START_BLK_LEN); 4366 ceph_encode_32(&p, *result); 4367 } else { 4368 buf_size = 0; 4369 } 4370 4371 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 4372 &rbd_dev->header_oloc, notify_id, cookie, 4373 buf, buf_size); 4374 if (ret) 4375 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); 4376 } 4377 4378 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, 4379 u64 cookie) 4380 { 4381 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4382 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); 4383 } 4384 4385 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, 4386 u64 notify_id, u64 cookie, s32 result) 4387 { 4388 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 4389 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); 4390 } 4391 4392 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 4393 u64 notifier_id, void *data, size_t data_len) 4394 { 4395 struct rbd_device *rbd_dev = arg; 4396 void *p = data; 4397 void *const end = p + data_len; 4398 u8 struct_v = 0; 4399 u32 len; 4400 u32 notify_op; 4401 int ret; 4402 4403 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", 4404 __func__, rbd_dev, cookie, notify_id, data_len); 4405 if (data_len) { 4406 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", 4407 &struct_v, &len); 4408 if (ret) { 4409 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", 4410 ret); 4411 return; 4412 } 4413 4414 notify_op = ceph_decode_32(&p); 4415 } else { 4416 /* legacy notification for header updates */ 4417 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; 4418 len = 0; 4419 } 4420 4421 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); 4422 switch (notify_op) { 4423 case RBD_NOTIFY_OP_ACQUIRED_LOCK: 4424 rbd_handle_acquired_lock(rbd_dev, struct_v, &p); 4425 rbd_acknowledge_notify(rbd_dev, notify_id, 
cookie); 4426 break; 4427 case RBD_NOTIFY_OP_RELEASED_LOCK: 4428 rbd_handle_released_lock(rbd_dev, struct_v, &p); 4429 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 4430 break; 4431 case RBD_NOTIFY_OP_REQUEST_LOCK: 4432 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); 4433 if (ret <= 0) 4434 rbd_acknowledge_notify_result(rbd_dev, notify_id, 4435 cookie, ret); 4436 else 4437 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 4438 break; 4439 case RBD_NOTIFY_OP_HEADER_UPDATE: 4440 ret = rbd_dev_refresh(rbd_dev); 4441 if (ret) 4442 rbd_warn(rbd_dev, "refresh failed: %d", ret); 4443 4444 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 4445 break; 4446 default: 4447 if (rbd_is_lock_owner(rbd_dev)) 4448 rbd_acknowledge_notify_result(rbd_dev, notify_id, 4449 cookie, -EOPNOTSUPP); 4450 else 4451 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 4452 break; 4453 } 4454 } 4455 4456 static void __rbd_unregister_watch(struct rbd_device *rbd_dev); 4457 4458 static void rbd_watch_errcb(void *arg, u64 cookie, int err) 4459 { 4460 struct rbd_device *rbd_dev = arg; 4461 4462 rbd_warn(rbd_dev, "encountered watch error: %d", err); 4463 4464 down_write(&rbd_dev->lock_rwsem); 4465 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 4466 up_write(&rbd_dev->lock_rwsem); 4467 4468 mutex_lock(&rbd_dev->watch_mutex); 4469 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { 4470 __rbd_unregister_watch(rbd_dev); 4471 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; 4472 4473 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); 4474 } 4475 mutex_unlock(&rbd_dev->watch_mutex); 4476 } 4477 4478 /* 4479 * watch_mutex must be locked 4480 */ 4481 static int __rbd_register_watch(struct rbd_device *rbd_dev) 4482 { 4483 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4484 struct ceph_osd_linger_request *handle; 4485 4486 rbd_assert(!rbd_dev->watch_handle); 4487 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4488 4489 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 4490 &rbd_dev->header_oloc, rbd_watch_cb, 4491 rbd_watch_errcb, rbd_dev); 4492 if (IS_ERR(handle)) 4493 return PTR_ERR(handle); 4494 4495 rbd_dev->watch_handle = handle; 4496 return 0; 4497 } 4498 4499 /* 4500 * watch_mutex must be locked 4501 */ 4502 static void __rbd_unregister_watch(struct rbd_device *rbd_dev) 4503 { 4504 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4505 int ret; 4506 4507 rbd_assert(rbd_dev->watch_handle); 4508 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4509 4510 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 4511 if (ret) 4512 rbd_warn(rbd_dev, "failed to unwatch: %d", ret); 4513 4514 rbd_dev->watch_handle = NULL; 4515 } 4516 4517 static int rbd_register_watch(struct rbd_device *rbd_dev) 4518 { 4519 int ret; 4520 4521 mutex_lock(&rbd_dev->watch_mutex); 4522 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); 4523 ret = __rbd_register_watch(rbd_dev); 4524 if (ret) 4525 goto out; 4526 4527 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 4528 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 4529 4530 out: 4531 mutex_unlock(&rbd_dev->watch_mutex); 4532 return ret; 4533 } 4534 4535 static void cancel_tasks_sync(struct rbd_device *rbd_dev) 4536 { 4537 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4538 4539 cancel_work_sync(&rbd_dev->acquired_lock_work); 4540 cancel_work_sync(&rbd_dev->released_lock_work); 4541 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 4542 cancel_work_sync(&rbd_dev->unlock_work); 4543 } 4544 4545 /* 4546 * header_rwsem must not be 
held to avoid a deadlock with 4547 * rbd_dev_refresh() when flushing notifies. 4548 */ 4549 static void rbd_unregister_watch(struct rbd_device *rbd_dev) 4550 { 4551 cancel_tasks_sync(rbd_dev); 4552 4553 mutex_lock(&rbd_dev->watch_mutex); 4554 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) 4555 __rbd_unregister_watch(rbd_dev); 4556 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 4557 mutex_unlock(&rbd_dev->watch_mutex); 4558 4559 cancel_delayed_work_sync(&rbd_dev->watch_dwork); 4560 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 4561 } 4562 4563 /* 4564 * lock_rwsem must be held for write 4565 */ 4566 static void rbd_reacquire_lock(struct rbd_device *rbd_dev) 4567 { 4568 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4569 char cookie[32]; 4570 int ret; 4571 4572 if (!rbd_quiesce_lock(rbd_dev)) 4573 return; 4574 4575 format_lock_cookie(rbd_dev, cookie); 4576 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, 4577 &rbd_dev->header_oloc, RBD_LOCK_NAME, 4578 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie, 4579 RBD_LOCK_TAG, cookie); 4580 if (ret) { 4581 if (ret != -EOPNOTSUPP) 4582 rbd_warn(rbd_dev, "failed to update lock cookie: %d", 4583 ret); 4584 4585 /* 4586 * Lock cookie cannot be updated on older OSDs, so do 4587 * a manual release and queue an acquire. 4588 */ 4589 __rbd_release_lock(rbd_dev); 4590 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 4591 } else { 4592 __rbd_lock(rbd_dev, cookie); 4593 wake_lock_waiters(rbd_dev, 0); 4594 } 4595 } 4596 4597 static void rbd_reregister_watch(struct work_struct *work) 4598 { 4599 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 4600 struct rbd_device, watch_dwork); 4601 int ret; 4602 4603 dout("%s rbd_dev %p\n", __func__, rbd_dev); 4604 4605 mutex_lock(&rbd_dev->watch_mutex); 4606 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { 4607 mutex_unlock(&rbd_dev->watch_mutex); 4608 return; 4609 } 4610 4611 ret = __rbd_register_watch(rbd_dev); 4612 if (ret) { 4613 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); 4614 if (ret != -EBLACKLISTED && ret != -ENOENT) { 4615 queue_delayed_work(rbd_dev->task_wq, 4616 &rbd_dev->watch_dwork, 4617 RBD_RETRY_DELAY); 4618 mutex_unlock(&rbd_dev->watch_mutex); 4619 return; 4620 } 4621 4622 mutex_unlock(&rbd_dev->watch_mutex); 4623 down_write(&rbd_dev->lock_rwsem); 4624 wake_lock_waiters(rbd_dev, ret); 4625 up_write(&rbd_dev->lock_rwsem); 4626 return; 4627 } 4628 4629 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 4630 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 4631 mutex_unlock(&rbd_dev->watch_mutex); 4632 4633 down_write(&rbd_dev->lock_rwsem); 4634 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) 4635 rbd_reacquire_lock(rbd_dev); 4636 up_write(&rbd_dev->lock_rwsem); 4637 4638 ret = rbd_dev_refresh(rbd_dev); 4639 if (ret) 4640 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret); 4641 } 4642 4643 /* 4644 * Synchronous osd object method call. Returns the number of bytes 4645 * returned in the outbound buffer, or a negative error code. 
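 * (The reply data is returned in the caller's inbound buffer; both the
 * outbound and inbound payloads are effectively limited to a single
 * page.)  A typical caller looks like the get_size query further down:
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));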
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			       struct ceph_object_id *oid,
			       struct ceph_object_locator *oloc,
			       const char *method_name,
			       const void *outbound,
			       size_t outbound_size,
			       void *inbound,
			       size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct page *req_page = NULL;
	struct page *reply_page;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently, when present, this is a small encoded
	 * argument such as a snapshot id.
	 */
	if (outbound) {
		if (outbound_size > PAGE_SIZE)
			return -E2BIG;

		req_page = alloc_page(GFP_KERNEL);
		if (!req_page)
			return -ENOMEM;

		memcpy(page_address(req_page), outbound, outbound_size);
	}

	reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		if (req_page)
			__free_page(req_page);
		return -ENOMEM;
	}

	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
			     &reply_page, &inbound_size);
	if (!ret) {
		memcpy(inbound, page_address(reply_page), inbound_size);
		ret = inbound_size;
	}

	if (req_page)
		__free_page(req_page);
	__free_page(reply_page);
	return ret;
}

static void rbd_queue_workfn(struct work_struct *work)
{
	struct rbd_img_request *img_request =
	    container_of(work, struct rbd_img_request, work);
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	enum obj_operation_type op_type = img_request->op_type;
	struct request *rq = blk_mq_rq_from_pdu(img_request);
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	u64 mapping_size;
	int result;

	/* Ignore/skip any zero-length requests */
	if (!length) {
		dout("%s: zero-length request\n", __func__);
		result = 0;
		goto err_img_request;
	}

	blk_mq_start_request(rq);

	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	rbd_img_capture_header(img_request);
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, mapping_size);
		result = -EIO;
		goto err_img_request;
	}

	dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
	     img_request, obj_op_name(op_type), offset, length);

	if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
		result = rbd_img_fill_nodata(img_request, offset, length);
	else
		result = rbd_img_fill_from_bio(img_request, offset, length,
					       rq->bio);
	if (result)
		goto err_img_request;

	rbd_img_handle_request(img_request, 0);
	return;

err_img_request:
	rbd_img_request_destroy(img_request);
	if (result)
		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
			 obj_op_name(op_type), length, offset, result);
	blk_mq_end_request(rq, errno_to_blk_status(result));
}

static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
				 const struct blk_mq_queue_data *bd)
{
	struct rbd_device *rbd_dev = hctx->queue->queuedata;
	struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
	enum obj_operation_type op_type;

	switch
(req_op(bd->rq)) { 4763 case REQ_OP_DISCARD: 4764 op_type = OBJ_OP_DISCARD; 4765 break; 4766 case REQ_OP_WRITE_ZEROES: 4767 op_type = OBJ_OP_ZEROOUT; 4768 break; 4769 case REQ_OP_WRITE: 4770 op_type = OBJ_OP_WRITE; 4771 break; 4772 case REQ_OP_READ: 4773 op_type = OBJ_OP_READ; 4774 break; 4775 default: 4776 rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq)); 4777 return BLK_STS_IOERR; 4778 } 4779 4780 rbd_img_request_init(img_req, rbd_dev, op_type); 4781 4782 if (rbd_img_is_write(img_req)) { 4783 if (rbd_is_ro(rbd_dev)) { 4784 rbd_warn(rbd_dev, "%s on read-only mapping", 4785 obj_op_name(img_req->op_type)); 4786 return BLK_STS_IOERR; 4787 } 4788 rbd_assert(!rbd_is_snap(rbd_dev)); 4789 } 4790 4791 INIT_WORK(&img_req->work, rbd_queue_workfn); 4792 queue_work(rbd_wq, &img_req->work); 4793 return BLK_STS_OK; 4794 } 4795 4796 static void rbd_free_disk(struct rbd_device *rbd_dev) 4797 { 4798 blk_cleanup_queue(rbd_dev->disk->queue); 4799 blk_mq_free_tag_set(&rbd_dev->tag_set); 4800 put_disk(rbd_dev->disk); 4801 rbd_dev->disk = NULL; 4802 } 4803 4804 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 4805 struct ceph_object_id *oid, 4806 struct ceph_object_locator *oloc, 4807 void *buf, int buf_len) 4808 4809 { 4810 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4811 struct ceph_osd_request *req; 4812 struct page **pages; 4813 int num_pages = calc_pages_for(0, buf_len); 4814 int ret; 4815 4816 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 4817 if (!req) 4818 return -ENOMEM; 4819 4820 ceph_oid_copy(&req->r_base_oid, oid); 4821 ceph_oloc_copy(&req->r_base_oloc, oloc); 4822 req->r_flags = CEPH_OSD_FLAG_READ; 4823 4824 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 4825 if (IS_ERR(pages)) { 4826 ret = PTR_ERR(pages); 4827 goto out_req; 4828 } 4829 4830 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); 4831 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, 4832 true); 4833 4834 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 4835 if (ret) 4836 goto out_req; 4837 4838 ceph_osdc_start_request(osdc, req, false); 4839 ret = ceph_osdc_wait_request(osdc, req); 4840 if (ret >= 0) 4841 ceph_copy_from_page_vector(pages, buf, 0, ret); 4842 4843 out_req: 4844 ceph_osdc_put_request(req); 4845 return ret; 4846 } 4847 4848 /* 4849 * Read the complete header for the given rbd device. On successful 4850 * return, the rbd_dev->header field will contain up-to-date 4851 * information about the image. 4852 */ 4853 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) 4854 { 4855 struct rbd_image_header_ondisk *ondisk = NULL; 4856 u32 snap_count = 0; 4857 u64 names_size = 0; 4858 u32 want_count; 4859 int ret; 4860 4861 /* 4862 * The complete header will include an array of its 64-bit 4863 * snapshot ids, followed by the names of those snapshots as 4864 * a contiguous block of NUL-terminated strings. Note that 4865 * the number of snapshots could change by the time we read 4866 * it in, in which case we re-read it. 
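 * The read is simply retried with a re-sized buffer until the snapshot
 * count the buffer was sized for matches the count reported in the
 * header that was just read back.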
4867 */ 4868 do { 4869 size_t size; 4870 4871 kfree(ondisk); 4872 4873 size = sizeof (*ondisk); 4874 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 4875 size += names_size; 4876 ondisk = kmalloc(size, GFP_KERNEL); 4877 if (!ondisk) 4878 return -ENOMEM; 4879 4880 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, 4881 &rbd_dev->header_oloc, ondisk, size); 4882 if (ret < 0) 4883 goto out; 4884 if ((size_t)ret < size) { 4885 ret = -ENXIO; 4886 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 4887 size, ret); 4888 goto out; 4889 } 4890 if (!rbd_dev_ondisk_valid(ondisk)) { 4891 ret = -ENXIO; 4892 rbd_warn(rbd_dev, "invalid header"); 4893 goto out; 4894 } 4895 4896 names_size = le64_to_cpu(ondisk->snap_names_len); 4897 want_count = snap_count; 4898 snap_count = le32_to_cpu(ondisk->snap_count); 4899 } while (snap_count != want_count); 4900 4901 ret = rbd_header_from_disk(rbd_dev, ondisk); 4902 out: 4903 kfree(ondisk); 4904 4905 return ret; 4906 } 4907 4908 static void rbd_dev_update_size(struct rbd_device *rbd_dev) 4909 { 4910 sector_t size; 4911 4912 /* 4913 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't 4914 * try to update its size. If REMOVING is set, updating size 4915 * is just useless work since the device can't be opened. 4916 */ 4917 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) && 4918 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { 4919 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 4920 dout("setting size to %llu sectors", (unsigned long long)size); 4921 set_capacity(rbd_dev->disk, size); 4922 revalidate_disk(rbd_dev->disk); 4923 } 4924 } 4925 4926 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 4927 { 4928 u64 mapping_size; 4929 int ret; 4930 4931 down_write(&rbd_dev->header_rwsem); 4932 mapping_size = rbd_dev->mapping.size; 4933 4934 ret = rbd_dev_header_info(rbd_dev); 4935 if (ret) 4936 goto out; 4937 4938 /* 4939 * If there is a parent, see if it has disappeared due to the 4940 * mapped image getting flattened. 4941 */ 4942 if (rbd_dev->parent) { 4943 ret = rbd_dev_v2_parent_info(rbd_dev); 4944 if (ret) 4945 goto out; 4946 } 4947 4948 rbd_assert(!rbd_is_snap(rbd_dev)); 4949 rbd_dev->mapping.size = rbd_dev->header.image_size; 4950 4951 out: 4952 up_write(&rbd_dev->header_rwsem); 4953 if (!ret && mapping_size != rbd_dev->mapping.size) 4954 rbd_dev_update_size(rbd_dev); 4955 4956 return ret; 4957 } 4958 4959 static const struct blk_mq_ops rbd_mq_ops = { 4960 .queue_rq = rbd_queue_rq, 4961 }; 4962 4963 static int rbd_init_disk(struct rbd_device *rbd_dev) 4964 { 4965 struct gendisk *disk; 4966 struct request_queue *q; 4967 unsigned int objset_bytes = 4968 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count; 4969 int err; 4970 4971 /* create gendisk info */ 4972 disk = alloc_disk(single_major ? 
4973 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : 4974 RBD_MINORS_PER_MAJOR); 4975 if (!disk) 4976 return -ENOMEM; 4977 4978 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 4979 rbd_dev->dev_id); 4980 disk->major = rbd_dev->major; 4981 disk->first_minor = rbd_dev->minor; 4982 if (single_major) 4983 disk->flags |= GENHD_FL_EXT_DEVT; 4984 disk->fops = &rbd_bd_ops; 4985 disk->private_data = rbd_dev; 4986 4987 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); 4988 rbd_dev->tag_set.ops = &rbd_mq_ops; 4989 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; 4990 rbd_dev->tag_set.numa_node = NUMA_NO_NODE; 4991 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE; 4992 rbd_dev->tag_set.nr_hw_queues = num_present_cpus(); 4993 rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request); 4994 4995 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); 4996 if (err) 4997 goto out_disk; 4998 4999 q = blk_mq_init_queue(&rbd_dev->tag_set); 5000 if (IS_ERR(q)) { 5001 err = PTR_ERR(q); 5002 goto out_tag_set; 5003 } 5004 5005 blk_queue_flag_set(QUEUE_FLAG_NONROT, q); 5006 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ 5007 5008 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT); 5009 q->limits.max_sectors = queue_max_hw_sectors(q); 5010 blk_queue_max_segments(q, USHRT_MAX); 5011 blk_queue_max_segment_size(q, UINT_MAX); 5012 blk_queue_io_min(q, rbd_dev->opts->alloc_size); 5013 blk_queue_io_opt(q, rbd_dev->opts->alloc_size); 5014 5015 if (rbd_dev->opts->trim) { 5016 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q); 5017 q->limits.discard_granularity = rbd_dev->opts->alloc_size; 5018 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT); 5019 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT); 5020 } 5021 5022 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) 5023 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; 5024 5025 /* 5026 * disk_release() expects a queue ref from add_disk() and will 5027 * put it. Hold an extra ref until add_disk() is called. 
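 * The extra reference is taken with blk_get_queue() just below; the
 * WARN_ON only fires if that fails, which should not happen for a
 * freshly created queue.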
5028 */ 5029 WARN_ON(!blk_get_queue(q)); 5030 disk->queue = q; 5031 q->queuedata = rbd_dev; 5032 5033 rbd_dev->disk = disk; 5034 5035 return 0; 5036 out_tag_set: 5037 blk_mq_free_tag_set(&rbd_dev->tag_set); 5038 out_disk: 5039 put_disk(disk); 5040 return err; 5041 } 5042 5043 /* 5044 sysfs 5045 */ 5046 5047 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 5048 { 5049 return container_of(dev, struct rbd_device, dev); 5050 } 5051 5052 static ssize_t rbd_size_show(struct device *dev, 5053 struct device_attribute *attr, char *buf) 5054 { 5055 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5056 5057 return sprintf(buf, "%llu\n", 5058 (unsigned long long)rbd_dev->mapping.size); 5059 } 5060 5061 static ssize_t rbd_features_show(struct device *dev, 5062 struct device_attribute *attr, char *buf) 5063 { 5064 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5065 5066 return sprintf(buf, "0x%016llx\n", rbd_dev->header.features); 5067 } 5068 5069 static ssize_t rbd_major_show(struct device *dev, 5070 struct device_attribute *attr, char *buf) 5071 { 5072 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5073 5074 if (rbd_dev->major) 5075 return sprintf(buf, "%d\n", rbd_dev->major); 5076 5077 return sprintf(buf, "(none)\n"); 5078 } 5079 5080 static ssize_t rbd_minor_show(struct device *dev, 5081 struct device_attribute *attr, char *buf) 5082 { 5083 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5084 5085 return sprintf(buf, "%d\n", rbd_dev->minor); 5086 } 5087 5088 static ssize_t rbd_client_addr_show(struct device *dev, 5089 struct device_attribute *attr, char *buf) 5090 { 5091 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5092 struct ceph_entity_addr *client_addr = 5093 ceph_client_addr(rbd_dev->rbd_client->client); 5094 5095 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, 5096 le32_to_cpu(client_addr->nonce)); 5097 } 5098 5099 static ssize_t rbd_client_id_show(struct device *dev, 5100 struct device_attribute *attr, char *buf) 5101 { 5102 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5103 5104 return sprintf(buf, "client%lld\n", 5105 ceph_client_gid(rbd_dev->rbd_client->client)); 5106 } 5107 5108 static ssize_t rbd_cluster_fsid_show(struct device *dev, 5109 struct device_attribute *attr, char *buf) 5110 { 5111 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5112 5113 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); 5114 } 5115 5116 static ssize_t rbd_config_info_show(struct device *dev, 5117 struct device_attribute *attr, char *buf) 5118 { 5119 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5120 5121 return sprintf(buf, "%s\n", rbd_dev->config_info); 5122 } 5123 5124 static ssize_t rbd_pool_show(struct device *dev, 5125 struct device_attribute *attr, char *buf) 5126 { 5127 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5128 5129 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 5130 } 5131 5132 static ssize_t rbd_pool_id_show(struct device *dev, 5133 struct device_attribute *attr, char *buf) 5134 { 5135 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5136 5137 return sprintf(buf, "%llu\n", 5138 (unsigned long long) rbd_dev->spec->pool_id); 5139 } 5140 5141 static ssize_t rbd_pool_ns_show(struct device *dev, 5142 struct device_attribute *attr, char *buf) 5143 { 5144 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5145 5146 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: ""); 5147 } 5148 5149 static ssize_t rbd_name_show(struct device *dev, 5150 struct device_attribute *attr, char *buf) 5151 { 5152 struct rbd_device *rbd_dev = 
dev_to_rbd_dev(dev); 5153 5154 if (rbd_dev->spec->image_name) 5155 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 5156 5157 return sprintf(buf, "(unknown)\n"); 5158 } 5159 5160 static ssize_t rbd_image_id_show(struct device *dev, 5161 struct device_attribute *attr, char *buf) 5162 { 5163 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5164 5165 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 5166 } 5167 5168 /* 5169 * Shows the name of the currently-mapped snapshot (or 5170 * RBD_SNAP_HEAD_NAME for the base image). 5171 */ 5172 static ssize_t rbd_snap_show(struct device *dev, 5173 struct device_attribute *attr, 5174 char *buf) 5175 { 5176 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5177 5178 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 5179 } 5180 5181 static ssize_t rbd_snap_id_show(struct device *dev, 5182 struct device_attribute *attr, char *buf) 5183 { 5184 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5185 5186 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); 5187 } 5188 5189 /* 5190 * For a v2 image, shows the chain of parent images, separated by empty 5191 * lines. For v1 images or if there is no parent, shows "(no parent 5192 * image)". 5193 */ 5194 static ssize_t rbd_parent_show(struct device *dev, 5195 struct device_attribute *attr, 5196 char *buf) 5197 { 5198 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5199 ssize_t count = 0; 5200 5201 if (!rbd_dev->parent) 5202 return sprintf(buf, "(no parent image)\n"); 5203 5204 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { 5205 struct rbd_spec *spec = rbd_dev->parent_spec; 5206 5207 count += sprintf(&buf[count], "%s" 5208 "pool_id %llu\npool_name %s\n" 5209 "pool_ns %s\n" 5210 "image_id %s\nimage_name %s\n" 5211 "snap_id %llu\nsnap_name %s\n" 5212 "overlap %llu\n", 5213 !count ? "" : "\n", /* first? 
*/ 5214 spec->pool_id, spec->pool_name, 5215 spec->pool_ns ?: "", 5216 spec->image_id, spec->image_name ?: "(unknown)", 5217 spec->snap_id, spec->snap_name, 5218 rbd_dev->parent_overlap); 5219 } 5220 5221 return count; 5222 } 5223 5224 static ssize_t rbd_image_refresh(struct device *dev, 5225 struct device_attribute *attr, 5226 const char *buf, 5227 size_t size) 5228 { 5229 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5230 int ret; 5231 5232 ret = rbd_dev_refresh(rbd_dev); 5233 if (ret) 5234 return ret; 5235 5236 return size; 5237 } 5238 5239 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL); 5240 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL); 5241 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL); 5242 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL); 5243 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL); 5244 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL); 5245 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL); 5246 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL); 5247 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL); 5248 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL); 5249 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL); 5250 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL); 5251 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL); 5252 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh); 5253 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL); 5254 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL); 5255 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL); 5256 5257 static struct attribute *rbd_attrs[] = { 5258 &dev_attr_size.attr, 5259 &dev_attr_features.attr, 5260 &dev_attr_major.attr, 5261 &dev_attr_minor.attr, 5262 &dev_attr_client_addr.attr, 5263 &dev_attr_client_id.attr, 5264 &dev_attr_cluster_fsid.attr, 5265 &dev_attr_config_info.attr, 5266 &dev_attr_pool.attr, 5267 &dev_attr_pool_id.attr, 5268 &dev_attr_pool_ns.attr, 5269 &dev_attr_name.attr, 5270 &dev_attr_image_id.attr, 5271 &dev_attr_current_snap.attr, 5272 &dev_attr_snap_id.attr, 5273 &dev_attr_parent.attr, 5274 &dev_attr_refresh.attr, 5275 NULL 5276 }; 5277 5278 static struct attribute_group rbd_attr_group = { 5279 .attrs = rbd_attrs, 5280 }; 5281 5282 static const struct attribute_group *rbd_attr_groups[] = { 5283 &rbd_attr_group, 5284 NULL 5285 }; 5286 5287 static void rbd_dev_release(struct device *dev); 5288 5289 static const struct device_type rbd_device_type = { 5290 .name = "rbd", 5291 .groups = rbd_attr_groups, 5292 .release = rbd_dev_release, 5293 }; 5294 5295 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 5296 { 5297 kref_get(&spec->kref); 5298 5299 return spec; 5300 } 5301 5302 static void rbd_spec_free(struct kref *kref); 5303 static void rbd_spec_put(struct rbd_spec *spec) 5304 { 5305 if (spec) 5306 kref_put(&spec->kref, rbd_spec_free); 5307 } 5308 5309 static struct rbd_spec *rbd_spec_alloc(void) 5310 { 5311 struct rbd_spec *spec; 5312 5313 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 5314 if (!spec) 5315 return NULL; 5316 5317 spec->pool_id = CEPH_NOPOOL; 5318 spec->snap_id = CEPH_NOSNAP; 5319 kref_init(&spec->kref); 5320 5321 return spec; 5322 } 5323 5324 static void rbd_spec_free(struct kref *kref) 5325 { 5326 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 5327 5328 kfree(spec->pool_name); 5329 kfree(spec->pool_ns); 5330 kfree(spec->image_id); 5331 kfree(spec->image_name); 5332 kfree(spec->snap_name); 5333 
kfree(spec); 5334 } 5335 5336 static void rbd_dev_free(struct rbd_device *rbd_dev) 5337 { 5338 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); 5339 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); 5340 5341 ceph_oid_destroy(&rbd_dev->header_oid); 5342 ceph_oloc_destroy(&rbd_dev->header_oloc); 5343 kfree(rbd_dev->config_info); 5344 5345 rbd_put_client(rbd_dev->rbd_client); 5346 rbd_spec_put(rbd_dev->spec); 5347 kfree(rbd_dev->opts); 5348 kfree(rbd_dev); 5349 } 5350 5351 static void rbd_dev_release(struct device *dev) 5352 { 5353 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5354 bool need_put = !!rbd_dev->opts; 5355 5356 if (need_put) { 5357 destroy_workqueue(rbd_dev->task_wq); 5358 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 5359 } 5360 5361 rbd_dev_free(rbd_dev); 5362 5363 /* 5364 * This is racy, but way better than putting module outside of 5365 * the release callback. The race window is pretty small, so 5366 * doing something similar to dm (dm-builtin.c) is overkill. 5367 */ 5368 if (need_put) 5369 module_put(THIS_MODULE); 5370 } 5371 5372 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, 5373 struct rbd_spec *spec) 5374 { 5375 struct rbd_device *rbd_dev; 5376 5377 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 5378 if (!rbd_dev) 5379 return NULL; 5380 5381 spin_lock_init(&rbd_dev->lock); 5382 INIT_LIST_HEAD(&rbd_dev->node); 5383 init_rwsem(&rbd_dev->header_rwsem); 5384 5385 rbd_dev->header.data_pool_id = CEPH_NOPOOL; 5386 ceph_oid_init(&rbd_dev->header_oid); 5387 rbd_dev->header_oloc.pool = spec->pool_id; 5388 if (spec->pool_ns) { 5389 WARN_ON(!*spec->pool_ns); 5390 rbd_dev->header_oloc.pool_ns = 5391 ceph_find_or_create_string(spec->pool_ns, 5392 strlen(spec->pool_ns)); 5393 } 5394 5395 mutex_init(&rbd_dev->watch_mutex); 5396 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 5397 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); 5398 5399 init_rwsem(&rbd_dev->lock_rwsem); 5400 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 5401 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); 5402 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); 5403 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); 5404 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); 5405 spin_lock_init(&rbd_dev->lock_lists_lock); 5406 INIT_LIST_HEAD(&rbd_dev->acquiring_list); 5407 INIT_LIST_HEAD(&rbd_dev->running_list); 5408 init_completion(&rbd_dev->acquire_wait); 5409 init_completion(&rbd_dev->releasing_wait); 5410 5411 spin_lock_init(&rbd_dev->object_map_lock); 5412 5413 rbd_dev->dev.bus = &rbd_bus_type; 5414 rbd_dev->dev.type = &rbd_device_type; 5415 rbd_dev->dev.parent = &rbd_root_dev; 5416 device_initialize(&rbd_dev->dev); 5417 5418 rbd_dev->rbd_client = rbdc; 5419 rbd_dev->spec = spec; 5420 5421 return rbd_dev; 5422 } 5423 5424 /* 5425 * Create a mapping rbd_dev. 
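 * On top of __rbd_dev_create() this allocates a dev_id and the
 * per-device ordered workqueue and pins the module; all three are
 * released from rbd_dev_release() when the last reference is put.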
5426 */ 5427 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 5428 struct rbd_spec *spec, 5429 struct rbd_options *opts) 5430 { 5431 struct rbd_device *rbd_dev; 5432 5433 rbd_dev = __rbd_dev_create(rbdc, spec); 5434 if (!rbd_dev) 5435 return NULL; 5436 5437 rbd_dev->opts = opts; 5438 5439 /* get an id and fill in device name */ 5440 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 5441 minor_to_rbd_dev_id(1 << MINORBITS), 5442 GFP_KERNEL); 5443 if (rbd_dev->dev_id < 0) 5444 goto fail_rbd_dev; 5445 5446 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); 5447 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, 5448 rbd_dev->name); 5449 if (!rbd_dev->task_wq) 5450 goto fail_dev_id; 5451 5452 /* we have a ref from do_rbd_add() */ 5453 __module_get(THIS_MODULE); 5454 5455 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); 5456 return rbd_dev; 5457 5458 fail_dev_id: 5459 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 5460 fail_rbd_dev: 5461 rbd_dev_free(rbd_dev); 5462 return NULL; 5463 } 5464 5465 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 5466 { 5467 if (rbd_dev) 5468 put_device(&rbd_dev->dev); 5469 } 5470 5471 /* 5472 * Get the size and object order for an image snapshot, or if 5473 * snap_id is CEPH_NOSNAP, gets this information for the base 5474 * image. 5475 */ 5476 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 5477 u8 *order, u64 *snap_size) 5478 { 5479 __le64 snapid = cpu_to_le64(snap_id); 5480 int ret; 5481 struct { 5482 u8 order; 5483 __le64 size; 5484 } __attribute__ ((packed)) size_buf = { 0 }; 5485 5486 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5487 &rbd_dev->header_oloc, "get_size", 5488 &snapid, sizeof(snapid), 5489 &size_buf, sizeof(size_buf)); 5490 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5491 if (ret < 0) 5492 return ret; 5493 if (ret < sizeof (size_buf)) 5494 return -ERANGE; 5495 5496 if (order) { 5497 *order = size_buf.order; 5498 dout(" order %u", (unsigned int)*order); 5499 } 5500 *snap_size = le64_to_cpu(size_buf.size); 5501 5502 dout(" snap_id 0x%016llx snap_size = %llu\n", 5503 (unsigned long long)snap_id, 5504 (unsigned long long)*snap_size); 5505 5506 return 0; 5507 } 5508 5509 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 5510 { 5511 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 5512 &rbd_dev->header.obj_order, 5513 &rbd_dev->header.image_size); 5514 } 5515 5516 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 5517 { 5518 size_t size; 5519 void *reply_buf; 5520 int ret; 5521 void *p; 5522 5523 /* Response will be an encoded string, which includes a length */ 5524 size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX; 5525 reply_buf = kzalloc(size, GFP_KERNEL); 5526 if (!reply_buf) 5527 return -ENOMEM; 5528 5529 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5530 &rbd_dev->header_oloc, "get_object_prefix", 5531 NULL, 0, reply_buf, size); 5532 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5533 if (ret < 0) 5534 goto out; 5535 5536 p = reply_buf; 5537 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 5538 p + ret, NULL, GFP_NOIO); 5539 ret = 0; 5540 5541 if (IS_ERR(rbd_dev->header.object_prefix)) { 5542 ret = PTR_ERR(rbd_dev->header.object_prefix); 5543 rbd_dev->header.object_prefix = NULL; 5544 } else { 5545 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 5546 } 5547 out: 5548 kfree(reply_buf); 5549 5550 return ret; 5551 } 5552 5553 static 
int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 5554 bool read_only, u64 *snap_features) 5555 { 5556 struct { 5557 __le64 snap_id; 5558 u8 read_only; 5559 } features_in; 5560 struct { 5561 __le64 features; 5562 __le64 incompat; 5563 } __attribute__ ((packed)) features_buf = { 0 }; 5564 u64 unsup; 5565 int ret; 5566 5567 features_in.snap_id = cpu_to_le64(snap_id); 5568 features_in.read_only = read_only; 5569 5570 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5571 &rbd_dev->header_oloc, "get_features", 5572 &features_in, sizeof(features_in), 5573 &features_buf, sizeof(features_buf)); 5574 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5575 if (ret < 0) 5576 return ret; 5577 if (ret < sizeof (features_buf)) 5578 return -ERANGE; 5579 5580 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED; 5581 if (unsup) { 5582 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx", 5583 unsup); 5584 return -ENXIO; 5585 } 5586 5587 *snap_features = le64_to_cpu(features_buf.features); 5588 5589 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 5590 (unsigned long long)snap_id, 5591 (unsigned long long)*snap_features, 5592 (unsigned long long)le64_to_cpu(features_buf.incompat)); 5593 5594 return 0; 5595 } 5596 5597 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 5598 { 5599 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 5600 rbd_is_ro(rbd_dev), 5601 &rbd_dev->header.features); 5602 } 5603 5604 /* 5605 * These are generic image flags, but since they are used only for 5606 * object map, store them in rbd_dev->object_map_flags. 5607 * 5608 * For the same reason, this function is called only on object map 5609 * (re)load and not on header refresh. 5610 */ 5611 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) 5612 { 5613 __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); 5614 __le64 flags; 5615 int ret; 5616 5617 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5618 &rbd_dev->header_oloc, "get_flags", 5619 &snapid, sizeof(snapid), 5620 &flags, sizeof(flags)); 5621 if (ret < 0) 5622 return ret; 5623 if (ret < sizeof(flags)) 5624 return -EBADMSG; 5625 5626 rbd_dev->object_map_flags = le64_to_cpu(flags); 5627 return 0; 5628 } 5629 5630 struct parent_image_info { 5631 u64 pool_id; 5632 const char *pool_ns; 5633 const char *image_id; 5634 u64 snap_id; 5635 5636 bool has_overlap; 5637 u64 overlap; 5638 }; 5639 5640 /* 5641 * The caller is responsible for @pii. 
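 * In particular, the pool_ns and image_id strings that the decoders
 * below store in @pii are dynamically allocated and must be freed by
 * the caller.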
5642 */ 5643 static int decode_parent_image_spec(void **p, void *end, 5644 struct parent_image_info *pii) 5645 { 5646 u8 struct_v; 5647 u32 struct_len; 5648 int ret; 5649 5650 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec", 5651 &struct_v, &struct_len); 5652 if (ret) 5653 return ret; 5654 5655 ceph_decode_64_safe(p, end, pii->pool_id, e_inval); 5656 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 5657 if (IS_ERR(pii->pool_ns)) { 5658 ret = PTR_ERR(pii->pool_ns); 5659 pii->pool_ns = NULL; 5660 return ret; 5661 } 5662 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL); 5663 if (IS_ERR(pii->image_id)) { 5664 ret = PTR_ERR(pii->image_id); 5665 pii->image_id = NULL; 5666 return ret; 5667 } 5668 ceph_decode_64_safe(p, end, pii->snap_id, e_inval); 5669 return 0; 5670 5671 e_inval: 5672 return -EINVAL; 5673 } 5674 5675 static int __get_parent_info(struct rbd_device *rbd_dev, 5676 struct page *req_page, 5677 struct page *reply_page, 5678 struct parent_image_info *pii) 5679 { 5680 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 5681 size_t reply_len = PAGE_SIZE; 5682 void *p, *end; 5683 int ret; 5684 5685 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 5686 "rbd", "parent_get", CEPH_OSD_FLAG_READ, 5687 req_page, sizeof(u64), &reply_page, &reply_len); 5688 if (ret) 5689 return ret == -EOPNOTSUPP ? 1 : ret; 5690 5691 p = page_address(reply_page); 5692 end = p + reply_len; 5693 ret = decode_parent_image_spec(&p, end, pii); 5694 if (ret) 5695 return ret; 5696 5697 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 5698 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, 5699 req_page, sizeof(u64), &reply_page, &reply_len); 5700 if (ret) 5701 return ret; 5702 5703 p = page_address(reply_page); 5704 end = p + reply_len; 5705 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); 5706 if (pii->has_overlap) 5707 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 5708 5709 return 0; 5710 5711 e_inval: 5712 return -EINVAL; 5713 } 5714 5715 /* 5716 * The caller is responsible for @pii. 
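 * This is the fallback for OSDs that do not implement the newer
 * "parent_get" method: the older "get_parent" reply is decoded
 * instead, and it always carries an overlap.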
5717 */ 5718 static int __get_parent_info_legacy(struct rbd_device *rbd_dev, 5719 struct page *req_page, 5720 struct page *reply_page, 5721 struct parent_image_info *pii) 5722 { 5723 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 5724 size_t reply_len = PAGE_SIZE; 5725 void *p, *end; 5726 int ret; 5727 5728 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 5729 "rbd", "get_parent", CEPH_OSD_FLAG_READ, 5730 req_page, sizeof(u64), &reply_page, &reply_len); 5731 if (ret) 5732 return ret; 5733 5734 p = page_address(reply_page); 5735 end = p + reply_len; 5736 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval); 5737 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 5738 if (IS_ERR(pii->image_id)) { 5739 ret = PTR_ERR(pii->image_id); 5740 pii->image_id = NULL; 5741 return ret; 5742 } 5743 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval); 5744 pii->has_overlap = true; 5745 ceph_decode_64_safe(&p, end, pii->overlap, e_inval); 5746 5747 return 0; 5748 5749 e_inval: 5750 return -EINVAL; 5751 } 5752 5753 static int get_parent_info(struct rbd_device *rbd_dev, 5754 struct parent_image_info *pii) 5755 { 5756 struct page *req_page, *reply_page; 5757 void *p; 5758 int ret; 5759 5760 req_page = alloc_page(GFP_KERNEL); 5761 if (!req_page) 5762 return -ENOMEM; 5763 5764 reply_page = alloc_page(GFP_KERNEL); 5765 if (!reply_page) { 5766 __free_page(req_page); 5767 return -ENOMEM; 5768 } 5769 5770 p = page_address(req_page); 5771 ceph_encode_64(&p, rbd_dev->spec->snap_id); 5772 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii); 5773 if (ret > 0) 5774 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page, 5775 pii); 5776 5777 __free_page(req_page); 5778 __free_page(reply_page); 5779 return ret; 5780 } 5781 5782 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 5783 { 5784 struct rbd_spec *parent_spec; 5785 struct parent_image_info pii = { 0 }; 5786 int ret; 5787 5788 parent_spec = rbd_spec_alloc(); 5789 if (!parent_spec) 5790 return -ENOMEM; 5791 5792 ret = get_parent_info(rbd_dev, &pii); 5793 if (ret) 5794 goto out_err; 5795 5796 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", 5797 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id, 5798 pii.has_overlap, pii.overlap); 5799 5800 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) { 5801 /* 5802 * Either the parent never existed, or we have 5803 * record of it but the image got flattened so it no 5804 * longer has a parent. When the parent of a 5805 * layered image disappears we immediately set the 5806 * overlap to 0. The effect of this is that all new 5807 * requests will be treated as if the image had no 5808 * parent. 5809 * 5810 * If !pii.has_overlap, the parent image spec is not 5811 * applicable. It's there to avoid duplication in each 5812 * snapshot record. 5813 */ 5814 if (rbd_dev->parent_overlap) { 5815 rbd_dev->parent_overlap = 0; 5816 rbd_dev_parent_put(rbd_dev); 5817 pr_info("%s: clone image has been flattened\n", 5818 rbd_dev->disk->disk_name); 5819 } 5820 5821 goto out; /* No parent? No problem. */ 5822 } 5823 5824 /* The ceph file layout needs to fit pool id in 32 bits */ 5825 5826 ret = -EIO; 5827 if (pii.pool_id > (u64)U32_MAX) { 5828 rbd_warn(NULL, "parent pool id too large (%llu > %u)", 5829 (unsigned long long)pii.pool_id, U32_MAX); 5830 goto out_err; 5831 } 5832 5833 /* 5834 * The parent won't change (except when the clone is 5835 * flattened, already handled that). 
So we only need to 5836 * record the parent spec we have not already done so. 5837 */ 5838 if (!rbd_dev->parent_spec) { 5839 parent_spec->pool_id = pii.pool_id; 5840 if (pii.pool_ns && *pii.pool_ns) { 5841 parent_spec->pool_ns = pii.pool_ns; 5842 pii.pool_ns = NULL; 5843 } 5844 parent_spec->image_id = pii.image_id; 5845 pii.image_id = NULL; 5846 parent_spec->snap_id = pii.snap_id; 5847 5848 rbd_dev->parent_spec = parent_spec; 5849 parent_spec = NULL; /* rbd_dev now owns this */ 5850 } 5851 5852 /* 5853 * We always update the parent overlap. If it's zero we issue 5854 * a warning, as we will proceed as if there was no parent. 5855 */ 5856 if (!pii.overlap) { 5857 if (parent_spec) { 5858 /* refresh, careful to warn just once */ 5859 if (rbd_dev->parent_overlap) 5860 rbd_warn(rbd_dev, 5861 "clone now standalone (overlap became 0)"); 5862 } else { 5863 /* initial probe */ 5864 rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); 5865 } 5866 } 5867 rbd_dev->parent_overlap = pii.overlap; 5868 5869 out: 5870 ret = 0; 5871 out_err: 5872 kfree(pii.pool_ns); 5873 kfree(pii.image_id); 5874 rbd_spec_put(parent_spec); 5875 return ret; 5876 } 5877 5878 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 5879 { 5880 struct { 5881 __le64 stripe_unit; 5882 __le64 stripe_count; 5883 } __attribute__ ((packed)) striping_info_buf = { 0 }; 5884 size_t size = sizeof (striping_info_buf); 5885 void *p; 5886 int ret; 5887 5888 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5889 &rbd_dev->header_oloc, "get_stripe_unit_count", 5890 NULL, 0, &striping_info_buf, size); 5891 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5892 if (ret < 0) 5893 return ret; 5894 if (ret < size) 5895 return -ERANGE; 5896 5897 p = &striping_info_buf; 5898 rbd_dev->header.stripe_unit = ceph_decode_64(&p); 5899 rbd_dev->header.stripe_count = ceph_decode_64(&p); 5900 return 0; 5901 } 5902 5903 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) 5904 { 5905 __le64 data_pool_id; 5906 int ret; 5907 5908 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5909 &rbd_dev->header_oloc, "get_data_pool", 5910 NULL, 0, &data_pool_id, sizeof(data_pool_id)); 5911 if (ret < 0) 5912 return ret; 5913 if (ret < sizeof(data_pool_id)) 5914 return -EBADMSG; 5915 5916 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); 5917 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); 5918 return 0; 5919 } 5920 5921 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 5922 { 5923 CEPH_DEFINE_OID_ONSTACK(oid); 5924 size_t image_id_size; 5925 char *image_id; 5926 void *p; 5927 void *end; 5928 size_t size; 5929 void *reply_buf = NULL; 5930 size_t len = 0; 5931 char *image_name = NULL; 5932 int ret; 5933 5934 rbd_assert(!rbd_dev->spec->image_name); 5935 5936 len = strlen(rbd_dev->spec->image_id); 5937 image_id_size = sizeof (__le32) + len; 5938 image_id = kmalloc(image_id_size, GFP_KERNEL); 5939 if (!image_id) 5940 return NULL; 5941 5942 p = image_id; 5943 end = image_id + image_id_size; 5944 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 5945 5946 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 5947 reply_buf = kmalloc(size, GFP_KERNEL); 5948 if (!reply_buf) 5949 goto out; 5950 5951 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); 5952 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5953 "dir_get_name", image_id, image_id_size, 5954 reply_buf, size); 5955 if (ret < 0) 5956 goto out; 5957 p = reply_buf; 5958 end = reply_buf + ret; 5959 5960 image_name = 
ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 5961 if (IS_ERR(image_name)) 5962 image_name = NULL; 5963 else 5964 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 5965 out: 5966 kfree(reply_buf); 5967 kfree(image_id); 5968 5969 return image_name; 5970 } 5971 5972 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5973 { 5974 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5975 const char *snap_name; 5976 u32 which = 0; 5977 5978 /* Skip over names until we find the one we are looking for */ 5979 5980 snap_name = rbd_dev->header.snap_names; 5981 while (which < snapc->num_snaps) { 5982 if (!strcmp(name, snap_name)) 5983 return snapc->snaps[which]; 5984 snap_name += strlen(snap_name) + 1; 5985 which++; 5986 } 5987 return CEPH_NOSNAP; 5988 } 5989 5990 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5991 { 5992 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5993 u32 which; 5994 bool found = false; 5995 u64 snap_id; 5996 5997 for (which = 0; !found && which < snapc->num_snaps; which++) { 5998 const char *snap_name; 5999 6000 snap_id = snapc->snaps[which]; 6001 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 6002 if (IS_ERR(snap_name)) { 6003 /* ignore no-longer existing snapshots */ 6004 if (PTR_ERR(snap_name) == -ENOENT) 6005 continue; 6006 else 6007 break; 6008 } 6009 found = !strcmp(name, snap_name); 6010 kfree(snap_name); 6011 } 6012 return found ? snap_id : CEPH_NOSNAP; 6013 } 6014 6015 /* 6016 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 6017 * no snapshot by that name is found, or if an error occurs. 6018 */ 6019 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 6020 { 6021 if (rbd_dev->image_format == 1) 6022 return rbd_v1_snap_id_by_name(rbd_dev, name); 6023 6024 return rbd_v2_snap_id_by_name(rbd_dev, name); 6025 } 6026 6027 /* 6028 * An image being mapped will have everything but the snap id. 6029 */ 6030 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) 6031 { 6032 struct rbd_spec *spec = rbd_dev->spec; 6033 6034 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); 6035 rbd_assert(spec->image_id && spec->image_name); 6036 rbd_assert(spec->snap_name); 6037 6038 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 6039 u64 snap_id; 6040 6041 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 6042 if (snap_id == CEPH_NOSNAP) 6043 return -ENOENT; 6044 6045 spec->snap_id = snap_id; 6046 } else { 6047 spec->snap_id = CEPH_NOSNAP; 6048 } 6049 6050 return 0; 6051 } 6052 6053 /* 6054 * A parent image will have all ids but none of the names. 6055 * 6056 * All names in an rbd spec are dynamically allocated. It's OK if we 6057 * can't figure out the name for an image id. 
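 * A missing image name is tolerated (sysfs then shows "(unknown)"),
 * whereas failure to resolve the pool or snapshot name is returned as
 * an error.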
6058 */ 6059 static int rbd_spec_fill_names(struct rbd_device *rbd_dev) 6060 { 6061 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 6062 struct rbd_spec *spec = rbd_dev->spec; 6063 const char *pool_name; 6064 const char *image_name; 6065 const char *snap_name; 6066 int ret; 6067 6068 rbd_assert(spec->pool_id != CEPH_NOPOOL); 6069 rbd_assert(spec->image_id); 6070 rbd_assert(spec->snap_id != CEPH_NOSNAP); 6071 6072 /* Get the pool name; we have to make our own copy of this */ 6073 6074 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 6075 if (!pool_name) { 6076 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 6077 return -EIO; 6078 } 6079 pool_name = kstrdup(pool_name, GFP_KERNEL); 6080 if (!pool_name) 6081 return -ENOMEM; 6082 6083 /* Fetch the image name; tolerate failure here */ 6084 6085 image_name = rbd_dev_image_name(rbd_dev); 6086 if (!image_name) 6087 rbd_warn(rbd_dev, "unable to get image name"); 6088 6089 /* Fetch the snapshot name */ 6090 6091 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 6092 if (IS_ERR(snap_name)) { 6093 ret = PTR_ERR(snap_name); 6094 goto out_err; 6095 } 6096 6097 spec->pool_name = pool_name; 6098 spec->image_name = image_name; 6099 spec->snap_name = snap_name; 6100 6101 return 0; 6102 6103 out_err: 6104 kfree(image_name); 6105 kfree(pool_name); 6106 return ret; 6107 } 6108 6109 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 6110 { 6111 size_t size; 6112 int ret; 6113 void *reply_buf; 6114 void *p; 6115 void *end; 6116 u64 seq; 6117 u32 snap_count; 6118 struct ceph_snap_context *snapc; 6119 u32 i; 6120 6121 /* 6122 * We'll need room for the seq value (maximum snapshot id), 6123 * snapshot count, and array of that many snapshot ids. 6124 * For now we have a fixed upper limit on the number we're 6125 * prepared to receive. 6126 */ 6127 size = sizeof (__le64) + sizeof (__le32) + 6128 RBD_MAX_SNAP_COUNT * sizeof (__le64); 6129 reply_buf = kzalloc(size, GFP_KERNEL); 6130 if (!reply_buf) 6131 return -ENOMEM; 6132 6133 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 6134 &rbd_dev->header_oloc, "get_snapcontext", 6135 NULL, 0, reply_buf, size); 6136 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 6137 if (ret < 0) 6138 goto out; 6139 6140 p = reply_buf; 6141 end = reply_buf + ret; 6142 ret = -ERANGE; 6143 ceph_decode_64_safe(&p, end, seq, out); 6144 ceph_decode_32_safe(&p, end, snap_count, out); 6145 6146 /* 6147 * Make sure the reported number of snapshot ids wouldn't go 6148 * beyond the end of our buffer. But before checking that, 6149 * make sure the computed size of the snapshot context we 6150 * allocate is representable in a size_t. 
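 *
 * The guard below mirrors the allocation later done by
 * ceph_create_snap_context(), which needs on the order of
 *
 *	sizeof(struct ceph_snap_context) + snap_count * sizeof(u64)
 *
 * bytes, so an oversized snap_count is rejected before that product
 * can overflow a size_t.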
6151 */ 6152 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 6153 / sizeof (u64)) { 6154 ret = -EINVAL; 6155 goto out; 6156 } 6157 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 6158 goto out; 6159 ret = 0; 6160 6161 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 6162 if (!snapc) { 6163 ret = -ENOMEM; 6164 goto out; 6165 } 6166 snapc->seq = seq; 6167 for (i = 0; i < snap_count; i++) 6168 snapc->snaps[i] = ceph_decode_64(&p); 6169 6170 ceph_put_snap_context(rbd_dev->header.snapc); 6171 rbd_dev->header.snapc = snapc; 6172 6173 dout(" snap context seq = %llu, snap_count = %u\n", 6174 (unsigned long long)seq, (unsigned int)snap_count); 6175 out: 6176 kfree(reply_buf); 6177 6178 return ret; 6179 } 6180 6181 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 6182 u64 snap_id) 6183 { 6184 size_t size; 6185 void *reply_buf; 6186 __le64 snapid; 6187 int ret; 6188 void *p; 6189 void *end; 6190 char *snap_name; 6191 6192 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 6193 reply_buf = kmalloc(size, GFP_KERNEL); 6194 if (!reply_buf) 6195 return ERR_PTR(-ENOMEM); 6196 6197 snapid = cpu_to_le64(snap_id); 6198 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 6199 &rbd_dev->header_oloc, "get_snapshot_name", 6200 &snapid, sizeof(snapid), reply_buf, size); 6201 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 6202 if (ret < 0) { 6203 snap_name = ERR_PTR(ret); 6204 goto out; 6205 } 6206 6207 p = reply_buf; 6208 end = reply_buf + ret; 6209 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 6210 if (IS_ERR(snap_name)) 6211 goto out; 6212 6213 dout(" snap_id 0x%016llx snap_name = %s\n", 6214 (unsigned long long)snap_id, snap_name); 6215 out: 6216 kfree(reply_buf); 6217 6218 return snap_name; 6219 } 6220 6221 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) 6222 { 6223 bool first_time = rbd_dev->header.object_prefix == NULL; 6224 int ret; 6225 6226 ret = rbd_dev_v2_image_size(rbd_dev); 6227 if (ret) 6228 return ret; 6229 6230 if (first_time) { 6231 ret = rbd_dev_v2_header_onetime(rbd_dev); 6232 if (ret) 6233 return ret; 6234 } 6235 6236 ret = rbd_dev_v2_snap_context(rbd_dev); 6237 if (ret && first_time) { 6238 kfree(rbd_dev->header.object_prefix); 6239 rbd_dev->header.object_prefix = NULL; 6240 } 6241 6242 return ret; 6243 } 6244 6245 static int rbd_dev_header_info(struct rbd_device *rbd_dev) 6246 { 6247 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 6248 6249 if (rbd_dev->image_format == 1) 6250 return rbd_dev_v1_header_info(rbd_dev); 6251 6252 return rbd_dev_v2_header_info(rbd_dev); 6253 } 6254 6255 /* 6256 * Skips over white space at *buf, and updates *buf to point to the 6257 * first found non-space character (if any). Returns the length of 6258 * the token (string of non-white space characters) found. Note 6259 * that *buf must be terminated with '\0'. 6260 */ 6261 static inline size_t next_token(const char **buf) 6262 { 6263 /* 6264 * These are the characters that produce nonzero for 6265 * isspace() in the "C" and "POSIX" locales. 6266 */ 6267 const char *spaces = " \f\n\r\t\v"; 6268 6269 *buf += strspn(*buf, spaces); /* Find start of token */ 6270 6271 return strcspn(*buf, spaces); /* Return token length */ 6272 } 6273 6274 /* 6275 * Finds the next token in *buf, dynamically allocates a buffer big 6276 * enough to hold a copy of it, and copies the token into the new 6277 * buffer. The copy is guaranteed to be terminated with '\0'. 
Note 6278 * that a duplicate buffer is created even for a zero-length token. 6279 * 6280 * Returns a pointer to the newly-allocated duplicate, or a null 6281 * pointer if memory for the duplicate was not available. If 6282 * the lenp argument is a non-null pointer, the length of the token 6283 * (not including the '\0') is returned in *lenp. 6284 * 6285 * If successful, the *buf pointer will be updated to point beyond 6286 * the end of the found token. 6287 * 6288 * Note: uses GFP_KERNEL for allocation. 6289 */ 6290 static inline char *dup_token(const char **buf, size_t *lenp) 6291 { 6292 char *dup; 6293 size_t len; 6294 6295 len = next_token(buf); 6296 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 6297 if (!dup) 6298 return NULL; 6299 *(dup + len) = '\0'; 6300 *buf += len; 6301 6302 if (lenp) 6303 *lenp = len; 6304 6305 return dup; 6306 } 6307 6308 static int rbd_parse_param(struct fs_parameter *param, 6309 struct rbd_parse_opts_ctx *pctx) 6310 { 6311 struct rbd_options *opt = pctx->opts; 6312 struct fs_parse_result result; 6313 struct p_log log = {.prefix = "rbd"}; 6314 int token, ret; 6315 6316 ret = ceph_parse_param(param, pctx->copts, NULL); 6317 if (ret != -ENOPARAM) 6318 return ret; 6319 6320 token = __fs_parse(&log, rbd_parameters, param, &result); 6321 dout("%s fs_parse '%s' token %d\n", __func__, param->key, token); 6322 if (token < 0) { 6323 if (token == -ENOPARAM) 6324 return inval_plog(&log, "Unknown parameter '%s'", 6325 param->key); 6326 return token; 6327 } 6328 6329 switch (token) { 6330 case Opt_queue_depth: 6331 if (result.uint_32 < 1) 6332 goto out_of_range; 6333 opt->queue_depth = result.uint_32; 6334 break; 6335 case Opt_alloc_size: 6336 if (result.uint_32 < SECTOR_SIZE) 6337 goto out_of_range; 6338 if (!is_power_of_2(result.uint_32)) 6339 return inval_plog(&log, "alloc_size must be a power of 2"); 6340 opt->alloc_size = result.uint_32; 6341 break; 6342 case Opt_lock_timeout: 6343 /* 0 is "wait forever" (i.e. 
infinite timeout) */ 6344 if (result.uint_32 > INT_MAX / 1000) 6345 goto out_of_range; 6346 opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000); 6347 break; 6348 case Opt_pool_ns: 6349 kfree(pctx->spec->pool_ns); 6350 pctx->spec->pool_ns = param->string; 6351 param->string = NULL; 6352 break; 6353 case Opt_compression_hint: 6354 switch (result.uint_32) { 6355 case Opt_compression_hint_none: 6356 opt->alloc_hint_flags &= 6357 ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE | 6358 CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE); 6359 break; 6360 case Opt_compression_hint_compressible: 6361 opt->alloc_hint_flags |= 6362 CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; 6363 opt->alloc_hint_flags &= 6364 ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; 6365 break; 6366 case Opt_compression_hint_incompressible: 6367 opt->alloc_hint_flags |= 6368 CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; 6369 opt->alloc_hint_flags &= 6370 ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; 6371 break; 6372 default: 6373 BUG(); 6374 } 6375 break; 6376 case Opt_read_only: 6377 opt->read_only = true; 6378 break; 6379 case Opt_read_write: 6380 opt->read_only = false; 6381 break; 6382 case Opt_lock_on_read: 6383 opt->lock_on_read = true; 6384 break; 6385 case Opt_exclusive: 6386 opt->exclusive = true; 6387 break; 6388 case Opt_notrim: 6389 opt->trim = false; 6390 break; 6391 default: 6392 BUG(); 6393 } 6394 6395 return 0; 6396 6397 out_of_range: 6398 return inval_plog(&log, "%s out of range", param->key); 6399 } 6400 6401 /* 6402 * This duplicates most of generic_parse_monolithic(), untying it from 6403 * fs_context and skipping standard superblock and security options. 6404 */ 6405 static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx) 6406 { 6407 char *key; 6408 int ret = 0; 6409 6410 dout("%s '%s'\n", __func__, options); 6411 while ((key = strsep(&options, ",")) != NULL) { 6412 if (*key) { 6413 struct fs_parameter param = { 6414 .key = key, 6415 .type = fs_value_is_flag, 6416 }; 6417 char *value = strchr(key, '='); 6418 size_t v_len = 0; 6419 6420 if (value) { 6421 if (value == key) 6422 continue; 6423 *value++ = 0; 6424 v_len = strlen(value); 6425 param.string = kmemdup_nul(value, v_len, 6426 GFP_KERNEL); 6427 if (!param.string) 6428 return -ENOMEM; 6429 param.type = fs_value_is_string; 6430 } 6431 param.size = v_len; 6432 6433 ret = rbd_parse_param(&param, pctx); 6434 kfree(param.string); 6435 if (ret) 6436 break; 6437 } 6438 } 6439 6440 return ret; 6441 } 6442 6443 /* 6444 * Parse the options provided for an "rbd add" (i.e., rbd image 6445 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 6446 * and the data written is passed here via a NUL-terminated buffer. 6447 * Returns 0 if successful or an error code otherwise. 6448 * 6449 * The information extracted from these options is recorded in 6450 * the other parameters which return dynamically-allocated 6451 * structures: 6452 * ceph_opts 6453 * The address of a pointer that will refer to a ceph options 6454 * structure. Caller must release the returned pointer using 6455 * ceph_destroy_options() when it is no longer needed. 6456 * rbd_opts 6457 * Address of an rbd options pointer. Fully initialized by 6458 * this function; caller must release with kfree(). 6459 * spec 6460 * Address of an rbd image specification pointer. Fully 6461 * initialized by this function based on parsed options. 6462 * Caller must release with rbd_spec_put().
6463 * 6464 * The options passed take this form: 6465 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 6466 * where: 6467 * <mon_addrs> 6468 * A comma-separated list of one or more monitor addresses. 6469 * A monitor address is an ip address, optionally followed 6470 * by a port number (separated by a colon). 6471 * I.e.: ip1[:port1][,ip2[:port2]...] 6472 * <options> 6473 * A comma-separated list of ceph and/or rbd options. 6474 * <pool_name> 6475 * The name of the rados pool containing the rbd image. 6476 * <image_name> 6477 * The name of the image in that pool to map. 6478 * <snap_id> 6479 * An optional snapshot id. If provided, the mapping will 6480 * present data from the image at the time that snapshot was 6481 * created. The image head is used if no snapshot id is 6482 * provided. Snapshot mappings are always read-only. 6483 */ 6484 static int rbd_add_parse_args(const char *buf, 6485 struct ceph_options **ceph_opts, 6486 struct rbd_options **opts, 6487 struct rbd_spec **rbd_spec) 6488 { 6489 size_t len; 6490 char *options; 6491 const char *mon_addrs; 6492 char *snap_name; 6493 size_t mon_addrs_size; 6494 struct rbd_parse_opts_ctx pctx = { 0 }; 6495 int ret; 6496 6497 /* The first four tokens are required */ 6498 6499 len = next_token(&buf); 6500 if (!len) { 6501 rbd_warn(NULL, "no monitor address(es) provided"); 6502 return -EINVAL; 6503 } 6504 mon_addrs = buf; 6505 mon_addrs_size = len; 6506 buf += len; 6507 6508 ret = -EINVAL; 6509 options = dup_token(&buf, NULL); 6510 if (!options) 6511 return -ENOMEM; 6512 if (!*options) { 6513 rbd_warn(NULL, "no options provided"); 6514 goto out_err; 6515 } 6516 6517 pctx.spec = rbd_spec_alloc(); 6518 if (!pctx.spec) 6519 goto out_mem; 6520 6521 pctx.spec->pool_name = dup_token(&buf, NULL); 6522 if (!pctx.spec->pool_name) 6523 goto out_mem; 6524 if (!*pctx.spec->pool_name) { 6525 rbd_warn(NULL, "no pool name provided"); 6526 goto out_err; 6527 } 6528 6529 pctx.spec->image_name = dup_token(&buf, NULL); 6530 if (!pctx.spec->image_name) 6531 goto out_mem; 6532 if (!*pctx.spec->image_name) { 6533 rbd_warn(NULL, "no image name provided"); 6534 goto out_err; 6535 } 6536 6537 /* 6538 * Snapshot name is optional; default is to use "-" 6539 * (indicating the head/no snapshot). 
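 *
 * As an illustrative example only (the monitor address, credentials,
 * pool and image names here are hypothetical), both of the following
 * buffers would be accepted, the second one mapping snapshot "snap1"
 * read-only:
 *
 *	1.2.3.4:6789 name=admin rbd myimage
 *	1.2.3.4:6789 name=admin rbd myimage snap1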
6540 */ 6541 len = next_token(&buf); 6542 if (!len) { 6543 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 6544 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 6545 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 6546 ret = -ENAMETOOLONG; 6547 goto out_err; 6548 } 6549 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 6550 if (!snap_name) 6551 goto out_mem; 6552 *(snap_name + len) = '\0'; 6553 pctx.spec->snap_name = snap_name; 6554 6555 pctx.copts = ceph_alloc_options(); 6556 if (!pctx.copts) 6557 goto out_mem; 6558 6559 /* Initialize all rbd options to the defaults */ 6560 6561 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); 6562 if (!pctx.opts) 6563 goto out_mem; 6564 6565 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT; 6566 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 6567 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT; 6568 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT; 6569 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; 6570 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT; 6571 pctx.opts->trim = RBD_TRIM_DEFAULT; 6572 6573 ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL); 6574 if (ret) 6575 goto out_err; 6576 6577 ret = rbd_parse_options(options, &pctx); 6578 if (ret) 6579 goto out_err; 6580 6581 *ceph_opts = pctx.copts; 6582 *opts = pctx.opts; 6583 *rbd_spec = pctx.spec; 6584 kfree(options); 6585 return 0; 6586 6587 out_mem: 6588 ret = -ENOMEM; 6589 out_err: 6590 kfree(pctx.opts); 6591 ceph_destroy_options(pctx.copts); 6592 rbd_spec_put(pctx.spec); 6593 kfree(options); 6594 return ret; 6595 } 6596 6597 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 6598 { 6599 down_write(&rbd_dev->lock_rwsem); 6600 if (__rbd_is_lock_owner(rbd_dev)) 6601 __rbd_release_lock(rbd_dev); 6602 up_write(&rbd_dev->lock_rwsem); 6603 } 6604 6605 /* 6606 * If the wait is interrupted, an error is returned even if the lock 6607 * was successfully acquired. rbd_dev_image_unlock() will release it 6608 * if needed. 6609 */ 6610 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) 6611 { 6612 long ret; 6613 6614 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { 6615 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) 6616 return 0; 6617 6618 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); 6619 return -EINVAL; 6620 } 6621 6622 if (rbd_is_ro(rbd_dev)) 6623 return 0; 6624 6625 rbd_assert(!rbd_is_lock_owner(rbd_dev)); 6626 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 6627 ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, 6628 ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); 6629 if (ret > 0) { 6630 ret = rbd_dev->acquire_err; 6631 } else { 6632 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 6633 if (!ret) 6634 ret = -ETIMEDOUT; 6635 } 6636 6637 if (ret) { 6638 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret); 6639 return ret; 6640 } 6641 6642 /* 6643 * The lock may have been released by now, unless automatic lock 6644 * transitions are disabled. 6645 */ 6646 rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev)); 6647 return 0; 6648 } 6649 6650 /* 6651 * An rbd format 2 image has a unique identifier, distinct from the 6652 * name given to it by the user. Internally, that identifier is 6653 * what's used to specify the names of objects related to the image. 6654 * 6655 * A special "rbd id" object is used to map an rbd image name to its 6656 * id. If that object doesn't exist, then there is no v2 rbd image 6657 * with the supplied name. 
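 *
 * As a sketch of the naming scheme (the actual prefix and suffix
 * strings come from rbd_types.h): the id object for an image is
 * "<RBD_ID_PREFIX><image name>", and once the id is known the header
 * object becomes "<RBD_HEADER_PREFIX><image id>" for format 2, or
 * "<image name><RBD_SUFFIX>" for format 1; see rbd_dev_header_name().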
6658 * 6659 * This function will record the given rbd_dev's image_id field if 6660 * it can be determined, and in that case will return 0. If any 6661 * errors occur a negative errno will be returned and the rbd_dev's 6662 * image_id field will be unchanged (and should be NULL). 6663 */ 6664 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 6665 { 6666 int ret; 6667 size_t size; 6668 CEPH_DEFINE_OID_ONSTACK(oid); 6669 void *response; 6670 char *image_id; 6671 6672 /* 6673 * When probing a parent image, the image id is already 6674 * known (and the image name likely is not). There's no 6675 * need to fetch the image id again in this case. We 6676 * do still need to set the image format though. 6677 */ 6678 if (rbd_dev->spec->image_id) { 6679 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 6680 6681 return 0; 6682 } 6683 6684 /* 6685 * First, see if the format 2 image id file exists, and if 6686 * so, get the image's persistent id from it. 6687 */ 6688 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, 6689 rbd_dev->spec->image_name); 6690 if (ret) 6691 return ret; 6692 6693 dout("rbd id object name is %s\n", oid.name); 6694 6695 /* Response will be an encoded string, which includes a length */ 6696 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 6697 response = kzalloc(size, GFP_NOIO); 6698 if (!response) { 6699 ret = -ENOMEM; 6700 goto out; 6701 } 6702 6703 /* If it doesn't exist we'll assume it's a format 1 image */ 6704 6705 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 6706 "get_id", NULL, 0, 6707 response, size); 6708 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 6709 if (ret == -ENOENT) { 6710 image_id = kstrdup("", GFP_KERNEL); 6711 ret = image_id ? 0 : -ENOMEM; 6712 if (!ret) 6713 rbd_dev->image_format = 1; 6714 } else if (ret >= 0) { 6715 void *p = response; 6716 6717 image_id = ceph_extract_encoded_string(&p, p + ret, 6718 NULL, GFP_NOIO); 6719 ret = PTR_ERR_OR_ZERO(image_id); 6720 if (!ret) 6721 rbd_dev->image_format = 2; 6722 } 6723 6724 if (!ret) { 6725 rbd_dev->spec->image_id = image_id; 6726 dout("image_id is %s\n", image_id); 6727 } 6728 out: 6729 kfree(response); 6730 ceph_oid_destroy(&oid); 6731 return ret; 6732 } 6733 6734 /* 6735 * Undo whatever state changes are made by a v1 or v2 header info 6736 * call. 6737 */ 6738 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 6739 { 6740 struct rbd_image_header *header; 6741 6742 rbd_dev_parent_put(rbd_dev); 6743 rbd_object_map_free(rbd_dev); 6744 rbd_dev_mapping_clear(rbd_dev); 6745 6746 /* Free dynamic fields from the header, then zero it out */ 6747 6748 header = &rbd_dev->header; 6749 ceph_put_snap_context(header->snapc); 6750 kfree(header->snap_sizes); 6751 kfree(header->snap_names); 6752 kfree(header->object_prefix); 6753 memset(header, 0, sizeof (*header)); 6754 } 6755 6756 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) 6757 { 6758 int ret; 6759 6760 ret = rbd_dev_v2_object_prefix(rbd_dev); 6761 if (ret) 6762 goto out_err; 6763 6764 /* 6765 * Get and check the features for the image. Currently the 6766 * features are assumed to never change.
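 *
 * The bits recorded here gate the optional probing steps that follow,
 * e.g. fancy striping parameters are fetched only when
 *
 *	rbd_dev->header.features & RBD_FEATURE_STRIPINGV2
 *
 * is set, and likewise RBD_FEATURE_DATA_POOL for the data pool id.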
6767 */ 6768 ret = rbd_dev_v2_features(rbd_dev); 6769 if (ret) 6770 goto out_err; 6771 6772 /* If the image supports fancy striping, get its parameters */ 6773 6774 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 6775 ret = rbd_dev_v2_striping_info(rbd_dev); 6776 if (ret < 0) 6777 goto out_err; 6778 } 6779 6780 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { 6781 ret = rbd_dev_v2_data_pool(rbd_dev); 6782 if (ret) 6783 goto out_err; 6784 } 6785 6786 rbd_init_layout(rbd_dev); 6787 return 0; 6788 6789 out_err: 6790 rbd_dev->header.features = 0; 6791 kfree(rbd_dev->header.object_prefix); 6792 rbd_dev->header.object_prefix = NULL; 6793 return ret; 6794 } 6795 6796 /* 6797 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> 6798 * rbd_dev_image_probe() recursion depth, which means it's also the 6799 * length of the already discovered part of the parent chain. 6800 */ 6801 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) 6802 { 6803 struct rbd_device *parent = NULL; 6804 int ret; 6805 6806 if (!rbd_dev->parent_spec) 6807 return 0; 6808 6809 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) { 6810 pr_info("parent chain is too long (%d)\n", depth); 6811 ret = -EINVAL; 6812 goto out_err; 6813 } 6814 6815 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); 6816 if (!parent) { 6817 ret = -ENOMEM; 6818 goto out_err; 6819 } 6820 6821 /* 6822 * Images related by parent/child relationships always share 6823 * rbd_client and spec/parent_spec, so bump their refcounts. 6824 */ 6825 __rbd_get_client(rbd_dev->rbd_client); 6826 rbd_spec_get(rbd_dev->parent_spec); 6827 6828 __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags); 6829 6830 ret = rbd_dev_image_probe(parent, depth); 6831 if (ret < 0) 6832 goto out_err; 6833 6834 rbd_dev->parent = parent; 6835 atomic_set(&rbd_dev->parent_ref, 1); 6836 return 0; 6837 6838 out_err: 6839 rbd_dev_unparent(rbd_dev); 6840 rbd_dev_destroy(parent); 6841 return ret; 6842 } 6843 6844 static void rbd_dev_device_release(struct rbd_device *rbd_dev) 6845 { 6846 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 6847 rbd_free_disk(rbd_dev); 6848 if (!single_major) 6849 unregister_blkdev(rbd_dev->major, rbd_dev->name); 6850 } 6851 6852 /* 6853 * rbd_dev->header_rwsem must be locked for write and will be unlocked 6854 * upon return. 6855 */ 6856 static int rbd_dev_device_setup(struct rbd_device *rbd_dev) 6857 { 6858 int ret; 6859 6860 /* Record our major and minor device numbers. */ 6861 6862 if (!single_major) { 6863 ret = register_blkdev(0, rbd_dev->name); 6864 if (ret < 0) 6865 goto err_out_unlock; 6866 6867 rbd_dev->major = ret; 6868 rbd_dev->minor = 0; 6869 } else { 6870 rbd_dev->major = rbd_major; 6871 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); 6872 } 6873 6874 /* Set up the blkdev mapping. 
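 * (The capacity handed to set_capacity() below is expressed in
 * 512-byte sectors, hence the division by SECTOR_SIZE.)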
*/ 6875 6876 ret = rbd_init_disk(rbd_dev); 6877 if (ret) 6878 goto err_out_blkdev; 6879 6880 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 6881 set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev)); 6882 6883 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); 6884 if (ret) 6885 goto err_out_disk; 6886 6887 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 6888 up_write(&rbd_dev->header_rwsem); 6889 return 0; 6890 6891 err_out_disk: 6892 rbd_free_disk(rbd_dev); 6893 err_out_blkdev: 6894 if (!single_major) 6895 unregister_blkdev(rbd_dev->major, rbd_dev->name); 6896 err_out_unlock: 6897 up_write(&rbd_dev->header_rwsem); 6898 return ret; 6899 } 6900 6901 static int rbd_dev_header_name(struct rbd_device *rbd_dev) 6902 { 6903 struct rbd_spec *spec = rbd_dev->spec; 6904 int ret; 6905 6906 /* Record the header object name for this rbd image. */ 6907 6908 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 6909 if (rbd_dev->image_format == 1) 6910 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 6911 spec->image_name, RBD_SUFFIX); 6912 else 6913 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 6914 RBD_HEADER_PREFIX, spec->image_id); 6915 6916 return ret; 6917 } 6918 6919 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap) 6920 { 6921 if (!is_snap) { 6922 pr_info("image %s/%s%s%s does not exist\n", 6923 rbd_dev->spec->pool_name, 6924 rbd_dev->spec->pool_ns ?: "", 6925 rbd_dev->spec->pool_ns ? "/" : "", 6926 rbd_dev->spec->image_name); 6927 } else { 6928 pr_info("snap %s/%s%s%s@%s does not exist\n", 6929 rbd_dev->spec->pool_name, 6930 rbd_dev->spec->pool_ns ?: "", 6931 rbd_dev->spec->pool_ns ? "/" : "", 6932 rbd_dev->spec->image_name, 6933 rbd_dev->spec->snap_name); 6934 } 6935 } 6936 6937 static void rbd_dev_image_release(struct rbd_device *rbd_dev) 6938 { 6939 if (!rbd_is_ro(rbd_dev)) 6940 rbd_unregister_watch(rbd_dev); 6941 6942 rbd_dev_unprobe(rbd_dev); 6943 rbd_dev->image_format = 0; 6944 kfree(rbd_dev->spec->image_id); 6945 rbd_dev->spec->image_id = NULL; 6946 } 6947 6948 /* 6949 * Probe for the existence of the header object for the given rbd 6950 * device. If this image is the one being mapped (i.e., not a 6951 * parent), initiate a watch on its header object before using that 6952 * object to get detailed information about the rbd image. 6953 * 6954 * On success, returns with header_rwsem held for write if called 6955 * with @depth == 0. 6956 */ 6957 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) 6958 { 6959 bool need_watch = !rbd_is_ro(rbd_dev); 6960 int ret; 6961 6962 /* 6963 * Get the id from the image id object. Unless there's an 6964 * error, rbd_dev->spec->image_id will be filled in with 6965 * a dynamically-allocated string, and rbd_dev->image_format 6966 * will be set to either 1 or 2. 
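 *
 * (For a format 1 image the recorded id is the empty string; that is
 * how rbd_dev_image_id() can infer the format from the id alone when
 * it is called for a parent whose spec already carries an image_id.)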
6967 */ 6968 ret = rbd_dev_image_id(rbd_dev); 6969 if (ret) 6970 return ret; 6971 6972 ret = rbd_dev_header_name(rbd_dev); 6973 if (ret) 6974 goto err_out_format; 6975 6976 if (need_watch) { 6977 ret = rbd_register_watch(rbd_dev); 6978 if (ret) { 6979 if (ret == -ENOENT) 6980 rbd_print_dne(rbd_dev, false); 6981 goto err_out_format; 6982 } 6983 } 6984 6985 if (!depth) 6986 down_write(&rbd_dev->header_rwsem); 6987 6988 ret = rbd_dev_header_info(rbd_dev); 6989 if (ret) { 6990 if (ret == -ENOENT && !need_watch) 6991 rbd_print_dne(rbd_dev, false); 6992 goto err_out_probe; 6993 } 6994 6995 /* 6996 * If this image is the one being mapped, we have pool name and 6997 * id, image name and id, and snap name - need to fill snap id. 6998 * Otherwise this is a parent image, identified by pool, image 6999 * and snap ids - need to fill in names for those ids. 7000 */ 7001 if (!depth) 7002 ret = rbd_spec_fill_snap_id(rbd_dev); 7003 else 7004 ret = rbd_spec_fill_names(rbd_dev); 7005 if (ret) { 7006 if (ret == -ENOENT) 7007 rbd_print_dne(rbd_dev, true); 7008 goto err_out_probe; 7009 } 7010 7011 ret = rbd_dev_mapping_set(rbd_dev); 7012 if (ret) 7013 goto err_out_probe; 7014 7015 if (rbd_is_snap(rbd_dev) && 7016 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { 7017 ret = rbd_object_map_load(rbd_dev); 7018 if (ret) 7019 goto err_out_probe; 7020 } 7021 7022 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 7023 ret = rbd_dev_v2_parent_info(rbd_dev); 7024 if (ret) 7025 goto err_out_probe; 7026 } 7027 7028 ret = rbd_dev_probe_parent(rbd_dev, depth); 7029 if (ret) 7030 goto err_out_probe; 7031 7032 dout("discovered format %u image, header name is %s\n", 7033 rbd_dev->image_format, rbd_dev->header_oid.name); 7034 return 0; 7035 7036 err_out_probe: 7037 if (!depth) 7038 up_write(&rbd_dev->header_rwsem); 7039 if (need_watch) 7040 rbd_unregister_watch(rbd_dev); 7041 rbd_dev_unprobe(rbd_dev); 7042 err_out_format: 7043 rbd_dev->image_format = 0; 7044 kfree(rbd_dev->spec->image_id); 7045 rbd_dev->spec->image_id = NULL; 7046 return ret; 7047 } 7048 7049 static ssize_t do_rbd_add(struct bus_type *bus, 7050 const char *buf, 7051 size_t count) 7052 { 7053 struct rbd_device *rbd_dev = NULL; 7054 struct ceph_options *ceph_opts = NULL; 7055 struct rbd_options *rbd_opts = NULL; 7056 struct rbd_spec *spec = NULL; 7057 struct rbd_client *rbdc; 7058 int rc; 7059 7060 if (!try_module_get(THIS_MODULE)) 7061 return -ENODEV; 7062 7063 /* parse add command */ 7064 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 7065 if (rc < 0) 7066 goto out; 7067 7068 rbdc = rbd_get_client(ceph_opts); 7069 if (IS_ERR(rbdc)) { 7070 rc = PTR_ERR(rbdc); 7071 goto err_out_args; 7072 } 7073 7074 /* pick the pool */ 7075 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); 7076 if (rc < 0) { 7077 if (rc == -ENOENT) 7078 pr_info("pool %s does not exist\n", spec->pool_name); 7079 goto err_out_client; 7080 } 7081 spec->pool_id = (u64)rc; 7082 7083 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); 7084 if (!rbd_dev) { 7085 rc = -ENOMEM; 7086 goto err_out_client; 7087 } 7088 rbdc = NULL; /* rbd_dev now owns this */ 7089 spec = NULL; /* rbd_dev now owns this */ 7090 rbd_opts = NULL; /* rbd_dev now owns this */ 7091 7092 /* if we are mapping a snapshot it will be a read-only mapping */ 7093 if (rbd_dev->opts->read_only || 7094 strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) 7095 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags); 7096 7097 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); 7098 if (!rbd_dev->config_info) { 
7099 rc = -ENOMEM; 7100 goto err_out_rbd_dev; 7101 } 7102 7103 rc = rbd_dev_image_probe(rbd_dev, 0); 7104 if (rc < 0) 7105 goto err_out_rbd_dev; 7106 7107 if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) { 7108 rbd_warn(rbd_dev, "alloc_size adjusted to %u", 7109 rbd_dev->layout.object_size); 7110 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size; 7111 } 7112 7113 rc = rbd_dev_device_setup(rbd_dev); 7114 if (rc) 7115 goto err_out_image_probe; 7116 7117 rc = rbd_add_acquire_lock(rbd_dev); 7118 if (rc) 7119 goto err_out_image_lock; 7120 7121 /* Everything's ready. Announce the disk to the world. */ 7122 7123 rc = device_add(&rbd_dev->dev); 7124 if (rc) 7125 goto err_out_image_lock; 7126 7127 device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL); 7128 /* see rbd_init_disk() */ 7129 blk_put_queue(rbd_dev->disk->queue); 7130 7131 spin_lock(&rbd_dev_list_lock); 7132 list_add_tail(&rbd_dev->node, &rbd_dev_list); 7133 spin_unlock(&rbd_dev_list_lock); 7134 7135 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, 7136 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, 7137 rbd_dev->header.features); 7138 rc = count; 7139 out: 7140 module_put(THIS_MODULE); 7141 return rc; 7142 7143 err_out_image_lock: 7144 rbd_dev_image_unlock(rbd_dev); 7145 rbd_dev_device_release(rbd_dev); 7146 err_out_image_probe: 7147 rbd_dev_image_release(rbd_dev); 7148 err_out_rbd_dev: 7149 rbd_dev_destroy(rbd_dev); 7150 err_out_client: 7151 rbd_put_client(rbdc); 7152 err_out_args: 7153 rbd_spec_put(spec); 7154 kfree(rbd_opts); 7155 goto out; 7156 } 7157 7158 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count) 7159 { 7160 if (single_major) 7161 return -EINVAL; 7162 7163 return do_rbd_add(bus, buf, count); 7164 } 7165 7166 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf, 7167 size_t count) 7168 { 7169 return do_rbd_add(bus, buf, count); 7170 } 7171 7172 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 7173 { 7174 while (rbd_dev->parent) { 7175 struct rbd_device *first = rbd_dev; 7176 struct rbd_device *second = first->parent; 7177 struct rbd_device *third; 7178 7179 /* 7180 * Follow to the parent with no grandparent and 7181 * remove it. 
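 *
 * For example, with a chain rbd_dev -> P1 -> P2 (P2 having no parent
 * of its own), the first pass releases and destroys P2 and clears
 * P1->parent; the next pass does the same for P1, clearing
 * rbd_dev->parent and ending the outer loop.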
7182 */ 7183 while (second && (third = second->parent)) { 7184 first = second; 7185 second = third; 7186 } 7187 rbd_assert(second); 7188 rbd_dev_image_release(second); 7189 rbd_dev_destroy(second); 7190 first->parent = NULL; 7191 first->parent_overlap = 0; 7192 7193 rbd_assert(first->parent_spec); 7194 rbd_spec_put(first->parent_spec); 7195 first->parent_spec = NULL; 7196 } 7197 } 7198 7199 static ssize_t do_rbd_remove(struct bus_type *bus, 7200 const char *buf, 7201 size_t count) 7202 { 7203 struct rbd_device *rbd_dev = NULL; 7204 struct list_head *tmp; 7205 int dev_id; 7206 char opt_buf[6]; 7207 bool force = false; 7208 int ret; 7209 7210 dev_id = -1; 7211 opt_buf[0] = '\0'; 7212 sscanf(buf, "%d %5s", &dev_id, opt_buf); 7213 if (dev_id < 0) { 7214 pr_err("dev_id out of range\n"); 7215 return -EINVAL; 7216 } 7217 if (opt_buf[0] != '\0') { 7218 if (!strcmp(opt_buf, "force")) { 7219 force = true; 7220 } else { 7221 pr_err("bad remove option at '%s'\n", opt_buf); 7222 return -EINVAL; 7223 } 7224 } 7225 7226 ret = -ENOENT; 7227 spin_lock(&rbd_dev_list_lock); 7228 list_for_each(tmp, &rbd_dev_list) { 7229 rbd_dev = list_entry(tmp, struct rbd_device, node); 7230 if (rbd_dev->dev_id == dev_id) { 7231 ret = 0; 7232 break; 7233 } 7234 } 7235 if (!ret) { 7236 spin_lock_irq(&rbd_dev->lock); 7237 if (rbd_dev->open_count && !force) 7238 ret = -EBUSY; 7239 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING, 7240 &rbd_dev->flags)) 7241 ret = -EINPROGRESS; 7242 spin_unlock_irq(&rbd_dev->lock); 7243 } 7244 spin_unlock(&rbd_dev_list_lock); 7245 if (ret) 7246 return ret; 7247 7248 if (force) { 7249 /* 7250 * Prevent new IO from being queued and wait for existing 7251 * IO to complete/fail. 7252 */ 7253 blk_mq_freeze_queue(rbd_dev->disk->queue); 7254 blk_set_queue_dying(rbd_dev->disk->queue); 7255 } 7256 7257 del_gendisk(rbd_dev->disk); 7258 spin_lock(&rbd_dev_list_lock); 7259 list_del_init(&rbd_dev->node); 7260 spin_unlock(&rbd_dev_list_lock); 7261 device_del(&rbd_dev->dev); 7262 7263 rbd_dev_image_unlock(rbd_dev); 7264 rbd_dev_device_release(rbd_dev); 7265 rbd_dev_image_release(rbd_dev); 7266 rbd_dev_destroy(rbd_dev); 7267 return count; 7268 } 7269 7270 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count) 7271 { 7272 if (single_major) 7273 return -EINVAL; 7274 7275 return do_rbd_remove(bus, buf, count); 7276 } 7277 7278 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf, 7279 size_t count) 7280 { 7281 return do_rbd_remove(bus, buf, count); 7282 } 7283 7284 /* 7285 * create control files in sysfs 7286 * /sys/bus/rbd/... 
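 *
 * Illustrative sketch of the resulting control interface (the monitor
 * address, names and device id below are hypothetical; the mapping
 * request format is described above rbd_add_parse_args()):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *	echo "0 force" > /sys/bus/rbd/remove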
7287 */ 7288 static int __init rbd_sysfs_init(void) 7289 { 7290 int ret; 7291 7292 ret = device_register(&rbd_root_dev); 7293 if (ret < 0) 7294 return ret; 7295 7296 ret = bus_register(&rbd_bus_type); 7297 if (ret < 0) 7298 device_unregister(&rbd_root_dev); 7299 7300 return ret; 7301 } 7302 7303 static void __exit rbd_sysfs_cleanup(void) 7304 { 7305 bus_unregister(&rbd_bus_type); 7306 device_unregister(&rbd_root_dev); 7307 } 7308 7309 static int __init rbd_slab_init(void) 7310 { 7311 rbd_assert(!rbd_img_request_cache); 7312 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); 7313 if (!rbd_img_request_cache) 7314 return -ENOMEM; 7315 7316 rbd_assert(!rbd_obj_request_cache); 7317 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0); 7318 if (!rbd_obj_request_cache) 7319 goto out_err; 7320 7321 return 0; 7322 7323 out_err: 7324 kmem_cache_destroy(rbd_img_request_cache); 7325 rbd_img_request_cache = NULL; 7326 return -ENOMEM; 7327 } 7328 7329 static void rbd_slab_exit(void) 7330 { 7331 rbd_assert(rbd_obj_request_cache); 7332 kmem_cache_destroy(rbd_obj_request_cache); 7333 rbd_obj_request_cache = NULL; 7334 7335 rbd_assert(rbd_img_request_cache); 7336 kmem_cache_destroy(rbd_img_request_cache); 7337 rbd_img_request_cache = NULL; 7338 } 7339 7340 static int __init rbd_init(void) 7341 { 7342 int rc; 7343 7344 if (!libceph_compatible(NULL)) { 7345 rbd_warn(NULL, "libceph incompatibility (quitting)"); 7346 return -EINVAL; 7347 } 7348 7349 rc = rbd_slab_init(); 7350 if (rc) 7351 return rc; 7352 7353 /* 7354 * The number of active work items is limited by the number of 7355 * rbd devices * queue depth, so leave @max_active at default. 7356 */ 7357 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); 7358 if (!rbd_wq) { 7359 rc = -ENOMEM; 7360 goto err_out_slab; 7361 } 7362 7363 if (single_major) { 7364 rbd_major = register_blkdev(0, RBD_DRV_NAME); 7365 if (rbd_major < 0) { 7366 rc = rbd_major; 7367 goto err_out_wq; 7368 } 7369 } 7370 7371 rc = rbd_sysfs_init(); 7372 if (rc) 7373 goto err_out_blkdev; 7374 7375 if (single_major) 7376 pr_info("loaded (major %d)\n", rbd_major); 7377 else 7378 pr_info("loaded\n"); 7379 7380 return 0; 7381 7382 err_out_blkdev: 7383 if (single_major) 7384 unregister_blkdev(rbd_major, RBD_DRV_NAME); 7385 err_out_wq: 7386 destroy_workqueue(rbd_wq); 7387 err_out_slab: 7388 rbd_slab_exit(); 7389 return rc; 7390 } 7391 7392 static void __exit rbd_exit(void) 7393 { 7394 ida_destroy(&rbd_dev_id_ida); 7395 rbd_sysfs_cleanup(); 7396 if (single_major) 7397 unregister_blkdev(rbd_major, RBD_DRV_NAME); 7398 destroy_workqueue(rbd_wq); 7399 rbd_slab_exit(); 7400 } 7401 7402 module_init(rbd_init); 7403 module_exit(rbd_exit); 7404 7405 MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); 7406 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 7407 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 7408 /* following authorship retained from original osdblk.c */ 7409 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 7410 7411 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); 7412 MODULE_LICENSE("GPL"); 7413