1 2 /* 3 rbd.c -- Export ceph rados objects as a Linux block device 4 5 6 based on drivers/block/osdblk.c: 7 8 Copyright 2009 Red Hat, Inc. 9 10 This program is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; see the file COPYING. If not, write to 21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 22 23 24 25 For usage instructions, please refer to: 26 27 Documentation/ABI/testing/sysfs-bus-rbd 28 29 */ 30 31 #include <linux/ceph/libceph.h> 32 #include <linux/ceph/osd_client.h> 33 #include <linux/ceph/mon_client.h> 34 #include <linux/ceph/cls_lock_client.h> 35 #include <linux/ceph/decode.h> 36 #include <linux/parser.h> 37 #include <linux/bsearch.h> 38 39 #include <linux/kernel.h> 40 #include <linux/device.h> 41 #include <linux/module.h> 42 #include <linux/blk-mq.h> 43 #include <linux/fs.h> 44 #include <linux/blkdev.h> 45 #include <linux/slab.h> 46 #include <linux/idr.h> 47 #include <linux/workqueue.h> 48 49 #include "rbd_types.h" 50 51 #define RBD_DEBUG /* Activate rbd_assert() calls */ 52 53 /* 54 * The basic unit of block I/O is a sector. It is interpreted in a 55 * number of contexts in Linux (blk, bio, genhd), but the default is 56 * universally 512 bytes. These symbols are just slightly more 57 * meaningful than the bare numbers they represent. 58 */ 59 #define SECTOR_SHIFT 9 60 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 61 62 /* 63 * Increment the given counter and return its updated value. 64 * If the counter is already 0 it will not be incremented. 65 * If the counter is already at its maximum value returns 66 * -EINVAL without updating it. 67 */ 68 static int atomic_inc_return_safe(atomic_t *v) 69 { 70 unsigned int counter; 71 72 counter = (unsigned int)__atomic_add_unless(v, 1, 0); 73 if (counter <= (unsigned int)INT_MAX) 74 return (int)counter; 75 76 atomic_dec(v); 77 78 return -EINVAL; 79 } 80 81 /* Decrement the counter. Return the resulting value, or -EINVAL */ 82 static int atomic_dec_return_safe(atomic_t *v) 83 { 84 int counter; 85 86 counter = atomic_dec_return(v); 87 if (counter >= 0) 88 return counter; 89 90 atomic_inc(v); 91 92 return -EINVAL; 93 } 94 95 #define RBD_DRV_NAME "rbd" 96 97 #define RBD_MINORS_PER_MAJOR 256 98 #define RBD_SINGLE_MAJOR_PART_SHIFT 4 99 100 #define RBD_MAX_PARENT_CHAIN_LEN 16 101 102 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" 103 #define RBD_MAX_SNAP_NAME_LEN \ 104 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) 105 106 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 107 108 #define RBD_SNAP_HEAD_NAME "-" 109 110 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */ 111 112 /* This allows a single page to hold an image name sent by OSD */ 113 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 114 #define RBD_IMAGE_ID_LEN_MAX 64 115 116 #define RBD_OBJ_PREFIX_LEN_MAX 64 117 118 #define RBD_NOTIFY_TIMEOUT 5 /* seconds */ 119 #define RBD_RETRY_DELAY msecs_to_jiffies(1000) 120 121 /* Feature bits */ 122 123 #define RBD_FEATURE_LAYERING (1ULL<<0) 124 #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) 125 #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) 126 #define RBD_FEATURE_DATA_POOL (1ULL<<7) 127 128 #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ 129 RBD_FEATURE_STRIPINGV2 | \ 130 RBD_FEATURE_EXCLUSIVE_LOCK | \ 131 RBD_FEATURE_DATA_POOL) 132 133 /* Features supported by this (client software) implementation. */ 134 135 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL) 136 137 /* 138 * An RBD device name will be "rbd#", where the "rbd" comes from 139 * RBD_DRV_NAME above, and # is a unique integer identifier. 140 */ 141 #define DEV_NAME_LEN 32 142 143 /* 144 * block device image metadata (in-memory version) 145 */ 146 struct rbd_image_header { 147 /* These six fields never change for a given rbd image */ 148 char *object_prefix; 149 __u8 obj_order; 150 u64 stripe_unit; 151 u64 stripe_count; 152 s64 data_pool_id; 153 u64 features; /* Might be changeable someday? */ 154 155 /* The remaining fields need to be updated occasionally */ 156 u64 image_size; 157 struct ceph_snap_context *snapc; 158 char *snap_names; /* format 1 only */ 159 u64 *snap_sizes; /* format 1 only */ 160 }; 161 162 /* 163 * An rbd image specification. 164 * 165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely 166 * identify an image. Each rbd_dev structure includes a pointer to 167 * an rbd_spec structure that encapsulates this identity. 168 * 169 * Each of the id's in an rbd_spec has an associated name. For a 170 * user-mapped image, the names are supplied and the id's associated 171 * with them are looked up. For a layered image, a parent image is 172 * defined by the tuple, and the names are looked up. 173 * 174 * An rbd_dev structure contains a parent_spec pointer which is 175 * non-null if the image it represents is a child in a layered 176 * image. This pointer will refer to the rbd_spec structure used 177 * by the parent rbd_dev for its own identity (i.e., the structure 178 * is shared between the parent and child). 179 * 180 * Since these structures are populated once, during the discovery 181 * phase of image construction, they are effectively immutable so 182 * we make no effort to synchronize access to them. 183 * 184 * Note that code herein does not assume the image name is known (it 185 * could be a null pointer). 186 */ 187 struct rbd_spec { 188 u64 pool_id; 189 const char *pool_name; 190 191 const char *image_id; 192 const char *image_name; 193 194 u64 snap_id; 195 const char *snap_name; 196 197 struct kref kref; 198 }; 199 200 /* 201 * an instance of the client. multiple devices may share an rbd client. 202 */ 203 struct rbd_client { 204 struct ceph_client *client; 205 struct kref kref; 206 struct list_head node; 207 }; 208 209 struct rbd_img_request; 210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *); 211 212 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */ 213 214 struct rbd_obj_request; 215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); 216 217 enum obj_request_type { 218 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES 219 }; 220 221 enum obj_operation_type { 222 OBJ_OP_WRITE, 223 OBJ_OP_READ, 224 OBJ_OP_DISCARD, 225 }; 226 227 enum obj_req_flags { 228 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ 229 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ 230 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ 231 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ 232 }; 233 234 struct rbd_obj_request { 235 u64 object_no; 236 u64 offset; /* object start byte */ 237 u64 length; /* bytes from offset */ 238 unsigned long flags; 239 240 /* 241 * An object request associated with an image will have its 242 * img_data flag set; a standalone object request will not. 243 * 244 * A standalone object request will have which == BAD_WHICH 245 * and a null obj_request pointer. 246 * 247 * An object request initiated in support of a layered image 248 * object (to check for its existence before a write) will 249 * have which == BAD_WHICH and a non-null obj_request pointer. 250 * 251 * Finally, an object request for rbd image data will have 252 * which != BAD_WHICH, and will have a non-null img_request 253 * pointer. The value of which will be in the range 254 * 0..(img_request->obj_request_count-1). 255 */ 256 union { 257 struct rbd_obj_request *obj_request; /* STAT op */ 258 struct { 259 struct rbd_img_request *img_request; 260 u64 img_offset; 261 /* links for img_request->obj_requests list */ 262 struct list_head links; 263 }; 264 }; 265 u32 which; /* posn image request list */ 266 267 enum obj_request_type type; 268 union { 269 struct bio *bio_list; 270 struct { 271 struct page **pages; 272 u32 page_count; 273 }; 274 }; 275 struct page **copyup_pages; 276 u32 copyup_page_count; 277 278 struct ceph_osd_request *osd_req; 279 280 u64 xferred; /* bytes transferred */ 281 int result; 282 283 rbd_obj_callback_t callback; 284 struct completion completion; 285 286 struct kref kref; 287 }; 288 289 enum img_req_flags { 290 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ 291 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ 292 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ 293 IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */ 294 }; 295 296 struct rbd_img_request { 297 struct rbd_device *rbd_dev; 298 u64 offset; /* starting image byte offset */ 299 u64 length; /* byte count from offset */ 300 unsigned long flags; 301 union { 302 u64 snap_id; /* for reads */ 303 struct ceph_snap_context *snapc; /* for writes */ 304 }; 305 union { 306 struct request *rq; /* block request */ 307 struct rbd_obj_request *obj_request; /* obj req initiator */ 308 }; 309 struct page **copyup_pages; 310 u32 copyup_page_count; 311 spinlock_t completion_lock;/* protects next_completion */ 312 u32 next_completion; 313 rbd_img_callback_t callback; 314 u64 xferred;/* aggregate bytes transferred */ 315 int result; /* first nonzero obj_request result */ 316 317 u32 obj_request_count; 318 struct list_head obj_requests; /* rbd_obj_request structs */ 319 320 struct kref kref; 321 }; 322 323 #define for_each_obj_request(ireq, oreq) \ 324 list_for_each_entry(oreq, &(ireq)->obj_requests, links) 325 #define for_each_obj_request_from(ireq, oreq) \ 326 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) 327 #define for_each_obj_request_safe(ireq, oreq, n) \ 328 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) 329 330 enum rbd_watch_state { 331 RBD_WATCH_STATE_UNREGISTERED, 332 RBD_WATCH_STATE_REGISTERED, 333 RBD_WATCH_STATE_ERROR, 334 }; 335 336 enum rbd_lock_state { 337 RBD_LOCK_STATE_UNLOCKED, 338 RBD_LOCK_STATE_LOCKED, 339 RBD_LOCK_STATE_RELEASING, 340 }; 341 342 /* WatchNotify::ClientId */ 343 struct rbd_client_id { 344 u64 gid; 345 u64 handle; 346 }; 347 348 struct rbd_mapping { 349 u64 size; 350 u64 features; 351 bool read_only; 352 }; 353 354 /* 355 * a single device 356 */ 357 struct rbd_device { 358 int dev_id; /* blkdev unique id */ 359 360 int major; /* blkdev assigned major */ 361 int minor; 362 struct gendisk *disk; /* blkdev's gendisk and rq */ 363 364 u32 image_format; /* Either 1 or 2 */ 365 struct rbd_client *rbd_client; 366 367 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 368 369 spinlock_t lock; /* queue, flags, open_count */ 370 371 struct rbd_image_header header; 372 unsigned long flags; /* possibly lock protected */ 373 struct rbd_spec *spec; 374 struct rbd_options *opts; 375 char *config_info; /* add{,_single_major} string */ 376 377 struct ceph_object_id header_oid; 378 struct ceph_object_locator header_oloc; 379 380 struct ceph_file_layout layout; /* used for all rbd requests */ 381 382 struct mutex watch_mutex; 383 enum rbd_watch_state watch_state; 384 struct ceph_osd_linger_request *watch_handle; 385 u64 watch_cookie; 386 struct delayed_work watch_dwork; 387 388 struct rw_semaphore lock_rwsem; 389 enum rbd_lock_state lock_state; 390 char lock_cookie[32]; 391 struct rbd_client_id owner_cid; 392 struct work_struct acquired_lock_work; 393 struct work_struct released_lock_work; 394 struct delayed_work lock_dwork; 395 struct work_struct unlock_work; 396 wait_queue_head_t lock_waitq; 397 398 struct workqueue_struct *task_wq; 399 400 struct rbd_spec *parent_spec; 401 u64 parent_overlap; 402 atomic_t parent_ref; 403 struct rbd_device *parent; 404 405 /* Block layer tags. */ 406 struct blk_mq_tag_set tag_set; 407 408 /* protects updating the header */ 409 struct rw_semaphore header_rwsem; 410 411 struct rbd_mapping mapping; 412 413 struct list_head node; 414 415 /* sysfs related */ 416 struct device dev; 417 unsigned long open_count; /* protected by lock */ 418 }; 419 420 /* 421 * Flag bits for rbd_dev->flags: 422 * - REMOVING (which is coupled with rbd_dev->open_count) is protected 423 * by rbd_dev->lock 424 * - BLACKLISTED is protected by rbd_dev->lock_rwsem 425 */ 426 enum rbd_dev_flags { 427 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 428 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 429 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ 430 }; 431 432 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ 433 434 static LIST_HEAD(rbd_dev_list); /* devices */ 435 static DEFINE_SPINLOCK(rbd_dev_list_lock); 436 437 static LIST_HEAD(rbd_client_list); /* clients */ 438 static DEFINE_SPINLOCK(rbd_client_list_lock); 439 440 /* Slab caches for frequently-allocated structures */ 441 442 static struct kmem_cache *rbd_img_request_cache; 443 static struct kmem_cache *rbd_obj_request_cache; 444 445 static struct bio_set *rbd_bio_clone; 446 447 static int rbd_major; 448 static DEFINE_IDA(rbd_dev_id_ida); 449 450 static struct workqueue_struct *rbd_wq; 451 452 /* 453 * Default to false for now, as single-major requires >= 0.75 version of 454 * userspace rbd utility. 455 */ 456 static bool single_major = false; 457 module_param(single_major, bool, S_IRUGO); 458 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)"); 459 460 static int rbd_img_request_submit(struct rbd_img_request *img_request); 461 462 static ssize_t rbd_add(struct bus_type *bus, const char *buf, 463 size_t count); 464 static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 465 size_t count); 466 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, 467 size_t count); 468 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, 469 size_t count); 470 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); 471 static void rbd_spec_put(struct rbd_spec *spec); 472 473 static int rbd_dev_id_to_minor(int dev_id) 474 { 475 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT; 476 } 477 478 static int minor_to_rbd_dev_id(int minor) 479 { 480 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; 481 } 482 483 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) 484 { 485 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || 486 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; 487 } 488 489 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) 490 { 491 bool is_lock_owner; 492 493 down_read(&rbd_dev->lock_rwsem); 494 is_lock_owner = __rbd_is_lock_owner(rbd_dev); 495 up_read(&rbd_dev->lock_rwsem); 496 return is_lock_owner; 497 } 498 499 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf) 500 { 501 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED); 502 } 503 504 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); 505 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); 506 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); 507 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major); 508 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL); 509 510 static struct attribute *rbd_bus_attrs[] = { 511 &bus_attr_add.attr, 512 &bus_attr_remove.attr, 513 &bus_attr_add_single_major.attr, 514 &bus_attr_remove_single_major.attr, 515 &bus_attr_supported_features.attr, 516 NULL, 517 }; 518 519 static umode_t rbd_bus_is_visible(struct kobject *kobj, 520 struct attribute *attr, int index) 521 { 522 if (!single_major && 523 (attr == &bus_attr_add_single_major.attr || 524 attr == &bus_attr_remove_single_major.attr)) 525 return 0; 526 527 return attr->mode; 528 } 529 530 static const struct attribute_group rbd_bus_group = { 531 .attrs = rbd_bus_attrs, 532 .is_visible = rbd_bus_is_visible, 533 }; 534 __ATTRIBUTE_GROUPS(rbd_bus); 535 536 static struct bus_type rbd_bus_type = { 537 .name = "rbd", 538 .bus_groups = rbd_bus_groups, 539 }; 540 541 static void rbd_root_dev_release(struct device *dev) 542 { 543 } 544 545 static struct device rbd_root_dev = { 546 .init_name = "rbd", 547 .release = rbd_root_dev_release, 548 }; 549 550 static __printf(2, 3) 551 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 552 { 553 struct va_format vaf; 554 va_list args; 555 556 va_start(args, fmt); 557 vaf.fmt = fmt; 558 vaf.va = &args; 559 560 if (!rbd_dev) 561 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 562 else if (rbd_dev->disk) 563 printk(KERN_WARNING "%s: %s: %pV\n", 564 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 565 else if (rbd_dev->spec && rbd_dev->spec->image_name) 566 printk(KERN_WARNING "%s: image %s: %pV\n", 567 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 568 else if (rbd_dev->spec && rbd_dev->spec->image_id) 569 printk(KERN_WARNING "%s: id %s: %pV\n", 570 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 571 else /* punt */ 572 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 573 RBD_DRV_NAME, rbd_dev, &vaf); 574 va_end(args); 575 } 576 577 #ifdef RBD_DEBUG 578 #define rbd_assert(expr) \ 579 if (unlikely(!(expr))) { \ 580 printk(KERN_ERR "\nAssertion failure in %s() " \ 581 "at line %d:\n\n" \ 582 "\trbd_assert(%s);\n\n", \ 583 __func__, __LINE__, #expr); \ 584 BUG(); \ 585 } 586 #else /* !RBD_DEBUG */ 587 # define rbd_assert(expr) ((void) 0) 588 #endif /* !RBD_DEBUG */ 589 590 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request); 591 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); 592 static void rbd_img_parent_read(struct rbd_obj_request *obj_request); 593 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); 594 595 static int rbd_dev_refresh(struct rbd_device *rbd_dev); 596 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); 597 static int rbd_dev_header_info(struct rbd_device *rbd_dev); 598 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); 599 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 600 u64 snap_id); 601 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 602 u8 *order, u64 *snap_size); 603 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 604 u64 *snap_features); 605 606 static int rbd_open(struct block_device *bdev, fmode_t mode) 607 { 608 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 609 bool removing = false; 610 611 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 612 return -EROFS; 613 614 spin_lock_irq(&rbd_dev->lock); 615 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 616 removing = true; 617 else 618 rbd_dev->open_count++; 619 spin_unlock_irq(&rbd_dev->lock); 620 if (removing) 621 return -ENOENT; 622 623 (void) get_device(&rbd_dev->dev); 624 625 return 0; 626 } 627 628 static void rbd_release(struct gendisk *disk, fmode_t mode) 629 { 630 struct rbd_device *rbd_dev = disk->private_data; 631 unsigned long open_count_before; 632 633 spin_lock_irq(&rbd_dev->lock); 634 open_count_before = rbd_dev->open_count--; 635 spin_unlock_irq(&rbd_dev->lock); 636 rbd_assert(open_count_before > 0); 637 638 put_device(&rbd_dev->dev); 639 } 640 641 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) 642 { 643 int ret = 0; 644 int val; 645 bool ro; 646 bool ro_changed = false; 647 648 /* get_user() may sleep, so call it before taking rbd_dev->lock */ 649 if (get_user(val, (int __user *)(arg))) 650 return -EFAULT; 651 652 ro = val ? true : false; 653 /* Snapshot doesn't allow to write*/ 654 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) 655 return -EROFS; 656 657 spin_lock_irq(&rbd_dev->lock); 658 /* prevent others open this device */ 659 if (rbd_dev->open_count > 1) { 660 ret = -EBUSY; 661 goto out; 662 } 663 664 if (rbd_dev->mapping.read_only != ro) { 665 rbd_dev->mapping.read_only = ro; 666 ro_changed = true; 667 } 668 669 out: 670 spin_unlock_irq(&rbd_dev->lock); 671 /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */ 672 if (ret == 0 && ro_changed) 673 set_disk_ro(rbd_dev->disk, ro ? 1 : 0); 674 675 return ret; 676 } 677 678 static int rbd_ioctl(struct block_device *bdev, fmode_t mode, 679 unsigned int cmd, unsigned long arg) 680 { 681 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 682 int ret = 0; 683 684 switch (cmd) { 685 case BLKROSET: 686 ret = rbd_ioctl_set_ro(rbd_dev, arg); 687 break; 688 default: 689 ret = -ENOTTY; 690 } 691 692 return ret; 693 } 694 695 #ifdef CONFIG_COMPAT 696 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, 697 unsigned int cmd, unsigned long arg) 698 { 699 return rbd_ioctl(bdev, mode, cmd, arg); 700 } 701 #endif /* CONFIG_COMPAT */ 702 703 static const struct block_device_operations rbd_bd_ops = { 704 .owner = THIS_MODULE, 705 .open = rbd_open, 706 .release = rbd_release, 707 .ioctl = rbd_ioctl, 708 #ifdef CONFIG_COMPAT 709 .compat_ioctl = rbd_compat_ioctl, 710 #endif 711 }; 712 713 /* 714 * Initialize an rbd client instance. Success or not, this function 715 * consumes ceph_opts. Caller holds client_mutex. 716 */ 717 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 718 { 719 struct rbd_client *rbdc; 720 int ret = -ENOMEM; 721 722 dout("%s:\n", __func__); 723 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 724 if (!rbdc) 725 goto out_opt; 726 727 kref_init(&rbdc->kref); 728 INIT_LIST_HEAD(&rbdc->node); 729 730 rbdc->client = ceph_create_client(ceph_opts, rbdc); 731 if (IS_ERR(rbdc->client)) 732 goto out_rbdc; 733 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 734 735 ret = ceph_open_session(rbdc->client); 736 if (ret < 0) 737 goto out_client; 738 739 spin_lock(&rbd_client_list_lock); 740 list_add_tail(&rbdc->node, &rbd_client_list); 741 spin_unlock(&rbd_client_list_lock); 742 743 dout("%s: rbdc %p\n", __func__, rbdc); 744 745 return rbdc; 746 out_client: 747 ceph_destroy_client(rbdc->client); 748 out_rbdc: 749 kfree(rbdc); 750 out_opt: 751 if (ceph_opts) 752 ceph_destroy_options(ceph_opts); 753 dout("%s: error %d\n", __func__, ret); 754 755 return ERR_PTR(ret); 756 } 757 758 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) 759 { 760 kref_get(&rbdc->kref); 761 762 return rbdc; 763 } 764 765 /* 766 * Find a ceph client with specific addr and configuration. If 767 * found, bump its reference count. 768 */ 769 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 770 { 771 struct rbd_client *client_node; 772 bool found = false; 773 774 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 775 return NULL; 776 777 spin_lock(&rbd_client_list_lock); 778 list_for_each_entry(client_node, &rbd_client_list, node) { 779 if (!ceph_compare_options(ceph_opts, client_node->client)) { 780 __rbd_get_client(client_node); 781 782 found = true; 783 break; 784 } 785 } 786 spin_unlock(&rbd_client_list_lock); 787 788 return found ? client_node : NULL; 789 } 790 791 /* 792 * (Per device) rbd map options 793 */ 794 enum { 795 Opt_queue_depth, 796 Opt_last_int, 797 /* int args above */ 798 Opt_last_string, 799 /* string args above */ 800 Opt_read_only, 801 Opt_read_write, 802 Opt_lock_on_read, 803 Opt_exclusive, 804 Opt_err 805 }; 806 807 static match_table_t rbd_opts_tokens = { 808 {Opt_queue_depth, "queue_depth=%d"}, 809 /* int args above */ 810 /* string args above */ 811 {Opt_read_only, "read_only"}, 812 {Opt_read_only, "ro"}, /* Alternate spelling */ 813 {Opt_read_write, "read_write"}, 814 {Opt_read_write, "rw"}, /* Alternate spelling */ 815 {Opt_lock_on_read, "lock_on_read"}, 816 {Opt_exclusive, "exclusive"}, 817 {Opt_err, NULL} 818 }; 819 820 struct rbd_options { 821 int queue_depth; 822 bool read_only; 823 bool lock_on_read; 824 bool exclusive; 825 }; 826 827 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ 828 #define RBD_READ_ONLY_DEFAULT false 829 #define RBD_LOCK_ON_READ_DEFAULT false 830 #define RBD_EXCLUSIVE_DEFAULT false 831 832 static int parse_rbd_opts_token(char *c, void *private) 833 { 834 struct rbd_options *rbd_opts = private; 835 substring_t argstr[MAX_OPT_ARGS]; 836 int token, intval, ret; 837 838 token = match_token(c, rbd_opts_tokens, argstr); 839 if (token < Opt_last_int) { 840 ret = match_int(&argstr[0], &intval); 841 if (ret < 0) { 842 pr_err("bad mount option arg (not int) at '%s'\n", c); 843 return ret; 844 } 845 dout("got int token %d val %d\n", token, intval); 846 } else if (token > Opt_last_int && token < Opt_last_string) { 847 dout("got string token %d val %s\n", token, argstr[0].from); 848 } else { 849 dout("got token %d\n", token); 850 } 851 852 switch (token) { 853 case Opt_queue_depth: 854 if (intval < 1) { 855 pr_err("queue_depth out of range\n"); 856 return -EINVAL; 857 } 858 rbd_opts->queue_depth = intval; 859 break; 860 case Opt_read_only: 861 rbd_opts->read_only = true; 862 break; 863 case Opt_read_write: 864 rbd_opts->read_only = false; 865 break; 866 case Opt_lock_on_read: 867 rbd_opts->lock_on_read = true; 868 break; 869 case Opt_exclusive: 870 rbd_opts->exclusive = true; 871 break; 872 default: 873 /* libceph prints "bad option" msg */ 874 return -EINVAL; 875 } 876 877 return 0; 878 } 879 880 static char* obj_op_name(enum obj_operation_type op_type) 881 { 882 switch (op_type) { 883 case OBJ_OP_READ: 884 return "read"; 885 case OBJ_OP_WRITE: 886 return "write"; 887 case OBJ_OP_DISCARD: 888 return "discard"; 889 default: 890 return "???"; 891 } 892 } 893 894 /* 895 * Get a ceph client with specific addr and configuration, if one does 896 * not exist create it. Either way, ceph_opts is consumed by this 897 * function. 898 */ 899 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 900 { 901 struct rbd_client *rbdc; 902 903 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); 904 rbdc = rbd_client_find(ceph_opts); 905 if (rbdc) /* using an existing client */ 906 ceph_destroy_options(ceph_opts); 907 else 908 rbdc = rbd_client_create(ceph_opts); 909 mutex_unlock(&client_mutex); 910 911 return rbdc; 912 } 913 914 /* 915 * Destroy ceph client 916 * 917 * Caller must hold rbd_client_list_lock. 918 */ 919 static void rbd_client_release(struct kref *kref) 920 { 921 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 922 923 dout("%s: rbdc %p\n", __func__, rbdc); 924 spin_lock(&rbd_client_list_lock); 925 list_del(&rbdc->node); 926 spin_unlock(&rbd_client_list_lock); 927 928 ceph_destroy_client(rbdc->client); 929 kfree(rbdc); 930 } 931 932 /* 933 * Drop reference to ceph client node. If it's not referenced anymore, release 934 * it. 935 */ 936 static void rbd_put_client(struct rbd_client *rbdc) 937 { 938 if (rbdc) 939 kref_put(&rbdc->kref, rbd_client_release); 940 } 941 942 static bool rbd_image_format_valid(u32 image_format) 943 { 944 return image_format == 1 || image_format == 2; 945 } 946 947 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 948 { 949 size_t size; 950 u32 snap_count; 951 952 /* The header has to start with the magic rbd header text */ 953 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 954 return false; 955 956 /* The bio layer requires at least sector-sized I/O */ 957 958 if (ondisk->options.order < SECTOR_SHIFT) 959 return false; 960 961 /* If we use u64 in a few spots we may be able to loosen this */ 962 963 if (ondisk->options.order > 8 * sizeof (int) - 1) 964 return false; 965 966 /* 967 * The size of a snapshot header has to fit in a size_t, and 968 * that limits the number of snapshots. 969 */ 970 snap_count = le32_to_cpu(ondisk->snap_count); 971 size = SIZE_MAX - sizeof (struct ceph_snap_context); 972 if (snap_count > size / sizeof (__le64)) 973 return false; 974 975 /* 976 * Not only that, but the size of the entire the snapshot 977 * header must also be representable in a size_t. 978 */ 979 size -= snap_count * sizeof (__le64); 980 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 981 return false; 982 983 return true; 984 } 985 986 /* 987 * returns the size of an object in the image 988 */ 989 static u32 rbd_obj_bytes(struct rbd_image_header *header) 990 { 991 return 1U << header->obj_order; 992 } 993 994 static void rbd_init_layout(struct rbd_device *rbd_dev) 995 { 996 if (rbd_dev->header.stripe_unit == 0 || 997 rbd_dev->header.stripe_count == 0) { 998 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); 999 rbd_dev->header.stripe_count = 1; 1000 } 1001 1002 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; 1003 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; 1004 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); 1005 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? 1006 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; 1007 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 1008 } 1009 1010 /* 1011 * Fill an rbd image header with information from the given format 1 1012 * on-disk header. 1013 */ 1014 static int rbd_header_from_disk(struct rbd_device *rbd_dev, 1015 struct rbd_image_header_ondisk *ondisk) 1016 { 1017 struct rbd_image_header *header = &rbd_dev->header; 1018 bool first_time = header->object_prefix == NULL; 1019 struct ceph_snap_context *snapc; 1020 char *object_prefix = NULL; 1021 char *snap_names = NULL; 1022 u64 *snap_sizes = NULL; 1023 u32 snap_count; 1024 int ret = -ENOMEM; 1025 u32 i; 1026 1027 /* Allocate this now to avoid having to handle failure below */ 1028 1029 if (first_time) { 1030 object_prefix = kstrndup(ondisk->object_prefix, 1031 sizeof(ondisk->object_prefix), 1032 GFP_KERNEL); 1033 if (!object_prefix) 1034 return -ENOMEM; 1035 } 1036 1037 /* Allocate the snapshot context and fill it in */ 1038 1039 snap_count = le32_to_cpu(ondisk->snap_count); 1040 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 1041 if (!snapc) 1042 goto out_err; 1043 snapc->seq = le64_to_cpu(ondisk->snap_seq); 1044 if (snap_count) { 1045 struct rbd_image_snap_ondisk *snaps; 1046 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 1047 1048 /* We'll keep a copy of the snapshot names... */ 1049 1050 if (snap_names_len > (u64)SIZE_MAX) 1051 goto out_2big; 1052 snap_names = kmalloc(snap_names_len, GFP_KERNEL); 1053 if (!snap_names) 1054 goto out_err; 1055 1056 /* ...as well as the array of their sizes. */ 1057 snap_sizes = kmalloc_array(snap_count, 1058 sizeof(*header->snap_sizes), 1059 GFP_KERNEL); 1060 if (!snap_sizes) 1061 goto out_err; 1062 1063 /* 1064 * Copy the names, and fill in each snapshot's id 1065 * and size. 1066 * 1067 * Note that rbd_dev_v1_header_info() guarantees the 1068 * ondisk buffer we're working with has 1069 * snap_names_len bytes beyond the end of the 1070 * snapshot id array, this memcpy() is safe. 1071 */ 1072 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len); 1073 snaps = ondisk->snaps; 1074 for (i = 0; i < snap_count; i++) { 1075 snapc->snaps[i] = le64_to_cpu(snaps[i].id); 1076 snap_sizes[i] = le64_to_cpu(snaps[i].image_size); 1077 } 1078 } 1079 1080 /* We won't fail any more, fill in the header */ 1081 1082 if (first_time) { 1083 header->object_prefix = object_prefix; 1084 header->obj_order = ondisk->options.order; 1085 rbd_init_layout(rbd_dev); 1086 } else { 1087 ceph_put_snap_context(header->snapc); 1088 kfree(header->snap_names); 1089 kfree(header->snap_sizes); 1090 } 1091 1092 /* The remaining fields always get updated (when we refresh) */ 1093 1094 header->image_size = le64_to_cpu(ondisk->image_size); 1095 header->snapc = snapc; 1096 header->snap_names = snap_names; 1097 header->snap_sizes = snap_sizes; 1098 1099 return 0; 1100 out_2big: 1101 ret = -EIO; 1102 out_err: 1103 kfree(snap_sizes); 1104 kfree(snap_names); 1105 ceph_put_snap_context(snapc); 1106 kfree(object_prefix); 1107 1108 return ret; 1109 } 1110 1111 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which) 1112 { 1113 const char *snap_name; 1114 1115 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 1116 1117 /* Skip over names until we find the one we are looking for */ 1118 1119 snap_name = rbd_dev->header.snap_names; 1120 while (which--) 1121 snap_name += strlen(snap_name) + 1; 1122 1123 return kstrdup(snap_name, GFP_KERNEL); 1124 } 1125 1126 /* 1127 * Snapshot id comparison function for use with qsort()/bsearch(). 1128 * Note that result is for snapshots in *descending* order. 1129 */ 1130 static int snapid_compare_reverse(const void *s1, const void *s2) 1131 { 1132 u64 snap_id1 = *(u64 *)s1; 1133 u64 snap_id2 = *(u64 *)s2; 1134 1135 if (snap_id1 < snap_id2) 1136 return 1; 1137 return snap_id1 == snap_id2 ? 0 : -1; 1138 } 1139 1140 /* 1141 * Search a snapshot context to see if the given snapshot id is 1142 * present. 1143 * 1144 * Returns the position of the snapshot id in the array if it's found, 1145 * or BAD_SNAP_INDEX otherwise. 1146 * 1147 * Note: The snapshot array is in kept sorted (by the osd) in 1148 * reverse order, highest snapshot id first. 1149 */ 1150 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id) 1151 { 1152 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 1153 u64 *found; 1154 1155 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps, 1156 sizeof (snap_id), snapid_compare_reverse); 1157 1158 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX; 1159 } 1160 1161 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, 1162 u64 snap_id) 1163 { 1164 u32 which; 1165 const char *snap_name; 1166 1167 which = rbd_dev_snap_index(rbd_dev, snap_id); 1168 if (which == BAD_SNAP_INDEX) 1169 return ERR_PTR(-ENOENT); 1170 1171 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which); 1172 return snap_name ? snap_name : ERR_PTR(-ENOMEM); 1173 } 1174 1175 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 1176 { 1177 if (snap_id == CEPH_NOSNAP) 1178 return RBD_SNAP_HEAD_NAME; 1179 1180 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1181 if (rbd_dev->image_format == 1) 1182 return rbd_dev_v1_snap_name(rbd_dev, snap_id); 1183 1184 return rbd_dev_v2_snap_name(rbd_dev, snap_id); 1185 } 1186 1187 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 1188 u64 *snap_size) 1189 { 1190 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1191 if (snap_id == CEPH_NOSNAP) { 1192 *snap_size = rbd_dev->header.image_size; 1193 } else if (rbd_dev->image_format == 1) { 1194 u32 which; 1195 1196 which = rbd_dev_snap_index(rbd_dev, snap_id); 1197 if (which == BAD_SNAP_INDEX) 1198 return -ENOENT; 1199 1200 *snap_size = rbd_dev->header.snap_sizes[which]; 1201 } else { 1202 u64 size = 0; 1203 int ret; 1204 1205 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 1206 if (ret) 1207 return ret; 1208 1209 *snap_size = size; 1210 } 1211 return 0; 1212 } 1213 1214 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 1215 u64 *snap_features) 1216 { 1217 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 1218 if (snap_id == CEPH_NOSNAP) { 1219 *snap_features = rbd_dev->header.features; 1220 } else if (rbd_dev->image_format == 1) { 1221 *snap_features = 0; /* No features for format 1 */ 1222 } else { 1223 u64 features = 0; 1224 int ret; 1225 1226 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); 1227 if (ret) 1228 return ret; 1229 1230 *snap_features = features; 1231 } 1232 return 0; 1233 } 1234 1235 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) 1236 { 1237 u64 snap_id = rbd_dev->spec->snap_id; 1238 u64 size = 0; 1239 u64 features = 0; 1240 int ret; 1241 1242 ret = rbd_snap_size(rbd_dev, snap_id, &size); 1243 if (ret) 1244 return ret; 1245 ret = rbd_snap_features(rbd_dev, snap_id, &features); 1246 if (ret) 1247 return ret; 1248 1249 rbd_dev->mapping.size = size; 1250 rbd_dev->mapping.features = features; 1251 1252 return 0; 1253 } 1254 1255 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) 1256 { 1257 rbd_dev->mapping.size = 0; 1258 rbd_dev->mapping.features = 0; 1259 } 1260 1261 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 1262 { 1263 u64 segment_size = rbd_obj_bytes(&rbd_dev->header); 1264 1265 return offset & (segment_size - 1); 1266 } 1267 1268 static u64 rbd_segment_length(struct rbd_device *rbd_dev, 1269 u64 offset, u64 length) 1270 { 1271 u64 segment_size = rbd_obj_bytes(&rbd_dev->header); 1272 1273 offset &= segment_size - 1; 1274 1275 rbd_assert(length <= U64_MAX - offset); 1276 if (offset + length > segment_size) 1277 length = segment_size - offset; 1278 1279 return length; 1280 } 1281 1282 /* 1283 * bio helpers 1284 */ 1285 1286 static void bio_chain_put(struct bio *chain) 1287 { 1288 struct bio *tmp; 1289 1290 while (chain) { 1291 tmp = chain; 1292 chain = chain->bi_next; 1293 bio_put(tmp); 1294 } 1295 } 1296 1297 /* 1298 * zeros a bio chain, starting at specific offset 1299 */ 1300 static void zero_bio_chain(struct bio *chain, int start_ofs) 1301 { 1302 struct bio_vec bv; 1303 struct bvec_iter iter; 1304 unsigned long flags; 1305 void *buf; 1306 int pos = 0; 1307 1308 while (chain) { 1309 bio_for_each_segment(bv, chain, iter) { 1310 if (pos + bv.bv_len > start_ofs) { 1311 int remainder = max(start_ofs - pos, 0); 1312 buf = bvec_kmap_irq(&bv, &flags); 1313 memset(buf + remainder, 0, 1314 bv.bv_len - remainder); 1315 flush_dcache_page(bv.bv_page); 1316 bvec_kunmap_irq(buf, &flags); 1317 } 1318 pos += bv.bv_len; 1319 } 1320 1321 chain = chain->bi_next; 1322 } 1323 } 1324 1325 /* 1326 * similar to zero_bio_chain(), zeros data defined by a page array, 1327 * starting at the given byte offset from the start of the array and 1328 * continuing up to the given end offset. The pages array is 1329 * assumed to be big enough to hold all bytes up to the end. 1330 */ 1331 static void zero_pages(struct page **pages, u64 offset, u64 end) 1332 { 1333 struct page **page = &pages[offset >> PAGE_SHIFT]; 1334 1335 rbd_assert(end > offset); 1336 rbd_assert(end - offset <= (u64)SIZE_MAX); 1337 while (offset < end) { 1338 size_t page_offset; 1339 size_t length; 1340 unsigned long flags; 1341 void *kaddr; 1342 1343 page_offset = offset & ~PAGE_MASK; 1344 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset); 1345 local_irq_save(flags); 1346 kaddr = kmap_atomic(*page); 1347 memset(kaddr + page_offset, 0, length); 1348 flush_dcache_page(*page); 1349 kunmap_atomic(kaddr); 1350 local_irq_restore(flags); 1351 1352 offset += length; 1353 page++; 1354 } 1355 } 1356 1357 /* 1358 * Clone a portion of a bio, starting at the given byte offset 1359 * and continuing for the number of bytes indicated. 1360 */ 1361 static struct bio *bio_clone_range(struct bio *bio_src, 1362 unsigned int offset, 1363 unsigned int len, 1364 gfp_t gfpmask) 1365 { 1366 struct bio *bio; 1367 1368 bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone); 1369 if (!bio) 1370 return NULL; /* ENOMEM */ 1371 1372 bio_advance(bio, offset); 1373 bio->bi_iter.bi_size = len; 1374 1375 return bio; 1376 } 1377 1378 /* 1379 * Clone a portion of a bio chain, starting at the given byte offset 1380 * into the first bio in the source chain and continuing for the 1381 * number of bytes indicated. The result is another bio chain of 1382 * exactly the given length, or a null pointer on error. 1383 * 1384 * The bio_src and offset parameters are both in-out. On entry they 1385 * refer to the first source bio and the offset into that bio where 1386 * the start of data to be cloned is located. 1387 * 1388 * On return, bio_src is updated to refer to the bio in the source 1389 * chain that contains first un-cloned byte, and *offset will 1390 * contain the offset of that byte within that bio. 1391 */ 1392 static struct bio *bio_chain_clone_range(struct bio **bio_src, 1393 unsigned int *offset, 1394 unsigned int len, 1395 gfp_t gfpmask) 1396 { 1397 struct bio *bi = *bio_src; 1398 unsigned int off = *offset; 1399 struct bio *chain = NULL; 1400 struct bio **end; 1401 1402 /* Build up a chain of clone bios up to the limit */ 1403 1404 if (!bi || off >= bi->bi_iter.bi_size || !len) 1405 return NULL; /* Nothing to clone */ 1406 1407 end = &chain; 1408 while (len) { 1409 unsigned int bi_size; 1410 struct bio *bio; 1411 1412 if (!bi) { 1413 rbd_warn(NULL, "bio_chain exhausted with %u left", len); 1414 goto out_err; /* EINVAL; ran out of bio's */ 1415 } 1416 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len); 1417 bio = bio_clone_range(bi, off, bi_size, gfpmask); 1418 if (!bio) 1419 goto out_err; /* ENOMEM */ 1420 1421 *end = bio; 1422 end = &bio->bi_next; 1423 1424 off += bi_size; 1425 if (off == bi->bi_iter.bi_size) { 1426 bi = bi->bi_next; 1427 off = 0; 1428 } 1429 len -= bi_size; 1430 } 1431 *bio_src = bi; 1432 *offset = off; 1433 1434 return chain; 1435 out_err: 1436 bio_chain_put(chain); 1437 1438 return NULL; 1439 } 1440 1441 /* 1442 * The default/initial value for all object request flags is 0. For 1443 * each flag, once its value is set to 1 it is never reset to 0 1444 * again. 1445 */ 1446 static void obj_request_img_data_set(struct rbd_obj_request *obj_request) 1447 { 1448 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { 1449 struct rbd_device *rbd_dev; 1450 1451 rbd_dev = obj_request->img_request->rbd_dev; 1452 rbd_warn(rbd_dev, "obj_request %p already marked img_data", 1453 obj_request); 1454 } 1455 } 1456 1457 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) 1458 { 1459 smp_mb(); 1460 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; 1461 } 1462 1463 static void obj_request_done_set(struct rbd_obj_request *obj_request) 1464 { 1465 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { 1466 struct rbd_device *rbd_dev = NULL; 1467 1468 if (obj_request_img_data_test(obj_request)) 1469 rbd_dev = obj_request->img_request->rbd_dev; 1470 rbd_warn(rbd_dev, "obj_request %p already marked done", 1471 obj_request); 1472 } 1473 } 1474 1475 static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1476 { 1477 smp_mb(); 1478 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; 1479 } 1480 1481 /* 1482 * This sets the KNOWN flag after (possibly) setting the EXISTS 1483 * flag. The latter is set based on the "exists" value provided. 1484 * 1485 * Note that for our purposes once an object exists it never goes 1486 * away again. It's possible that the response from two existence 1487 * checks are separated by the creation of the target object, and 1488 * the first ("doesn't exist") response arrives *after* the second 1489 * ("does exist"). In that case we ignore the second one. 1490 */ 1491 static void obj_request_existence_set(struct rbd_obj_request *obj_request, 1492 bool exists) 1493 { 1494 if (exists) 1495 set_bit(OBJ_REQ_EXISTS, &obj_request->flags); 1496 set_bit(OBJ_REQ_KNOWN, &obj_request->flags); 1497 smp_mb(); 1498 } 1499 1500 static bool obj_request_known_test(struct rbd_obj_request *obj_request) 1501 { 1502 smp_mb(); 1503 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; 1504 } 1505 1506 static bool obj_request_exists_test(struct rbd_obj_request *obj_request) 1507 { 1508 smp_mb(); 1509 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; 1510 } 1511 1512 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request) 1513 { 1514 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; 1515 1516 return obj_request->img_offset < 1517 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header)); 1518 } 1519 1520 static void rbd_obj_request_get(struct rbd_obj_request *obj_request) 1521 { 1522 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1523 kref_read(&obj_request->kref)); 1524 kref_get(&obj_request->kref); 1525 } 1526 1527 static void rbd_obj_request_destroy(struct kref *kref); 1528 static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1529 { 1530 rbd_assert(obj_request != NULL); 1531 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1532 kref_read(&obj_request->kref)); 1533 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1534 } 1535 1536 static void rbd_img_request_get(struct rbd_img_request *img_request) 1537 { 1538 dout("%s: img %p (was %d)\n", __func__, img_request, 1539 kref_read(&img_request->kref)); 1540 kref_get(&img_request->kref); 1541 } 1542 1543 static bool img_request_child_test(struct rbd_img_request *img_request); 1544 static void rbd_parent_request_destroy(struct kref *kref); 1545 static void rbd_img_request_destroy(struct kref *kref); 1546 static void rbd_img_request_put(struct rbd_img_request *img_request) 1547 { 1548 rbd_assert(img_request != NULL); 1549 dout("%s: img %p (was %d)\n", __func__, img_request, 1550 kref_read(&img_request->kref)); 1551 if (img_request_child_test(img_request)) 1552 kref_put(&img_request->kref, rbd_parent_request_destroy); 1553 else 1554 kref_put(&img_request->kref, rbd_img_request_destroy); 1555 } 1556 1557 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1558 struct rbd_obj_request *obj_request) 1559 { 1560 rbd_assert(obj_request->img_request == NULL); 1561 1562 /* Image request now owns object's original reference */ 1563 obj_request->img_request = img_request; 1564 obj_request->which = img_request->obj_request_count; 1565 rbd_assert(!obj_request_img_data_test(obj_request)); 1566 obj_request_img_data_set(obj_request); 1567 rbd_assert(obj_request->which != BAD_WHICH); 1568 img_request->obj_request_count++; 1569 list_add_tail(&obj_request->links, &img_request->obj_requests); 1570 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1571 obj_request->which); 1572 } 1573 1574 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1575 struct rbd_obj_request *obj_request) 1576 { 1577 rbd_assert(obj_request->which != BAD_WHICH); 1578 1579 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1580 obj_request->which); 1581 list_del(&obj_request->links); 1582 rbd_assert(img_request->obj_request_count > 0); 1583 img_request->obj_request_count--; 1584 rbd_assert(obj_request->which == img_request->obj_request_count); 1585 obj_request->which = BAD_WHICH; 1586 rbd_assert(obj_request_img_data_test(obj_request)); 1587 rbd_assert(obj_request->img_request == img_request); 1588 obj_request->img_request = NULL; 1589 obj_request->callback = NULL; 1590 rbd_obj_request_put(obj_request); 1591 } 1592 1593 static bool obj_request_type_valid(enum obj_request_type type) 1594 { 1595 switch (type) { 1596 case OBJ_REQUEST_NODATA: 1597 case OBJ_REQUEST_BIO: 1598 case OBJ_REQUEST_PAGES: 1599 return true; 1600 default: 1601 return false; 1602 } 1603 } 1604 1605 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); 1606 1607 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) 1608 { 1609 struct ceph_osd_request *osd_req = obj_request->osd_req; 1610 1611 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, 1612 obj_request, obj_request->object_no, obj_request->offset, 1613 obj_request->length, osd_req); 1614 if (obj_request_img_data_test(obj_request)) { 1615 WARN_ON(obj_request->callback != rbd_img_obj_callback); 1616 rbd_img_request_get(obj_request->img_request); 1617 } 1618 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); 1619 } 1620 1621 static void rbd_img_request_complete(struct rbd_img_request *img_request) 1622 { 1623 1624 dout("%s: img %p\n", __func__, img_request); 1625 1626 /* 1627 * If no error occurred, compute the aggregate transfer 1628 * count for the image request. We could instead use 1629 * atomic64_cmpxchg() to update it as each object request 1630 * completes; not clear which way is better off hand. 1631 */ 1632 if (!img_request->result) { 1633 struct rbd_obj_request *obj_request; 1634 u64 xferred = 0; 1635 1636 for_each_obj_request(img_request, obj_request) 1637 xferred += obj_request->xferred; 1638 img_request->xferred = xferred; 1639 } 1640 1641 if (img_request->callback) 1642 img_request->callback(img_request); 1643 else 1644 rbd_img_request_put(img_request); 1645 } 1646 1647 /* 1648 * The default/initial value for all image request flags is 0. Each 1649 * is conditionally set to 1 at image request initialization time 1650 * and currently never change thereafter. 1651 */ 1652 static void img_request_write_set(struct rbd_img_request *img_request) 1653 { 1654 set_bit(IMG_REQ_WRITE, &img_request->flags); 1655 smp_mb(); 1656 } 1657 1658 static bool img_request_write_test(struct rbd_img_request *img_request) 1659 { 1660 smp_mb(); 1661 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; 1662 } 1663 1664 /* 1665 * Set the discard flag when the img_request is an discard request 1666 */ 1667 static void img_request_discard_set(struct rbd_img_request *img_request) 1668 { 1669 set_bit(IMG_REQ_DISCARD, &img_request->flags); 1670 smp_mb(); 1671 } 1672 1673 static bool img_request_discard_test(struct rbd_img_request *img_request) 1674 { 1675 smp_mb(); 1676 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0; 1677 } 1678 1679 static void img_request_child_set(struct rbd_img_request *img_request) 1680 { 1681 set_bit(IMG_REQ_CHILD, &img_request->flags); 1682 smp_mb(); 1683 } 1684 1685 static void img_request_child_clear(struct rbd_img_request *img_request) 1686 { 1687 clear_bit(IMG_REQ_CHILD, &img_request->flags); 1688 smp_mb(); 1689 } 1690 1691 static bool img_request_child_test(struct rbd_img_request *img_request) 1692 { 1693 smp_mb(); 1694 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; 1695 } 1696 1697 static void img_request_layered_set(struct rbd_img_request *img_request) 1698 { 1699 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1700 smp_mb(); 1701 } 1702 1703 static void img_request_layered_clear(struct rbd_img_request *img_request) 1704 { 1705 clear_bit(IMG_REQ_LAYERED, &img_request->flags); 1706 smp_mb(); 1707 } 1708 1709 static bool img_request_layered_test(struct rbd_img_request *img_request) 1710 { 1711 smp_mb(); 1712 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1713 } 1714 1715 static enum obj_operation_type 1716 rbd_img_request_op_type(struct rbd_img_request *img_request) 1717 { 1718 if (img_request_write_test(img_request)) 1719 return OBJ_OP_WRITE; 1720 else if (img_request_discard_test(img_request)) 1721 return OBJ_OP_DISCARD; 1722 else 1723 return OBJ_OP_READ; 1724 } 1725 1726 static void 1727 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) 1728 { 1729 u64 xferred = obj_request->xferred; 1730 u64 length = obj_request->length; 1731 1732 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1733 obj_request, obj_request->img_request, obj_request->result, 1734 xferred, length); 1735 /* 1736 * ENOENT means a hole in the image. We zero-fill the entire 1737 * length of the request. A short read also implies zero-fill 1738 * to the end of the request. An error requires the whole 1739 * length of the request to be reported finished with an error 1740 * to the block layer. In each case we update the xferred 1741 * count to indicate the whole request was satisfied. 1742 */ 1743 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); 1744 if (obj_request->result == -ENOENT) { 1745 if (obj_request->type == OBJ_REQUEST_BIO) 1746 zero_bio_chain(obj_request->bio_list, 0); 1747 else 1748 zero_pages(obj_request->pages, 0, length); 1749 obj_request->result = 0; 1750 } else if (xferred < length && !obj_request->result) { 1751 if (obj_request->type == OBJ_REQUEST_BIO) 1752 zero_bio_chain(obj_request->bio_list, xferred); 1753 else 1754 zero_pages(obj_request->pages, xferred, length); 1755 } 1756 obj_request->xferred = length; 1757 obj_request_done_set(obj_request); 1758 } 1759 1760 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) 1761 { 1762 dout("%s: obj %p cb %p\n", __func__, obj_request, 1763 obj_request->callback); 1764 if (obj_request->callback) 1765 obj_request->callback(obj_request); 1766 else 1767 complete_all(&obj_request->completion); 1768 } 1769 1770 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) 1771 { 1772 obj_request->result = err; 1773 obj_request->xferred = 0; 1774 /* 1775 * kludge - mirror rbd_obj_request_submit() to match a put in 1776 * rbd_img_obj_callback() 1777 */ 1778 if (obj_request_img_data_test(obj_request)) { 1779 WARN_ON(obj_request->callback != rbd_img_obj_callback); 1780 rbd_img_request_get(obj_request->img_request); 1781 } 1782 obj_request_done_set(obj_request); 1783 rbd_obj_request_complete(obj_request); 1784 } 1785 1786 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1787 { 1788 struct rbd_img_request *img_request = NULL; 1789 struct rbd_device *rbd_dev = NULL; 1790 bool layered = false; 1791 1792 if (obj_request_img_data_test(obj_request)) { 1793 img_request = obj_request->img_request; 1794 layered = img_request && img_request_layered_test(img_request); 1795 rbd_dev = img_request->rbd_dev; 1796 } 1797 1798 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1799 obj_request, img_request, obj_request->result, 1800 obj_request->xferred, obj_request->length); 1801 if (layered && obj_request->result == -ENOENT && 1802 obj_request->img_offset < rbd_dev->parent_overlap) 1803 rbd_img_parent_read(obj_request); 1804 else if (img_request) 1805 rbd_img_obj_request_read_callback(obj_request); 1806 else 1807 obj_request_done_set(obj_request); 1808 } 1809 1810 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1811 { 1812 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1813 obj_request->result, obj_request->length); 1814 /* 1815 * There is no such thing as a successful short write. Set 1816 * it to our originally-requested length. 1817 */ 1818 obj_request->xferred = obj_request->length; 1819 obj_request_done_set(obj_request); 1820 } 1821 1822 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request) 1823 { 1824 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1825 obj_request->result, obj_request->length); 1826 /* 1827 * There is no such thing as a successful short discard. Set 1828 * it to our originally-requested length. 1829 */ 1830 obj_request->xferred = obj_request->length; 1831 /* discarding a non-existent object is not a problem */ 1832 if (obj_request->result == -ENOENT) 1833 obj_request->result = 0; 1834 obj_request_done_set(obj_request); 1835 } 1836 1837 /* 1838 * For a simple stat call there's nothing to do. We'll do more if 1839 * this is part of a write sequence for a layered image. 1840 */ 1841 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) 1842 { 1843 dout("%s: obj %p\n", __func__, obj_request); 1844 obj_request_done_set(obj_request); 1845 } 1846 1847 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) 1848 { 1849 dout("%s: obj %p\n", __func__, obj_request); 1850 1851 if (obj_request_img_data_test(obj_request)) 1852 rbd_osd_copyup_callback(obj_request); 1853 else 1854 obj_request_done_set(obj_request); 1855 } 1856 1857 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) 1858 { 1859 struct rbd_obj_request *obj_request = osd_req->r_priv; 1860 u16 opcode; 1861 1862 dout("%s: osd_req %p\n", __func__, osd_req); 1863 rbd_assert(osd_req == obj_request->osd_req); 1864 if (obj_request_img_data_test(obj_request)) { 1865 rbd_assert(obj_request->img_request); 1866 rbd_assert(obj_request->which != BAD_WHICH); 1867 } else { 1868 rbd_assert(obj_request->which == BAD_WHICH); 1869 } 1870 1871 if (osd_req->r_result < 0) 1872 obj_request->result = osd_req->r_result; 1873 1874 /* 1875 * We support a 64-bit length, but ultimately it has to be 1876 * passed to the block layer, which just supports a 32-bit 1877 * length field. 1878 */ 1879 obj_request->xferred = osd_req->r_ops[0].outdata_len; 1880 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1881 1882 opcode = osd_req->r_ops[0].op; 1883 switch (opcode) { 1884 case CEPH_OSD_OP_READ: 1885 rbd_osd_read_callback(obj_request); 1886 break; 1887 case CEPH_OSD_OP_SETALLOCHINT: 1888 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || 1889 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); 1890 /* fall through */ 1891 case CEPH_OSD_OP_WRITE: 1892 case CEPH_OSD_OP_WRITEFULL: 1893 rbd_osd_write_callback(obj_request); 1894 break; 1895 case CEPH_OSD_OP_STAT: 1896 rbd_osd_stat_callback(obj_request); 1897 break; 1898 case CEPH_OSD_OP_DELETE: 1899 case CEPH_OSD_OP_TRUNCATE: 1900 case CEPH_OSD_OP_ZERO: 1901 rbd_osd_discard_callback(obj_request); 1902 break; 1903 case CEPH_OSD_OP_CALL: 1904 rbd_osd_call_callback(obj_request); 1905 break; 1906 default: 1907 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", 1908 obj_request->object_no, opcode); 1909 break; 1910 } 1911 1912 if (obj_request_done_test(obj_request)) 1913 rbd_obj_request_complete(obj_request); 1914 } 1915 1916 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1917 { 1918 struct ceph_osd_request *osd_req = obj_request->osd_req; 1919 1920 rbd_assert(obj_request_img_data_test(obj_request)); 1921 osd_req->r_snapid = obj_request->img_request->snap_id; 1922 } 1923 1924 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1925 { 1926 struct ceph_osd_request *osd_req = obj_request->osd_req; 1927 1928 ktime_get_real_ts(&osd_req->r_mtime); 1929 osd_req->r_data_offset = obj_request->offset; 1930 } 1931 1932 static struct ceph_osd_request * 1933 __rbd_osd_req_create(struct rbd_device *rbd_dev, 1934 struct ceph_snap_context *snapc, 1935 int num_ops, unsigned int flags, 1936 struct rbd_obj_request *obj_request) 1937 { 1938 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 1939 struct ceph_osd_request *req; 1940 const char *name_format = rbd_dev->image_format == 1 ? 1941 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; 1942 1943 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); 1944 if (!req) 1945 return NULL; 1946 1947 req->r_flags = flags; 1948 req->r_callback = rbd_osd_req_callback; 1949 req->r_priv = obj_request; 1950 1951 req->r_base_oloc.pool = rbd_dev->layout.pool_id; 1952 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, 1953 rbd_dev->header.object_prefix, obj_request->object_no)) 1954 goto err_req; 1955 1956 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) 1957 goto err_req; 1958 1959 return req; 1960 1961 err_req: 1962 ceph_osdc_put_request(req); 1963 return NULL; 1964 } 1965 1966 /* 1967 * Create an osd request. A read request has one osd op (read). 1968 * A write request has either one (watch) or two (hint+write) osd ops. 1969 * (All rbd data writes are prefixed with an allocation hint op, but 1970 * technically osd watch is a write request, hence this distinction.) 1971 */ 1972 static struct ceph_osd_request *rbd_osd_req_create( 1973 struct rbd_device *rbd_dev, 1974 enum obj_operation_type op_type, 1975 unsigned int num_ops, 1976 struct rbd_obj_request *obj_request) 1977 { 1978 struct ceph_snap_context *snapc = NULL; 1979 1980 if (obj_request_img_data_test(obj_request) && 1981 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { 1982 struct rbd_img_request *img_request = obj_request->img_request; 1983 if (op_type == OBJ_OP_WRITE) { 1984 rbd_assert(img_request_write_test(img_request)); 1985 } else { 1986 rbd_assert(img_request_discard_test(img_request)); 1987 } 1988 snapc = img_request->snapc; 1989 } 1990 1991 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); 1992 1993 return __rbd_osd_req_create(rbd_dev, snapc, num_ops, 1994 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? 1995 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); 1996 } 1997 1998 /* 1999 * Create a copyup osd request based on the information in the object 2000 * request supplied. A copyup request has two or three osd ops, a 2001 * copyup method call, potentially a hint op, and a write or truncate 2002 * or zero op. 2003 */ 2004 static struct ceph_osd_request * 2005 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) 2006 { 2007 struct rbd_img_request *img_request; 2008 int num_osd_ops = 3; 2009 2010 rbd_assert(obj_request_img_data_test(obj_request)); 2011 img_request = obj_request->img_request; 2012 rbd_assert(img_request); 2013 rbd_assert(img_request_write_test(img_request) || 2014 img_request_discard_test(img_request)); 2015 2016 if (img_request_discard_test(img_request)) 2017 num_osd_ops = 2; 2018 2019 return __rbd_osd_req_create(img_request->rbd_dev, 2020 img_request->snapc, num_osd_ops, 2021 CEPH_OSD_FLAG_WRITE, obj_request); 2022 } 2023 2024 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 2025 { 2026 ceph_osdc_put_request(osd_req); 2027 } 2028 2029 static struct rbd_obj_request * 2030 rbd_obj_request_create(enum obj_request_type type) 2031 { 2032 struct rbd_obj_request *obj_request; 2033 2034 rbd_assert(obj_request_type_valid(type)); 2035 2036 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); 2037 if (!obj_request) 2038 return NULL; 2039 2040 obj_request->which = BAD_WHICH; 2041 obj_request->type = type; 2042 INIT_LIST_HEAD(&obj_request->links); 2043 init_completion(&obj_request->completion); 2044 kref_init(&obj_request->kref); 2045 2046 dout("%s %p\n", __func__, obj_request); 2047 return obj_request; 2048 } 2049 2050 static void rbd_obj_request_destroy(struct kref *kref) 2051 { 2052 struct rbd_obj_request *obj_request; 2053 2054 obj_request = container_of(kref, struct rbd_obj_request, kref); 2055 2056 dout("%s: obj %p\n", __func__, obj_request); 2057 2058 rbd_assert(obj_request->img_request == NULL); 2059 rbd_assert(obj_request->which == BAD_WHICH); 2060 2061 if (obj_request->osd_req) 2062 rbd_osd_req_destroy(obj_request->osd_req); 2063 2064 rbd_assert(obj_request_type_valid(obj_request->type)); 2065 switch (obj_request->type) { 2066 case OBJ_REQUEST_NODATA: 2067 break; /* Nothing to do */ 2068 case OBJ_REQUEST_BIO: 2069 if (obj_request->bio_list) 2070 bio_chain_put(obj_request->bio_list); 2071 break; 2072 case OBJ_REQUEST_PAGES: 2073 /* img_data requests don't own their page array */ 2074 if (obj_request->pages && 2075 !obj_request_img_data_test(obj_request)) 2076 ceph_release_page_vector(obj_request->pages, 2077 obj_request->page_count); 2078 break; 2079 } 2080 2081 kmem_cache_free(rbd_obj_request_cache, obj_request); 2082 } 2083 2084 /* It's OK to call this for a device with no parent */ 2085 2086 static void rbd_spec_put(struct rbd_spec *spec); 2087 static void rbd_dev_unparent(struct rbd_device *rbd_dev) 2088 { 2089 rbd_dev_remove_parent(rbd_dev); 2090 rbd_spec_put(rbd_dev->parent_spec); 2091 rbd_dev->parent_spec = NULL; 2092 rbd_dev->parent_overlap = 0; 2093 } 2094 2095 /* 2096 * Parent image reference counting is used to determine when an 2097 * image's parent fields can be safely torn down--after there are no 2098 * more in-flight requests to the parent image. When the last 2099 * reference is dropped, cleaning them up is safe. 2100 */ 2101 static void rbd_dev_parent_put(struct rbd_device *rbd_dev) 2102 { 2103 int counter; 2104 2105 if (!rbd_dev->parent_spec) 2106 return; 2107 2108 counter = atomic_dec_return_safe(&rbd_dev->parent_ref); 2109 if (counter > 0) 2110 return; 2111 2112 /* Last reference; clean up parent data structures */ 2113 2114 if (!counter) 2115 rbd_dev_unparent(rbd_dev); 2116 else 2117 rbd_warn(rbd_dev, "parent reference underflow"); 2118 } 2119 2120 /* 2121 * If an image has a non-zero parent overlap, get a reference to its 2122 * parent. 2123 * 2124 * Returns true if the rbd device has a parent with a non-zero 2125 * overlap and a reference for it was successfully taken, or 2126 * false otherwise. 2127 */ 2128 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) 2129 { 2130 int counter = 0; 2131 2132 if (!rbd_dev->parent_spec) 2133 return false; 2134 2135 down_read(&rbd_dev->header_rwsem); 2136 if (rbd_dev->parent_overlap) 2137 counter = atomic_inc_return_safe(&rbd_dev->parent_ref); 2138 up_read(&rbd_dev->header_rwsem); 2139 2140 if (counter < 0) 2141 rbd_warn(rbd_dev, "parent reference overflow"); 2142 2143 return counter > 0; 2144 } 2145 2146 /* 2147 * Caller is responsible for filling in the list of object requests 2148 * that comprises the image request, and the Linux request pointer 2149 * (if there is one). 2150 */ 2151 static struct rbd_img_request *rbd_img_request_create( 2152 struct rbd_device *rbd_dev, 2153 u64 offset, u64 length, 2154 enum obj_operation_type op_type, 2155 struct ceph_snap_context *snapc) 2156 { 2157 struct rbd_img_request *img_request; 2158 2159 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); 2160 if (!img_request) 2161 return NULL; 2162 2163 img_request->rq = NULL; 2164 img_request->rbd_dev = rbd_dev; 2165 img_request->offset = offset; 2166 img_request->length = length; 2167 img_request->flags = 0; 2168 if (op_type == OBJ_OP_DISCARD) { 2169 img_request_discard_set(img_request); 2170 img_request->snapc = snapc; 2171 } else if (op_type == OBJ_OP_WRITE) { 2172 img_request_write_set(img_request); 2173 img_request->snapc = snapc; 2174 } else { 2175 img_request->snap_id = rbd_dev->spec->snap_id; 2176 } 2177 if (rbd_dev_parent_get(rbd_dev)) 2178 img_request_layered_set(img_request); 2179 spin_lock_init(&img_request->completion_lock); 2180 img_request->next_completion = 0; 2181 img_request->callback = NULL; 2182 img_request->result = 0; 2183 img_request->obj_request_count = 0; 2184 INIT_LIST_HEAD(&img_request->obj_requests); 2185 kref_init(&img_request->kref); 2186 2187 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, 2188 obj_op_name(op_type), offset, length, img_request); 2189 2190 return img_request; 2191 } 2192 2193 static void rbd_img_request_destroy(struct kref *kref) 2194 { 2195 struct rbd_img_request *img_request; 2196 struct rbd_obj_request *obj_request; 2197 struct rbd_obj_request *next_obj_request; 2198 2199 img_request = container_of(kref, struct rbd_img_request, kref); 2200 2201 dout("%s: img %p\n", __func__, img_request); 2202 2203 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 2204 rbd_img_obj_request_del(img_request, obj_request); 2205 rbd_assert(img_request->obj_request_count == 0); 2206 2207 if (img_request_layered_test(img_request)) { 2208 img_request_layered_clear(img_request); 2209 rbd_dev_parent_put(img_request->rbd_dev); 2210 } 2211 2212 if (img_request_write_test(img_request) || 2213 img_request_discard_test(img_request)) 2214 ceph_put_snap_context(img_request->snapc); 2215 2216 kmem_cache_free(rbd_img_request_cache, img_request); 2217 } 2218 2219 static struct rbd_img_request *rbd_parent_request_create( 2220 struct rbd_obj_request *obj_request, 2221 u64 img_offset, u64 length) 2222 { 2223 struct rbd_img_request *parent_request; 2224 struct rbd_device *rbd_dev; 2225 2226 rbd_assert(obj_request->img_request); 2227 rbd_dev = obj_request->img_request->rbd_dev; 2228 2229 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset, 2230 length, OBJ_OP_READ, NULL); 2231 if (!parent_request) 2232 return NULL; 2233 2234 img_request_child_set(parent_request); 2235 rbd_obj_request_get(obj_request); 2236 parent_request->obj_request = obj_request; 2237 2238 return parent_request; 2239 } 2240 2241 static void rbd_parent_request_destroy(struct kref *kref) 2242 { 2243 struct rbd_img_request *parent_request; 2244 struct rbd_obj_request *orig_request; 2245 2246 parent_request = container_of(kref, struct rbd_img_request, kref); 2247 orig_request = parent_request->obj_request; 2248 2249 parent_request->obj_request = NULL; 2250 rbd_obj_request_put(orig_request); 2251 img_request_child_clear(parent_request); 2252 2253 rbd_img_request_destroy(kref); 2254 } 2255 2256 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) 2257 { 2258 struct rbd_img_request *img_request; 2259 unsigned int xferred; 2260 int result; 2261 bool more; 2262 2263 rbd_assert(obj_request_img_data_test(obj_request)); 2264 img_request = obj_request->img_request; 2265 2266 rbd_assert(obj_request->xferred <= (u64)UINT_MAX); 2267 xferred = (unsigned int)obj_request->xferred; 2268 result = obj_request->result; 2269 if (result) { 2270 struct rbd_device *rbd_dev = img_request->rbd_dev; 2271 enum obj_operation_type op_type; 2272 2273 if (img_request_discard_test(img_request)) 2274 op_type = OBJ_OP_DISCARD; 2275 else if (img_request_write_test(img_request)) 2276 op_type = OBJ_OP_WRITE; 2277 else 2278 op_type = OBJ_OP_READ; 2279 2280 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)", 2281 obj_op_name(op_type), obj_request->length, 2282 obj_request->img_offset, obj_request->offset); 2283 rbd_warn(rbd_dev, " result %d xferred %x", 2284 result, xferred); 2285 if (!img_request->result) 2286 img_request->result = result; 2287 /* 2288 * Need to end I/O on the entire obj_request worth of 2289 * bytes in case of error. 2290 */ 2291 xferred = obj_request->length; 2292 } 2293 2294 if (img_request_child_test(img_request)) { 2295 rbd_assert(img_request->obj_request != NULL); 2296 more = obj_request->which < img_request->obj_request_count - 1; 2297 } else { 2298 blk_status_t status = errno_to_blk_status(result); 2299 2300 rbd_assert(img_request->rq != NULL); 2301 2302 more = blk_update_request(img_request->rq, status, xferred); 2303 if (!more) 2304 __blk_mq_end_request(img_request->rq, status); 2305 } 2306 2307 return more; 2308 } 2309 2310 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 2311 { 2312 struct rbd_img_request *img_request; 2313 u32 which = obj_request->which; 2314 bool more = true; 2315 2316 rbd_assert(obj_request_img_data_test(obj_request)); 2317 img_request = obj_request->img_request; 2318 2319 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 2320 rbd_assert(img_request != NULL); 2321 rbd_assert(img_request->obj_request_count > 0); 2322 rbd_assert(which != BAD_WHICH); 2323 rbd_assert(which < img_request->obj_request_count); 2324 2325 spin_lock_irq(&img_request->completion_lock); 2326 if (which != img_request->next_completion) 2327 goto out; 2328 2329 for_each_obj_request_from(img_request, obj_request) { 2330 rbd_assert(more); 2331 rbd_assert(which < img_request->obj_request_count); 2332 2333 if (!obj_request_done_test(obj_request)) 2334 break; 2335 more = rbd_img_obj_end_request(obj_request); 2336 which++; 2337 } 2338 2339 rbd_assert(more ^ (which == img_request->obj_request_count)); 2340 img_request->next_completion = which; 2341 out: 2342 spin_unlock_irq(&img_request->completion_lock); 2343 rbd_img_request_put(img_request); 2344 2345 if (!more) 2346 rbd_img_request_complete(img_request); 2347 } 2348 2349 /* 2350 * Add individual osd ops to the given ceph_osd_request and prepare 2351 * them for submission. num_ops is the current number of 2352 * osd operations already to the object request. 2353 */ 2354 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, 2355 struct ceph_osd_request *osd_request, 2356 enum obj_operation_type op_type, 2357 unsigned int num_ops) 2358 { 2359 struct rbd_img_request *img_request = obj_request->img_request; 2360 struct rbd_device *rbd_dev = img_request->rbd_dev; 2361 u64 object_size = rbd_obj_bytes(&rbd_dev->header); 2362 u64 offset = obj_request->offset; 2363 u64 length = obj_request->length; 2364 u64 img_end; 2365 u16 opcode; 2366 2367 if (op_type == OBJ_OP_DISCARD) { 2368 if (!offset && length == object_size && 2369 (!img_request_layered_test(img_request) || 2370 !obj_request_overlaps_parent(obj_request))) { 2371 opcode = CEPH_OSD_OP_DELETE; 2372 } else if ((offset + length == object_size)) { 2373 opcode = CEPH_OSD_OP_TRUNCATE; 2374 } else { 2375 down_read(&rbd_dev->header_rwsem); 2376 img_end = rbd_dev->header.image_size; 2377 up_read(&rbd_dev->header_rwsem); 2378 2379 if (obj_request->img_offset + length == img_end) 2380 opcode = CEPH_OSD_OP_TRUNCATE; 2381 else 2382 opcode = CEPH_OSD_OP_ZERO; 2383 } 2384 } else if (op_type == OBJ_OP_WRITE) { 2385 if (!offset && length == object_size) 2386 opcode = CEPH_OSD_OP_WRITEFULL; 2387 else 2388 opcode = CEPH_OSD_OP_WRITE; 2389 osd_req_op_alloc_hint_init(osd_request, num_ops, 2390 object_size, object_size); 2391 num_ops++; 2392 } else { 2393 opcode = CEPH_OSD_OP_READ; 2394 } 2395 2396 if (opcode == CEPH_OSD_OP_DELETE) 2397 osd_req_op_init(osd_request, num_ops, opcode, 0); 2398 else 2399 osd_req_op_extent_init(osd_request, num_ops, opcode, 2400 offset, length, 0, 0); 2401 2402 if (obj_request->type == OBJ_REQUEST_BIO) 2403 osd_req_op_extent_osd_data_bio(osd_request, num_ops, 2404 obj_request->bio_list, length); 2405 else if (obj_request->type == OBJ_REQUEST_PAGES) 2406 osd_req_op_extent_osd_data_pages(osd_request, num_ops, 2407 obj_request->pages, length, 2408 offset & ~PAGE_MASK, false, false); 2409 2410 /* Discards are also writes */ 2411 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) 2412 rbd_osd_req_format_write(obj_request); 2413 else 2414 rbd_osd_req_format_read(obj_request); 2415 } 2416 2417 /* 2418 * Split up an image request into one or more object requests, each 2419 * to a different object. The "type" parameter indicates whether 2420 * "data_desc" is the pointer to the head of a list of bio 2421 * structures, or the base of a page array. In either case this 2422 * function assumes data_desc describes memory sufficient to hold 2423 * all data described by the image request. 2424 */ 2425 static int rbd_img_request_fill(struct rbd_img_request *img_request, 2426 enum obj_request_type type, 2427 void *data_desc) 2428 { 2429 struct rbd_device *rbd_dev = img_request->rbd_dev; 2430 struct rbd_obj_request *obj_request = NULL; 2431 struct rbd_obj_request *next_obj_request; 2432 struct bio *bio_list = NULL; 2433 unsigned int bio_offset = 0; 2434 struct page **pages = NULL; 2435 enum obj_operation_type op_type; 2436 u64 img_offset; 2437 u64 resid; 2438 2439 dout("%s: img %p type %d data_desc %p\n", __func__, img_request, 2440 (int)type, data_desc); 2441 2442 img_offset = img_request->offset; 2443 resid = img_request->length; 2444 rbd_assert(resid > 0); 2445 op_type = rbd_img_request_op_type(img_request); 2446 2447 if (type == OBJ_REQUEST_BIO) { 2448 bio_list = data_desc; 2449 rbd_assert(img_offset == 2450 bio_list->bi_iter.bi_sector << SECTOR_SHIFT); 2451 } else if (type == OBJ_REQUEST_PAGES) { 2452 pages = data_desc; 2453 } 2454 2455 while (resid) { 2456 struct ceph_osd_request *osd_req; 2457 u64 object_no = img_offset >> rbd_dev->header.obj_order; 2458 u64 offset = rbd_segment_offset(rbd_dev, img_offset); 2459 u64 length = rbd_segment_length(rbd_dev, img_offset, resid); 2460 2461 obj_request = rbd_obj_request_create(type); 2462 if (!obj_request) 2463 goto out_unwind; 2464 2465 obj_request->object_no = object_no; 2466 obj_request->offset = offset; 2467 obj_request->length = length; 2468 2469 /* 2470 * set obj_request->img_request before creating the 2471 * osd_request so that it gets the right snapc 2472 */ 2473 rbd_img_obj_request_add(img_request, obj_request); 2474 2475 if (type == OBJ_REQUEST_BIO) { 2476 unsigned int clone_size; 2477 2478 rbd_assert(length <= (u64)UINT_MAX); 2479 clone_size = (unsigned int)length; 2480 obj_request->bio_list = 2481 bio_chain_clone_range(&bio_list, 2482 &bio_offset, 2483 clone_size, 2484 GFP_NOIO); 2485 if (!obj_request->bio_list) 2486 goto out_unwind; 2487 } else if (type == OBJ_REQUEST_PAGES) { 2488 unsigned int page_count; 2489 2490 obj_request->pages = pages; 2491 page_count = (u32)calc_pages_for(offset, length); 2492 obj_request->page_count = page_count; 2493 if ((offset + length) & ~PAGE_MASK) 2494 page_count--; /* more on last page */ 2495 pages += page_count; 2496 } 2497 2498 osd_req = rbd_osd_req_create(rbd_dev, op_type, 2499 (op_type == OBJ_OP_WRITE) ? 2 : 1, 2500 obj_request); 2501 if (!osd_req) 2502 goto out_unwind; 2503 2504 obj_request->osd_req = osd_req; 2505 obj_request->callback = rbd_img_obj_callback; 2506 obj_request->img_offset = img_offset; 2507 2508 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); 2509 2510 img_offset += length; 2511 resid -= length; 2512 } 2513 2514 return 0; 2515 2516 out_unwind: 2517 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 2518 rbd_img_obj_request_del(img_request, obj_request); 2519 2520 return -ENOMEM; 2521 } 2522 2523 static void 2524 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request) 2525 { 2526 struct rbd_img_request *img_request; 2527 struct rbd_device *rbd_dev; 2528 struct page **pages; 2529 u32 page_count; 2530 2531 dout("%s: obj %p\n", __func__, obj_request); 2532 2533 rbd_assert(obj_request->type == OBJ_REQUEST_BIO || 2534 obj_request->type == OBJ_REQUEST_NODATA); 2535 rbd_assert(obj_request_img_data_test(obj_request)); 2536 img_request = obj_request->img_request; 2537 rbd_assert(img_request); 2538 2539 rbd_dev = img_request->rbd_dev; 2540 rbd_assert(rbd_dev); 2541 2542 pages = obj_request->copyup_pages; 2543 rbd_assert(pages != NULL); 2544 obj_request->copyup_pages = NULL; 2545 page_count = obj_request->copyup_page_count; 2546 rbd_assert(page_count); 2547 obj_request->copyup_page_count = 0; 2548 ceph_release_page_vector(pages, page_count); 2549 2550 /* 2551 * We want the transfer count to reflect the size of the 2552 * original write request. There is no such thing as a 2553 * successful short write, so if the request was successful 2554 * we can just set it to the originally-requested length. 2555 */ 2556 if (!obj_request->result) 2557 obj_request->xferred = obj_request->length; 2558 2559 obj_request_done_set(obj_request); 2560 } 2561 2562 static void 2563 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) 2564 { 2565 struct rbd_obj_request *orig_request; 2566 struct ceph_osd_request *osd_req; 2567 struct rbd_device *rbd_dev; 2568 struct page **pages; 2569 enum obj_operation_type op_type; 2570 u32 page_count; 2571 int img_result; 2572 u64 parent_length; 2573 2574 rbd_assert(img_request_child_test(img_request)); 2575 2576 /* First get what we need from the image request */ 2577 2578 pages = img_request->copyup_pages; 2579 rbd_assert(pages != NULL); 2580 img_request->copyup_pages = NULL; 2581 page_count = img_request->copyup_page_count; 2582 rbd_assert(page_count); 2583 img_request->copyup_page_count = 0; 2584 2585 orig_request = img_request->obj_request; 2586 rbd_assert(orig_request != NULL); 2587 rbd_assert(obj_request_type_valid(orig_request->type)); 2588 img_result = img_request->result; 2589 parent_length = img_request->length; 2590 rbd_assert(img_result || parent_length == img_request->xferred); 2591 rbd_img_request_put(img_request); 2592 2593 rbd_assert(orig_request->img_request); 2594 rbd_dev = orig_request->img_request->rbd_dev; 2595 rbd_assert(rbd_dev); 2596 2597 /* 2598 * If the overlap has become 0 (most likely because the 2599 * image has been flattened) we need to free the pages 2600 * and re-submit the original write request. 2601 */ 2602 if (!rbd_dev->parent_overlap) { 2603 ceph_release_page_vector(pages, page_count); 2604 rbd_obj_request_submit(orig_request); 2605 return; 2606 } 2607 2608 if (img_result) 2609 goto out_err; 2610 2611 /* 2612 * The original osd request is of no use to use any more. 2613 * We need a new one that can hold the three ops in a copyup 2614 * request. Allocate the new copyup osd request for the 2615 * original request, and release the old one. 2616 */ 2617 img_result = -ENOMEM; 2618 osd_req = rbd_osd_req_create_copyup(orig_request); 2619 if (!osd_req) 2620 goto out_err; 2621 rbd_osd_req_destroy(orig_request->osd_req); 2622 orig_request->osd_req = osd_req; 2623 orig_request->copyup_pages = pages; 2624 orig_request->copyup_page_count = page_count; 2625 2626 /* Initialize the copyup op */ 2627 2628 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); 2629 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, 2630 false, false); 2631 2632 /* Add the other op(s) */ 2633 2634 op_type = rbd_img_request_op_type(orig_request->img_request); 2635 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1); 2636 2637 /* All set, send it off. */ 2638 2639 rbd_obj_request_submit(orig_request); 2640 return; 2641 2642 out_err: 2643 ceph_release_page_vector(pages, page_count); 2644 rbd_obj_request_error(orig_request, img_result); 2645 } 2646 2647 /* 2648 * Read from the parent image the range of data that covers the 2649 * entire target of the given object request. This is used for 2650 * satisfying a layered image write request when the target of an 2651 * object request from the image request does not exist. 2652 * 2653 * A page array big enough to hold the returned data is allocated 2654 * and supplied to rbd_img_request_fill() as the "data descriptor." 2655 * When the read completes, this page array will be transferred to 2656 * the original object request for the copyup operation. 2657 * 2658 * If an error occurs, it is recorded as the result of the original 2659 * object request in rbd_img_obj_exists_callback(). 2660 */ 2661 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) 2662 { 2663 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; 2664 struct rbd_img_request *parent_request = NULL; 2665 u64 img_offset; 2666 u64 length; 2667 struct page **pages = NULL; 2668 u32 page_count; 2669 int result; 2670 2671 rbd_assert(rbd_dev->parent != NULL); 2672 2673 /* 2674 * Determine the byte range covered by the object in the 2675 * child image to which the original request was to be sent. 2676 */ 2677 img_offset = obj_request->img_offset - obj_request->offset; 2678 length = rbd_obj_bytes(&rbd_dev->header); 2679 2680 /* 2681 * There is no defined parent data beyond the parent 2682 * overlap, so limit what we read at that boundary if 2683 * necessary. 2684 */ 2685 if (img_offset + length > rbd_dev->parent_overlap) { 2686 rbd_assert(img_offset < rbd_dev->parent_overlap); 2687 length = rbd_dev->parent_overlap - img_offset; 2688 } 2689 2690 /* 2691 * Allocate a page array big enough to receive the data read 2692 * from the parent. 2693 */ 2694 page_count = (u32)calc_pages_for(0, length); 2695 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2696 if (IS_ERR(pages)) { 2697 result = PTR_ERR(pages); 2698 pages = NULL; 2699 goto out_err; 2700 } 2701 2702 result = -ENOMEM; 2703 parent_request = rbd_parent_request_create(obj_request, 2704 img_offset, length); 2705 if (!parent_request) 2706 goto out_err; 2707 2708 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); 2709 if (result) 2710 goto out_err; 2711 2712 parent_request->copyup_pages = pages; 2713 parent_request->copyup_page_count = page_count; 2714 parent_request->callback = rbd_img_obj_parent_read_full_callback; 2715 2716 result = rbd_img_request_submit(parent_request); 2717 if (!result) 2718 return 0; 2719 2720 parent_request->copyup_pages = NULL; 2721 parent_request->copyup_page_count = 0; 2722 parent_request->obj_request = NULL; 2723 rbd_obj_request_put(obj_request); 2724 out_err: 2725 if (pages) 2726 ceph_release_page_vector(pages, page_count); 2727 if (parent_request) 2728 rbd_img_request_put(parent_request); 2729 return result; 2730 } 2731 2732 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) 2733 { 2734 struct rbd_obj_request *orig_request; 2735 struct rbd_device *rbd_dev; 2736 int result; 2737 2738 rbd_assert(!obj_request_img_data_test(obj_request)); 2739 2740 /* 2741 * All we need from the object request is the original 2742 * request and the result of the STAT op. Grab those, then 2743 * we're done with the request. 2744 */ 2745 orig_request = obj_request->obj_request; 2746 obj_request->obj_request = NULL; 2747 rbd_obj_request_put(orig_request); 2748 rbd_assert(orig_request); 2749 rbd_assert(orig_request->img_request); 2750 2751 result = obj_request->result; 2752 obj_request->result = 0; 2753 2754 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, 2755 obj_request, orig_request, result, 2756 obj_request->xferred, obj_request->length); 2757 rbd_obj_request_put(obj_request); 2758 2759 /* 2760 * If the overlap has become 0 (most likely because the 2761 * image has been flattened) we need to re-submit the 2762 * original request. 2763 */ 2764 rbd_dev = orig_request->img_request->rbd_dev; 2765 if (!rbd_dev->parent_overlap) { 2766 rbd_obj_request_submit(orig_request); 2767 return; 2768 } 2769 2770 /* 2771 * Our only purpose here is to determine whether the object 2772 * exists, and we don't want to treat the non-existence as 2773 * an error. If something else comes back, transfer the 2774 * error to the original request and complete it now. 2775 */ 2776 if (!result) { 2777 obj_request_existence_set(orig_request, true); 2778 } else if (result == -ENOENT) { 2779 obj_request_existence_set(orig_request, false); 2780 } else { 2781 goto fail_orig_request; 2782 } 2783 2784 /* 2785 * Resubmit the original request now that we have recorded 2786 * whether the target object exists. 2787 */ 2788 result = rbd_img_obj_request_submit(orig_request); 2789 if (result) 2790 goto fail_orig_request; 2791 2792 return; 2793 2794 fail_orig_request: 2795 rbd_obj_request_error(orig_request, result); 2796 } 2797 2798 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) 2799 { 2800 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; 2801 struct rbd_obj_request *stat_request; 2802 struct page **pages; 2803 u32 page_count; 2804 size_t size; 2805 int ret; 2806 2807 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); 2808 if (!stat_request) 2809 return -ENOMEM; 2810 2811 stat_request->object_no = obj_request->object_no; 2812 2813 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, 2814 stat_request); 2815 if (!stat_request->osd_req) { 2816 ret = -ENOMEM; 2817 goto fail_stat_request; 2818 } 2819 2820 /* 2821 * The response data for a STAT call consists of: 2822 * le64 length; 2823 * struct { 2824 * le32 tv_sec; 2825 * le32 tv_nsec; 2826 * } mtime; 2827 */ 2828 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); 2829 page_count = (u32)calc_pages_for(0, size); 2830 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2831 if (IS_ERR(pages)) { 2832 ret = PTR_ERR(pages); 2833 goto fail_stat_request; 2834 } 2835 2836 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); 2837 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, 2838 false, false); 2839 2840 rbd_obj_request_get(obj_request); 2841 stat_request->obj_request = obj_request; 2842 stat_request->pages = pages; 2843 stat_request->page_count = page_count; 2844 stat_request->callback = rbd_img_obj_exists_callback; 2845 2846 rbd_obj_request_submit(stat_request); 2847 return 0; 2848 2849 fail_stat_request: 2850 rbd_obj_request_put(stat_request); 2851 return ret; 2852 } 2853 2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request) 2855 { 2856 struct rbd_img_request *img_request = obj_request->img_request; 2857 struct rbd_device *rbd_dev = img_request->rbd_dev; 2858 2859 /* Reads */ 2860 if (!img_request_write_test(img_request) && 2861 !img_request_discard_test(img_request)) 2862 return true; 2863 2864 /* Non-layered writes */ 2865 if (!img_request_layered_test(img_request)) 2866 return true; 2867 2868 /* 2869 * Layered writes outside of the parent overlap range don't 2870 * share any data with the parent. 2871 */ 2872 if (!obj_request_overlaps_parent(obj_request)) 2873 return true; 2874 2875 /* 2876 * Entire-object layered writes - we will overwrite whatever 2877 * parent data there is anyway. 2878 */ 2879 if (!obj_request->offset && 2880 obj_request->length == rbd_obj_bytes(&rbd_dev->header)) 2881 return true; 2882 2883 /* 2884 * If the object is known to already exist, its parent data has 2885 * already been copied. 2886 */ 2887 if (obj_request_known_test(obj_request) && 2888 obj_request_exists_test(obj_request)) 2889 return true; 2890 2891 return false; 2892 } 2893 2894 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) 2895 { 2896 rbd_assert(obj_request_img_data_test(obj_request)); 2897 rbd_assert(obj_request_type_valid(obj_request->type)); 2898 rbd_assert(obj_request->img_request); 2899 2900 if (img_obj_request_simple(obj_request)) { 2901 rbd_obj_request_submit(obj_request); 2902 return 0; 2903 } 2904 2905 /* 2906 * It's a layered write. The target object might exist but 2907 * we may not know that yet. If we know it doesn't exist, 2908 * start by reading the data for the full target object from 2909 * the parent so we can use it for a copyup to the target. 2910 */ 2911 if (obj_request_known_test(obj_request)) 2912 return rbd_img_obj_parent_read_full(obj_request); 2913 2914 /* We don't know whether the target exists. Go find out. */ 2915 2916 return rbd_img_obj_exists_submit(obj_request); 2917 } 2918 2919 static int rbd_img_request_submit(struct rbd_img_request *img_request) 2920 { 2921 struct rbd_obj_request *obj_request; 2922 struct rbd_obj_request *next_obj_request; 2923 int ret = 0; 2924 2925 dout("%s: img %p\n", __func__, img_request); 2926 2927 rbd_img_request_get(img_request); 2928 for_each_obj_request_safe(img_request, obj_request, next_obj_request) { 2929 ret = rbd_img_obj_request_submit(obj_request); 2930 if (ret) 2931 goto out_put_ireq; 2932 } 2933 2934 out_put_ireq: 2935 rbd_img_request_put(img_request); 2936 return ret; 2937 } 2938 2939 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) 2940 { 2941 struct rbd_obj_request *obj_request; 2942 struct rbd_device *rbd_dev; 2943 u64 obj_end; 2944 u64 img_xferred; 2945 int img_result; 2946 2947 rbd_assert(img_request_child_test(img_request)); 2948 2949 /* First get what we need from the image request and release it */ 2950 2951 obj_request = img_request->obj_request; 2952 img_xferred = img_request->xferred; 2953 img_result = img_request->result; 2954 rbd_img_request_put(img_request); 2955 2956 /* 2957 * If the overlap has become 0 (most likely because the 2958 * image has been flattened) we need to re-submit the 2959 * original request. 2960 */ 2961 rbd_assert(obj_request); 2962 rbd_assert(obj_request->img_request); 2963 rbd_dev = obj_request->img_request->rbd_dev; 2964 if (!rbd_dev->parent_overlap) { 2965 rbd_obj_request_submit(obj_request); 2966 return; 2967 } 2968 2969 obj_request->result = img_result; 2970 if (obj_request->result) 2971 goto out; 2972 2973 /* 2974 * We need to zero anything beyond the parent overlap 2975 * boundary. Since rbd_img_obj_request_read_callback() 2976 * will zero anything beyond the end of a short read, an 2977 * easy way to do this is to pretend the data from the 2978 * parent came up short--ending at the overlap boundary. 2979 */ 2980 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); 2981 obj_end = obj_request->img_offset + obj_request->length; 2982 if (obj_end > rbd_dev->parent_overlap) { 2983 u64 xferred = 0; 2984 2985 if (obj_request->img_offset < rbd_dev->parent_overlap) 2986 xferred = rbd_dev->parent_overlap - 2987 obj_request->img_offset; 2988 2989 obj_request->xferred = min(img_xferred, xferred); 2990 } else { 2991 obj_request->xferred = img_xferred; 2992 } 2993 out: 2994 rbd_img_obj_request_read_callback(obj_request); 2995 rbd_obj_request_complete(obj_request); 2996 } 2997 2998 static void rbd_img_parent_read(struct rbd_obj_request *obj_request) 2999 { 3000 struct rbd_img_request *img_request; 3001 int result; 3002 3003 rbd_assert(obj_request_img_data_test(obj_request)); 3004 rbd_assert(obj_request->img_request != NULL); 3005 rbd_assert(obj_request->result == (s32) -ENOENT); 3006 rbd_assert(obj_request_type_valid(obj_request->type)); 3007 3008 /* rbd_read_finish(obj_request, obj_request->length); */ 3009 img_request = rbd_parent_request_create(obj_request, 3010 obj_request->img_offset, 3011 obj_request->length); 3012 result = -ENOMEM; 3013 if (!img_request) 3014 goto out_err; 3015 3016 if (obj_request->type == OBJ_REQUEST_BIO) 3017 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 3018 obj_request->bio_list); 3019 else 3020 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES, 3021 obj_request->pages); 3022 if (result) 3023 goto out_err; 3024 3025 img_request->callback = rbd_img_parent_read_callback; 3026 result = rbd_img_request_submit(img_request); 3027 if (result) 3028 goto out_err; 3029 3030 return; 3031 out_err: 3032 if (img_request) 3033 rbd_img_request_put(img_request); 3034 obj_request->result = result; 3035 obj_request->xferred = 0; 3036 obj_request_done_set(obj_request); 3037 } 3038 3039 static const struct rbd_client_id rbd_empty_cid; 3040 3041 static bool rbd_cid_equal(const struct rbd_client_id *lhs, 3042 const struct rbd_client_id *rhs) 3043 { 3044 return lhs->gid == rhs->gid && lhs->handle == rhs->handle; 3045 } 3046 3047 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) 3048 { 3049 struct rbd_client_id cid; 3050 3051 mutex_lock(&rbd_dev->watch_mutex); 3052 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); 3053 cid.handle = rbd_dev->watch_cookie; 3054 mutex_unlock(&rbd_dev->watch_mutex); 3055 return cid; 3056 } 3057 3058 /* 3059 * lock_rwsem must be held for write 3060 */ 3061 static void rbd_set_owner_cid(struct rbd_device *rbd_dev, 3062 const struct rbd_client_id *cid) 3063 { 3064 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, 3065 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, 3066 cid->gid, cid->handle); 3067 rbd_dev->owner_cid = *cid; /* struct */ 3068 } 3069 3070 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) 3071 { 3072 mutex_lock(&rbd_dev->watch_mutex); 3073 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); 3074 mutex_unlock(&rbd_dev->watch_mutex); 3075 } 3076 3077 /* 3078 * lock_rwsem must be held for write 3079 */ 3080 static int rbd_lock(struct rbd_device *rbd_dev) 3081 { 3082 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3083 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 3084 char cookie[32]; 3085 int ret; 3086 3087 WARN_ON(__rbd_is_lock_owner(rbd_dev) || 3088 rbd_dev->lock_cookie[0] != '\0'); 3089 3090 format_lock_cookie(rbd_dev, cookie); 3091 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 3092 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, 3093 RBD_LOCK_TAG, "", 0); 3094 if (ret) 3095 return ret; 3096 3097 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; 3098 strcpy(rbd_dev->lock_cookie, cookie); 3099 rbd_set_owner_cid(rbd_dev, &cid); 3100 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); 3101 return 0; 3102 } 3103 3104 /* 3105 * lock_rwsem must be held for write 3106 */ 3107 static void rbd_unlock(struct rbd_device *rbd_dev) 3108 { 3109 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3110 int ret; 3111 3112 WARN_ON(!__rbd_is_lock_owner(rbd_dev) || 3113 rbd_dev->lock_cookie[0] == '\0'); 3114 3115 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, 3116 RBD_LOCK_NAME, rbd_dev->lock_cookie); 3117 if (ret && ret != -ENOENT) 3118 rbd_warn(rbd_dev, "failed to unlock: %d", ret); 3119 3120 /* treat errors as the image is unlocked */ 3121 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 3122 rbd_dev->lock_cookie[0] = '\0'; 3123 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3124 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); 3125 } 3126 3127 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, 3128 enum rbd_notify_op notify_op, 3129 struct page ***preply_pages, 3130 size_t *preply_len) 3131 { 3132 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3133 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 3134 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; 3135 char buf[buf_size]; 3136 void *p = buf; 3137 3138 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 3139 3140 /* encode *LockPayload NotifyMessage (op + ClientId) */ 3141 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); 3142 ceph_encode_32(&p, notify_op); 3143 ceph_encode_64(&p, cid.gid); 3144 ceph_encode_64(&p, cid.handle); 3145 3146 return ceph_osdc_notify(osdc, &rbd_dev->header_oid, 3147 &rbd_dev->header_oloc, buf, buf_size, 3148 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); 3149 } 3150 3151 static void rbd_notify_op_lock(struct rbd_device *rbd_dev, 3152 enum rbd_notify_op notify_op) 3153 { 3154 struct page **reply_pages; 3155 size_t reply_len; 3156 3157 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); 3158 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 3159 } 3160 3161 static void rbd_notify_acquired_lock(struct work_struct *work) 3162 { 3163 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3164 acquired_lock_work); 3165 3166 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); 3167 } 3168 3169 static void rbd_notify_released_lock(struct work_struct *work) 3170 { 3171 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3172 released_lock_work); 3173 3174 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); 3175 } 3176 3177 static int rbd_request_lock(struct rbd_device *rbd_dev) 3178 { 3179 struct page **reply_pages; 3180 size_t reply_len; 3181 bool lock_owner_responded = false; 3182 int ret; 3183 3184 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3185 3186 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, 3187 &reply_pages, &reply_len); 3188 if (ret && ret != -ETIMEDOUT) { 3189 rbd_warn(rbd_dev, "failed to request lock: %d", ret); 3190 goto out; 3191 } 3192 3193 if (reply_len > 0 && reply_len <= PAGE_SIZE) { 3194 void *p = page_address(reply_pages[0]); 3195 void *const end = p + reply_len; 3196 u32 n; 3197 3198 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ 3199 while (n--) { 3200 u8 struct_v; 3201 u32 len; 3202 3203 ceph_decode_need(&p, end, 8 + 8, e_inval); 3204 p += 8 + 8; /* skip gid and cookie */ 3205 3206 ceph_decode_32_safe(&p, end, len, e_inval); 3207 if (!len) 3208 continue; 3209 3210 if (lock_owner_responded) { 3211 rbd_warn(rbd_dev, 3212 "duplicate lock owners detected"); 3213 ret = -EIO; 3214 goto out; 3215 } 3216 3217 lock_owner_responded = true; 3218 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", 3219 &struct_v, &len); 3220 if (ret) { 3221 rbd_warn(rbd_dev, 3222 "failed to decode ResponseMessage: %d", 3223 ret); 3224 goto e_inval; 3225 } 3226 3227 ret = ceph_decode_32(&p); 3228 } 3229 } 3230 3231 if (!lock_owner_responded) { 3232 rbd_warn(rbd_dev, "no lock owners detected"); 3233 ret = -ETIMEDOUT; 3234 } 3235 3236 out: 3237 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); 3238 return ret; 3239 3240 e_inval: 3241 ret = -EINVAL; 3242 goto out; 3243 } 3244 3245 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) 3246 { 3247 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); 3248 3249 cancel_delayed_work(&rbd_dev->lock_dwork); 3250 if (wake_all) 3251 wake_up_all(&rbd_dev->lock_waitq); 3252 else 3253 wake_up(&rbd_dev->lock_waitq); 3254 } 3255 3256 static int get_lock_owner_info(struct rbd_device *rbd_dev, 3257 struct ceph_locker **lockers, u32 *num_lockers) 3258 { 3259 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3260 u8 lock_type; 3261 char *lock_tag; 3262 int ret; 3263 3264 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3265 3266 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, 3267 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3268 &lock_type, &lock_tag, lockers, num_lockers); 3269 if (ret) 3270 return ret; 3271 3272 if (*num_lockers == 0) { 3273 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); 3274 goto out; 3275 } 3276 3277 if (strcmp(lock_tag, RBD_LOCK_TAG)) { 3278 rbd_warn(rbd_dev, "locked by external mechanism, tag %s", 3279 lock_tag); 3280 ret = -EBUSY; 3281 goto out; 3282 } 3283 3284 if (lock_type == CEPH_CLS_LOCK_SHARED) { 3285 rbd_warn(rbd_dev, "shared lock type detected"); 3286 ret = -EBUSY; 3287 goto out; 3288 } 3289 3290 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, 3291 strlen(RBD_LOCK_COOKIE_PREFIX))) { 3292 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", 3293 (*lockers)[0].id.cookie); 3294 ret = -EBUSY; 3295 goto out; 3296 } 3297 3298 out: 3299 kfree(lock_tag); 3300 return ret; 3301 } 3302 3303 static int find_watcher(struct rbd_device *rbd_dev, 3304 const struct ceph_locker *locker) 3305 { 3306 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3307 struct ceph_watch_item *watchers; 3308 u32 num_watchers; 3309 u64 cookie; 3310 int i; 3311 int ret; 3312 3313 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, 3314 &rbd_dev->header_oloc, &watchers, 3315 &num_watchers); 3316 if (ret) 3317 return ret; 3318 3319 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); 3320 for (i = 0; i < num_watchers; i++) { 3321 if (!memcmp(&watchers[i].addr, &locker->info.addr, 3322 sizeof(locker->info.addr)) && 3323 watchers[i].cookie == cookie) { 3324 struct rbd_client_id cid = { 3325 .gid = le64_to_cpu(watchers[i].name.num), 3326 .handle = cookie, 3327 }; 3328 3329 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, 3330 rbd_dev, cid.gid, cid.handle); 3331 rbd_set_owner_cid(rbd_dev, &cid); 3332 ret = 1; 3333 goto out; 3334 } 3335 } 3336 3337 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); 3338 ret = 0; 3339 out: 3340 kfree(watchers); 3341 return ret; 3342 } 3343 3344 /* 3345 * lock_rwsem must be held for write 3346 */ 3347 static int rbd_try_lock(struct rbd_device *rbd_dev) 3348 { 3349 struct ceph_client *client = rbd_dev->rbd_client->client; 3350 struct ceph_locker *lockers; 3351 u32 num_lockers; 3352 int ret; 3353 3354 for (;;) { 3355 ret = rbd_lock(rbd_dev); 3356 if (ret != -EBUSY) 3357 return ret; 3358 3359 /* determine if the current lock holder is still alive */ 3360 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); 3361 if (ret) 3362 return ret; 3363 3364 if (num_lockers == 0) 3365 goto again; 3366 3367 ret = find_watcher(rbd_dev, lockers); 3368 if (ret) { 3369 if (ret > 0) 3370 ret = 0; /* have to request lock */ 3371 goto out; 3372 } 3373 3374 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", 3375 ENTITY_NAME(lockers[0].id.name)); 3376 3377 ret = ceph_monc_blacklist_add(&client->monc, 3378 &lockers[0].info.addr); 3379 if (ret) { 3380 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", 3381 ENTITY_NAME(lockers[0].id.name), ret); 3382 goto out; 3383 } 3384 3385 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, 3386 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3387 lockers[0].id.cookie, 3388 &lockers[0].id.name); 3389 if (ret && ret != -ENOENT) 3390 goto out; 3391 3392 again: 3393 ceph_free_lockers(lockers, num_lockers); 3394 } 3395 3396 out: 3397 ceph_free_lockers(lockers, num_lockers); 3398 return ret; 3399 } 3400 3401 /* 3402 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED 3403 */ 3404 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, 3405 int *pret) 3406 { 3407 enum rbd_lock_state lock_state; 3408 3409 down_read(&rbd_dev->lock_rwsem); 3410 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3411 rbd_dev->lock_state); 3412 if (__rbd_is_lock_owner(rbd_dev)) { 3413 lock_state = rbd_dev->lock_state; 3414 up_read(&rbd_dev->lock_rwsem); 3415 return lock_state; 3416 } 3417 3418 up_read(&rbd_dev->lock_rwsem); 3419 down_write(&rbd_dev->lock_rwsem); 3420 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3421 rbd_dev->lock_state); 3422 if (!__rbd_is_lock_owner(rbd_dev)) { 3423 *pret = rbd_try_lock(rbd_dev); 3424 if (*pret) 3425 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); 3426 } 3427 3428 lock_state = rbd_dev->lock_state; 3429 up_write(&rbd_dev->lock_rwsem); 3430 return lock_state; 3431 } 3432 3433 static void rbd_acquire_lock(struct work_struct *work) 3434 { 3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3436 struct rbd_device, lock_dwork); 3437 enum rbd_lock_state lock_state; 3438 int ret; 3439 3440 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3441 again: 3442 lock_state = rbd_try_acquire_lock(rbd_dev, &ret); 3443 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { 3444 if (lock_state == RBD_LOCK_STATE_LOCKED) 3445 wake_requests(rbd_dev, true); 3446 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, 3447 rbd_dev, lock_state, ret); 3448 return; 3449 } 3450 3451 ret = rbd_request_lock(rbd_dev); 3452 if (ret == -ETIMEDOUT) { 3453 goto again; /* treat this as a dead client */ 3454 } else if (ret == -EROFS) { 3455 rbd_warn(rbd_dev, "peer will not release lock"); 3456 /* 3457 * If this is rbd_add_acquire_lock(), we want to fail 3458 * immediately -- reuse BLACKLISTED flag. Otherwise we 3459 * want to block. 3460 */ 3461 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { 3462 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3463 /* wake "rbd map --exclusive" process */ 3464 wake_requests(rbd_dev, false); 3465 } 3466 } else if (ret < 0) { 3467 rbd_warn(rbd_dev, "error requesting lock: %d", ret); 3468 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3469 RBD_RETRY_DELAY); 3470 } else { 3471 /* 3472 * lock owner acked, but resend if we don't see them 3473 * release the lock 3474 */ 3475 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, 3476 rbd_dev); 3477 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 3478 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); 3479 } 3480 } 3481 3482 /* 3483 * lock_rwsem must be held for write 3484 */ 3485 static bool rbd_release_lock(struct rbd_device *rbd_dev) 3486 { 3487 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, 3488 rbd_dev->lock_state); 3489 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) 3490 return false; 3491 3492 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; 3493 downgrade_write(&rbd_dev->lock_rwsem); 3494 /* 3495 * Ensure that all in-flight IO is flushed. 3496 * 3497 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which 3498 * may be shared with other devices. 3499 */ 3500 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); 3501 up_read(&rbd_dev->lock_rwsem); 3502 3503 down_write(&rbd_dev->lock_rwsem); 3504 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, 3505 rbd_dev->lock_state); 3506 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) 3507 return false; 3508 3509 rbd_unlock(rbd_dev); 3510 /* 3511 * Give others a chance to grab the lock - we would re-acquire 3512 * almost immediately if we got new IO during ceph_osdc_sync() 3513 * otherwise. We need to ack our own notifications, so this 3514 * lock_dwork will be requeued from rbd_wait_state_locked() 3515 * after wake_requests() in rbd_handle_released_lock(). 3516 */ 3517 cancel_delayed_work(&rbd_dev->lock_dwork); 3518 return true; 3519 } 3520 3521 static void rbd_release_lock_work(struct work_struct *work) 3522 { 3523 struct rbd_device *rbd_dev = container_of(work, struct rbd_device, 3524 unlock_work); 3525 3526 down_write(&rbd_dev->lock_rwsem); 3527 rbd_release_lock(rbd_dev); 3528 up_write(&rbd_dev->lock_rwsem); 3529 } 3530 3531 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, 3532 void **p) 3533 { 3534 struct rbd_client_id cid = { 0 }; 3535 3536 if (struct_v >= 2) { 3537 cid.gid = ceph_decode_64(p); 3538 cid.handle = ceph_decode_64(p); 3539 } 3540 3541 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3542 cid.handle); 3543 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3544 down_write(&rbd_dev->lock_rwsem); 3545 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3546 /* 3547 * we already know that the remote client is 3548 * the owner 3549 */ 3550 up_write(&rbd_dev->lock_rwsem); 3551 return; 3552 } 3553 3554 rbd_set_owner_cid(rbd_dev, &cid); 3555 downgrade_write(&rbd_dev->lock_rwsem); 3556 } else { 3557 down_read(&rbd_dev->lock_rwsem); 3558 } 3559 3560 if (!__rbd_is_lock_owner(rbd_dev)) 3561 wake_requests(rbd_dev, false); 3562 up_read(&rbd_dev->lock_rwsem); 3563 } 3564 3565 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, 3566 void **p) 3567 { 3568 struct rbd_client_id cid = { 0 }; 3569 3570 if (struct_v >= 2) { 3571 cid.gid = ceph_decode_64(p); 3572 cid.handle = ceph_decode_64(p); 3573 } 3574 3575 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3576 cid.handle); 3577 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { 3578 down_write(&rbd_dev->lock_rwsem); 3579 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { 3580 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", 3581 __func__, rbd_dev, cid.gid, cid.handle, 3582 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); 3583 up_write(&rbd_dev->lock_rwsem); 3584 return; 3585 } 3586 3587 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3588 downgrade_write(&rbd_dev->lock_rwsem); 3589 } else { 3590 down_read(&rbd_dev->lock_rwsem); 3591 } 3592 3593 if (!__rbd_is_lock_owner(rbd_dev)) 3594 wake_requests(rbd_dev, false); 3595 up_read(&rbd_dev->lock_rwsem); 3596 } 3597 3598 /* 3599 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no 3600 * ResponseMessage is needed. 3601 */ 3602 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, 3603 void **p) 3604 { 3605 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); 3606 struct rbd_client_id cid = { 0 }; 3607 int result = 1; 3608 3609 if (struct_v >= 2) { 3610 cid.gid = ceph_decode_64(p); 3611 cid.handle = ceph_decode_64(p); 3612 } 3613 3614 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, 3615 cid.handle); 3616 if (rbd_cid_equal(&cid, &my_cid)) 3617 return result; 3618 3619 down_read(&rbd_dev->lock_rwsem); 3620 if (__rbd_is_lock_owner(rbd_dev)) { 3621 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED && 3622 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) 3623 goto out_unlock; 3624 3625 /* 3626 * encode ResponseMessage(0) so the peer can detect 3627 * a missing owner 3628 */ 3629 result = 0; 3630 3631 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { 3632 if (!rbd_dev->opts->exclusive) { 3633 dout("%s rbd_dev %p queueing unlock_work\n", 3634 __func__, rbd_dev); 3635 queue_work(rbd_dev->task_wq, 3636 &rbd_dev->unlock_work); 3637 } else { 3638 /* refuse to release the lock */ 3639 result = -EROFS; 3640 } 3641 } 3642 } 3643 3644 out_unlock: 3645 up_read(&rbd_dev->lock_rwsem); 3646 return result; 3647 } 3648 3649 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, 3650 u64 notify_id, u64 cookie, s32 *result) 3651 { 3652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3653 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; 3654 char buf[buf_size]; 3655 int ret; 3656 3657 if (result) { 3658 void *p = buf; 3659 3660 /* encode ResponseMessage */ 3661 ceph_start_encoding(&p, 1, 1, 3662 buf_size - CEPH_ENCODING_START_BLK_LEN); 3663 ceph_encode_32(&p, *result); 3664 } else { 3665 buf_size = 0; 3666 } 3667 3668 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 3669 &rbd_dev->header_oloc, notify_id, cookie, 3670 buf, buf_size); 3671 if (ret) 3672 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); 3673 } 3674 3675 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, 3676 u64 cookie) 3677 { 3678 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3679 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); 3680 } 3681 3682 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, 3683 u64 notify_id, u64 cookie, s32 result) 3684 { 3685 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); 3686 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); 3687 } 3688 3689 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 3690 u64 notifier_id, void *data, size_t data_len) 3691 { 3692 struct rbd_device *rbd_dev = arg; 3693 void *p = data; 3694 void *const end = p + data_len; 3695 u8 struct_v = 0; 3696 u32 len; 3697 u32 notify_op; 3698 int ret; 3699 3700 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", 3701 __func__, rbd_dev, cookie, notify_id, data_len); 3702 if (data_len) { 3703 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", 3704 &struct_v, &len); 3705 if (ret) { 3706 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", 3707 ret); 3708 return; 3709 } 3710 3711 notify_op = ceph_decode_32(&p); 3712 } else { 3713 /* legacy notification for header updates */ 3714 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; 3715 len = 0; 3716 } 3717 3718 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); 3719 switch (notify_op) { 3720 case RBD_NOTIFY_OP_ACQUIRED_LOCK: 3721 rbd_handle_acquired_lock(rbd_dev, struct_v, &p); 3722 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3723 break; 3724 case RBD_NOTIFY_OP_RELEASED_LOCK: 3725 rbd_handle_released_lock(rbd_dev, struct_v, &p); 3726 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3727 break; 3728 case RBD_NOTIFY_OP_REQUEST_LOCK: 3729 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p); 3730 if (ret <= 0) 3731 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3732 cookie, ret); 3733 else 3734 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3735 break; 3736 case RBD_NOTIFY_OP_HEADER_UPDATE: 3737 ret = rbd_dev_refresh(rbd_dev); 3738 if (ret) 3739 rbd_warn(rbd_dev, "refresh failed: %d", ret); 3740 3741 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3742 break; 3743 default: 3744 if (rbd_is_lock_owner(rbd_dev)) 3745 rbd_acknowledge_notify_result(rbd_dev, notify_id, 3746 cookie, -EOPNOTSUPP); 3747 else 3748 rbd_acknowledge_notify(rbd_dev, notify_id, cookie); 3749 break; 3750 } 3751 } 3752 3753 static void __rbd_unregister_watch(struct rbd_device *rbd_dev); 3754 3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err) 3756 { 3757 struct rbd_device *rbd_dev = arg; 3758 3759 rbd_warn(rbd_dev, "encountered watch error: %d", err); 3760 3761 down_write(&rbd_dev->lock_rwsem); 3762 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); 3763 up_write(&rbd_dev->lock_rwsem); 3764 3765 mutex_lock(&rbd_dev->watch_mutex); 3766 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { 3767 __rbd_unregister_watch(rbd_dev); 3768 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; 3769 3770 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); 3771 } 3772 mutex_unlock(&rbd_dev->watch_mutex); 3773 } 3774 3775 /* 3776 * watch_mutex must be locked 3777 */ 3778 static int __rbd_register_watch(struct rbd_device *rbd_dev) 3779 { 3780 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3781 struct ceph_osd_linger_request *handle; 3782 3783 rbd_assert(!rbd_dev->watch_handle); 3784 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3785 3786 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 3787 &rbd_dev->header_oloc, rbd_watch_cb, 3788 rbd_watch_errcb, rbd_dev); 3789 if (IS_ERR(handle)) 3790 return PTR_ERR(handle); 3791 3792 rbd_dev->watch_handle = handle; 3793 return 0; 3794 } 3795 3796 /* 3797 * watch_mutex must be locked 3798 */ 3799 static void __rbd_unregister_watch(struct rbd_device *rbd_dev) 3800 { 3801 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3802 int ret; 3803 3804 rbd_assert(rbd_dev->watch_handle); 3805 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3806 3807 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 3808 if (ret) 3809 rbd_warn(rbd_dev, "failed to unwatch: %d", ret); 3810 3811 rbd_dev->watch_handle = NULL; 3812 } 3813 3814 static int rbd_register_watch(struct rbd_device *rbd_dev) 3815 { 3816 int ret; 3817 3818 mutex_lock(&rbd_dev->watch_mutex); 3819 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); 3820 ret = __rbd_register_watch(rbd_dev); 3821 if (ret) 3822 goto out; 3823 3824 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3825 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3826 3827 out: 3828 mutex_unlock(&rbd_dev->watch_mutex); 3829 return ret; 3830 } 3831 3832 static void cancel_tasks_sync(struct rbd_device *rbd_dev) 3833 { 3834 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3835 3836 cancel_delayed_work_sync(&rbd_dev->watch_dwork); 3837 cancel_work_sync(&rbd_dev->acquired_lock_work); 3838 cancel_work_sync(&rbd_dev->released_lock_work); 3839 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 3840 cancel_work_sync(&rbd_dev->unlock_work); 3841 } 3842 3843 static void rbd_unregister_watch(struct rbd_device *rbd_dev) 3844 { 3845 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); 3846 cancel_tasks_sync(rbd_dev); 3847 3848 mutex_lock(&rbd_dev->watch_mutex); 3849 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) 3850 __rbd_unregister_watch(rbd_dev); 3851 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 3852 mutex_unlock(&rbd_dev->watch_mutex); 3853 3854 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 3855 } 3856 3857 /* 3858 * lock_rwsem must be held for write 3859 */ 3860 static void rbd_reacquire_lock(struct rbd_device *rbd_dev) 3861 { 3862 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3863 char cookie[32]; 3864 int ret; 3865 3866 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); 3867 3868 format_lock_cookie(rbd_dev, cookie); 3869 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, 3870 &rbd_dev->header_oloc, RBD_LOCK_NAME, 3871 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie, 3872 RBD_LOCK_TAG, cookie); 3873 if (ret) { 3874 if (ret != -EOPNOTSUPP) 3875 rbd_warn(rbd_dev, "failed to update lock cookie: %d", 3876 ret); 3877 3878 /* 3879 * Lock cookie cannot be updated on older OSDs, so do 3880 * a manual release and queue an acquire. 3881 */ 3882 if (rbd_release_lock(rbd_dev)) 3883 queue_delayed_work(rbd_dev->task_wq, 3884 &rbd_dev->lock_dwork, 0); 3885 } else { 3886 strcpy(rbd_dev->lock_cookie, cookie); 3887 } 3888 } 3889 3890 static void rbd_reregister_watch(struct work_struct *work) 3891 { 3892 struct rbd_device *rbd_dev = container_of(to_delayed_work(work), 3893 struct rbd_device, watch_dwork); 3894 int ret; 3895 3896 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3897 3898 mutex_lock(&rbd_dev->watch_mutex); 3899 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { 3900 mutex_unlock(&rbd_dev->watch_mutex); 3901 return; 3902 } 3903 3904 ret = __rbd_register_watch(rbd_dev); 3905 if (ret) { 3906 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); 3907 if (ret == -EBLACKLISTED || ret == -ENOENT) { 3908 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); 3909 wake_requests(rbd_dev, true); 3910 } else { 3911 queue_delayed_work(rbd_dev->task_wq, 3912 &rbd_dev->watch_dwork, 3913 RBD_RETRY_DELAY); 3914 } 3915 mutex_unlock(&rbd_dev->watch_mutex); 3916 return; 3917 } 3918 3919 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; 3920 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; 3921 mutex_unlock(&rbd_dev->watch_mutex); 3922 3923 down_write(&rbd_dev->lock_rwsem); 3924 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) 3925 rbd_reacquire_lock(rbd_dev); 3926 up_write(&rbd_dev->lock_rwsem); 3927 3928 ret = rbd_dev_refresh(rbd_dev); 3929 if (ret) 3930 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); 3931 } 3932 3933 /* 3934 * Synchronous osd object method call. Returns the number of bytes 3935 * returned in the outbound buffer, or a negative error code. 3936 */ 3937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 3938 struct ceph_object_id *oid, 3939 struct ceph_object_locator *oloc, 3940 const char *method_name, 3941 const void *outbound, 3942 size_t outbound_size, 3943 void *inbound, 3944 size_t inbound_size) 3945 { 3946 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3947 struct page *req_page = NULL; 3948 struct page *reply_page; 3949 int ret; 3950 3951 /* 3952 * Method calls are ultimately read operations. The result 3953 * should placed into the inbound buffer provided. They 3954 * also supply outbound data--parameters for the object 3955 * method. Currently if this is present it will be a 3956 * snapshot id. 3957 */ 3958 if (outbound) { 3959 if (outbound_size > PAGE_SIZE) 3960 return -E2BIG; 3961 3962 req_page = alloc_page(GFP_KERNEL); 3963 if (!req_page) 3964 return -ENOMEM; 3965 3966 memcpy(page_address(req_page), outbound, outbound_size); 3967 } 3968 3969 reply_page = alloc_page(GFP_KERNEL); 3970 if (!reply_page) { 3971 if (req_page) 3972 __free_page(req_page); 3973 return -ENOMEM; 3974 } 3975 3976 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, 3977 CEPH_OSD_FLAG_READ, req_page, outbound_size, 3978 reply_page, &inbound_size); 3979 if (!ret) { 3980 memcpy(inbound, page_address(reply_page), inbound_size); 3981 ret = inbound_size; 3982 } 3983 3984 if (req_page) 3985 __free_page(req_page); 3986 __free_page(reply_page); 3987 return ret; 3988 } 3989 3990 /* 3991 * lock_rwsem must be held for read 3992 */ 3993 static void rbd_wait_state_locked(struct rbd_device *rbd_dev) 3994 { 3995 DEFINE_WAIT(wait); 3996 3997 do { 3998 /* 3999 * Note the use of mod_delayed_work() in rbd_acquire_lock() 4000 * and cancel_delayed_work() in wake_requests(). 4001 */ 4002 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); 4003 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); 4004 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, 4005 TASK_UNINTERRUPTIBLE); 4006 up_read(&rbd_dev->lock_rwsem); 4007 schedule(); 4008 down_read(&rbd_dev->lock_rwsem); 4009 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && 4010 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); 4011 4012 finish_wait(&rbd_dev->lock_waitq, &wait); 4013 } 4014 4015 static void rbd_queue_workfn(struct work_struct *work) 4016 { 4017 struct request *rq = blk_mq_rq_from_pdu(work); 4018 struct rbd_device *rbd_dev = rq->q->queuedata; 4019 struct rbd_img_request *img_request; 4020 struct ceph_snap_context *snapc = NULL; 4021 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; 4022 u64 length = blk_rq_bytes(rq); 4023 enum obj_operation_type op_type; 4024 u64 mapping_size; 4025 bool must_be_locked; 4026 int result; 4027 4028 switch (req_op(rq)) { 4029 case REQ_OP_DISCARD: 4030 case REQ_OP_WRITE_ZEROES: 4031 op_type = OBJ_OP_DISCARD; 4032 break; 4033 case REQ_OP_WRITE: 4034 op_type = OBJ_OP_WRITE; 4035 break; 4036 case REQ_OP_READ: 4037 op_type = OBJ_OP_READ; 4038 break; 4039 default: 4040 dout("%s: non-fs request type %d\n", __func__, req_op(rq)); 4041 result = -EIO; 4042 goto err; 4043 } 4044 4045 /* Ignore/skip any zero-length requests */ 4046 4047 if (!length) { 4048 dout("%s: zero-length request\n", __func__); 4049 result = 0; 4050 goto err_rq; 4051 } 4052 4053 /* Only reads are allowed to a read-only device */ 4054 4055 if (op_type != OBJ_OP_READ) { 4056 if (rbd_dev->mapping.read_only) { 4057 result = -EROFS; 4058 goto err_rq; 4059 } 4060 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); 4061 } 4062 4063 /* 4064 * Quit early if the mapped snapshot no longer exists. It's 4065 * still possible the snapshot will have disappeared by the 4066 * time our request arrives at the osd, but there's no sense in 4067 * sending it if we already know. 4068 */ 4069 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 4070 dout("request for non-existent snapshot"); 4071 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 4072 result = -ENXIO; 4073 goto err_rq; 4074 } 4075 4076 if (offset && length > U64_MAX - offset + 1) { 4077 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, 4078 length); 4079 result = -EINVAL; 4080 goto err_rq; /* Shouldn't happen */ 4081 } 4082 4083 blk_mq_start_request(rq); 4084 4085 down_read(&rbd_dev->header_rwsem); 4086 mapping_size = rbd_dev->mapping.size; 4087 if (op_type != OBJ_OP_READ) { 4088 snapc = rbd_dev->header.snapc; 4089 ceph_get_snap_context(snapc); 4090 } 4091 up_read(&rbd_dev->header_rwsem); 4092 4093 if (offset + length > mapping_size) { 4094 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, 4095 length, mapping_size); 4096 result = -EIO; 4097 goto err_rq; 4098 } 4099 4100 must_be_locked = 4101 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && 4102 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); 4103 if (must_be_locked) { 4104 down_read(&rbd_dev->lock_rwsem); 4105 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && 4106 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 4107 if (rbd_dev->opts->exclusive) { 4108 rbd_warn(rbd_dev, "exclusive lock required"); 4109 result = -EROFS; 4110 goto err_unlock; 4111 } 4112 rbd_wait_state_locked(rbd_dev); 4113 } 4114 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 4115 result = -EBLACKLISTED; 4116 goto err_unlock; 4117 } 4118 } 4119 4120 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, 4121 snapc); 4122 if (!img_request) { 4123 result = -ENOMEM; 4124 goto err_unlock; 4125 } 4126 img_request->rq = rq; 4127 snapc = NULL; /* img_request consumes a ref */ 4128 4129 if (op_type == OBJ_OP_DISCARD) 4130 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA, 4131 NULL); 4132 else 4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 4134 rq->bio); 4135 if (result) 4136 goto err_img_request; 4137 4138 result = rbd_img_request_submit(img_request); 4139 if (result) 4140 goto err_img_request; 4141 4142 if (must_be_locked) 4143 up_read(&rbd_dev->lock_rwsem); 4144 return; 4145 4146 err_img_request: 4147 rbd_img_request_put(img_request); 4148 err_unlock: 4149 if (must_be_locked) 4150 up_read(&rbd_dev->lock_rwsem); 4151 err_rq: 4152 if (result) 4153 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 4154 obj_op_name(op_type), length, offset, result); 4155 ceph_put_snap_context(snapc); 4156 err: 4157 blk_mq_end_request(rq, errno_to_blk_status(result)); 4158 } 4159 4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, 4161 const struct blk_mq_queue_data *bd) 4162 { 4163 struct request *rq = bd->rq; 4164 struct work_struct *work = blk_mq_rq_to_pdu(rq); 4165 4166 queue_work(rbd_wq, work); 4167 return BLK_STS_OK; 4168 } 4169 4170 static void rbd_free_disk(struct rbd_device *rbd_dev) 4171 { 4172 blk_cleanup_queue(rbd_dev->disk->queue); 4173 blk_mq_free_tag_set(&rbd_dev->tag_set); 4174 put_disk(rbd_dev->disk); 4175 rbd_dev->disk = NULL; 4176 } 4177 4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 4179 struct ceph_object_id *oid, 4180 struct ceph_object_locator *oloc, 4181 void *buf, int buf_len) 4182 4183 { 4184 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 4185 struct ceph_osd_request *req; 4186 struct page **pages; 4187 int num_pages = calc_pages_for(0, buf_len); 4188 int ret; 4189 4190 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 4191 if (!req) 4192 return -ENOMEM; 4193 4194 ceph_oid_copy(&req->r_base_oid, oid); 4195 ceph_oloc_copy(&req->r_base_oloc, oloc); 4196 req->r_flags = CEPH_OSD_FLAG_READ; 4197 4198 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 4199 if (ret) 4200 goto out_req; 4201 4202 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 4203 if (IS_ERR(pages)) { 4204 ret = PTR_ERR(pages); 4205 goto out_req; 4206 } 4207 4208 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); 4209 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, 4210 true); 4211 4212 ceph_osdc_start_request(osdc, req, false); 4213 ret = ceph_osdc_wait_request(osdc, req); 4214 if (ret >= 0) 4215 ceph_copy_from_page_vector(pages, buf, 0, ret); 4216 4217 out_req: 4218 ceph_osdc_put_request(req); 4219 return ret; 4220 } 4221 4222 /* 4223 * Read the complete header for the given rbd device. On successful 4224 * return, the rbd_dev->header field will contain up-to-date 4225 * information about the image. 4226 */ 4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) 4228 { 4229 struct rbd_image_header_ondisk *ondisk = NULL; 4230 u32 snap_count = 0; 4231 u64 names_size = 0; 4232 u32 want_count; 4233 int ret; 4234 4235 /* 4236 * The complete header will include an array of its 64-bit 4237 * snapshot ids, followed by the names of those snapshots as 4238 * a contiguous block of NUL-terminated strings. Note that 4239 * the number of snapshots could change by the time we read 4240 * it in, in which case we re-read it. 4241 */ 4242 do { 4243 size_t size; 4244 4245 kfree(ondisk); 4246 4247 size = sizeof (*ondisk); 4248 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 4249 size += names_size; 4250 ondisk = kmalloc(size, GFP_KERNEL); 4251 if (!ondisk) 4252 return -ENOMEM; 4253 4254 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, 4255 &rbd_dev->header_oloc, ondisk, size); 4256 if (ret < 0) 4257 goto out; 4258 if ((size_t)ret < size) { 4259 ret = -ENXIO; 4260 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 4261 size, ret); 4262 goto out; 4263 } 4264 if (!rbd_dev_ondisk_valid(ondisk)) { 4265 ret = -ENXIO; 4266 rbd_warn(rbd_dev, "invalid header"); 4267 goto out; 4268 } 4269 4270 names_size = le64_to_cpu(ondisk->snap_names_len); 4271 want_count = snap_count; 4272 snap_count = le32_to_cpu(ondisk->snap_count); 4273 } while (snap_count != want_count); 4274 4275 ret = rbd_header_from_disk(rbd_dev, ondisk); 4276 out: 4277 kfree(ondisk); 4278 4279 return ret; 4280 } 4281 4282 /* 4283 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to 4284 * has disappeared from the (just updated) snapshot context. 4285 */ 4286 static void rbd_exists_validate(struct rbd_device *rbd_dev) 4287 { 4288 u64 snap_id; 4289 4290 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) 4291 return; 4292 4293 snap_id = rbd_dev->spec->snap_id; 4294 if (snap_id == CEPH_NOSNAP) 4295 return; 4296 4297 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) 4298 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4299 } 4300 4301 static void rbd_dev_update_size(struct rbd_device *rbd_dev) 4302 { 4303 sector_t size; 4304 4305 /* 4306 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't 4307 * try to update its size. If REMOVING is set, updating size 4308 * is just useless work since the device can't be opened. 4309 */ 4310 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) && 4311 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) { 4312 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; 4313 dout("setting size to %llu sectors", (unsigned long long)size); 4314 set_capacity(rbd_dev->disk, size); 4315 revalidate_disk(rbd_dev->disk); 4316 } 4317 } 4318 4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev) 4320 { 4321 u64 mapping_size; 4322 int ret; 4323 4324 down_write(&rbd_dev->header_rwsem); 4325 mapping_size = rbd_dev->mapping.size; 4326 4327 ret = rbd_dev_header_info(rbd_dev); 4328 if (ret) 4329 goto out; 4330 4331 /* 4332 * If there is a parent, see if it has disappeared due to the 4333 * mapped image getting flattened. 4334 */ 4335 if (rbd_dev->parent) { 4336 ret = rbd_dev_v2_parent_info(rbd_dev); 4337 if (ret) 4338 goto out; 4339 } 4340 4341 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { 4342 rbd_dev->mapping.size = rbd_dev->header.image_size; 4343 } else { 4344 /* validate mapped snapshot's EXISTS flag */ 4345 rbd_exists_validate(rbd_dev); 4346 } 4347 4348 out: 4349 up_write(&rbd_dev->header_rwsem); 4350 if (!ret && mapping_size != rbd_dev->mapping.size) 4351 rbd_dev_update_size(rbd_dev); 4352 4353 return ret; 4354 } 4355 4356 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, 4357 unsigned int hctx_idx, unsigned int numa_node) 4358 { 4359 struct work_struct *work = blk_mq_rq_to_pdu(rq); 4360 4361 INIT_WORK(work, rbd_queue_workfn); 4362 return 0; 4363 } 4364 4365 static const struct blk_mq_ops rbd_mq_ops = { 4366 .queue_rq = rbd_queue_rq, 4367 .init_request = rbd_init_request, 4368 }; 4369 4370 static int rbd_init_disk(struct rbd_device *rbd_dev) 4371 { 4372 struct gendisk *disk; 4373 struct request_queue *q; 4374 u64 segment_size; 4375 int err; 4376 4377 /* create gendisk info */ 4378 disk = alloc_disk(single_major ? 4379 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : 4380 RBD_MINORS_PER_MAJOR); 4381 if (!disk) 4382 return -ENOMEM; 4383 4384 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 4385 rbd_dev->dev_id); 4386 disk->major = rbd_dev->major; 4387 disk->first_minor = rbd_dev->minor; 4388 if (single_major) 4389 disk->flags |= GENHD_FL_EXT_DEVT; 4390 disk->fops = &rbd_bd_ops; 4391 disk->private_data = rbd_dev; 4392 4393 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set)); 4394 rbd_dev->tag_set.ops = &rbd_mq_ops; 4395 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; 4396 rbd_dev->tag_set.numa_node = NUMA_NO_NODE; 4397 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; 4398 rbd_dev->tag_set.nr_hw_queues = 1; 4399 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); 4400 4401 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); 4402 if (err) 4403 goto out_disk; 4404 4405 q = blk_mq_init_queue(&rbd_dev->tag_set); 4406 if (IS_ERR(q)) { 4407 err = PTR_ERR(q); 4408 goto out_tag_set; 4409 } 4410 4411 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q); 4412 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */ 4413 4414 /* set io sizes to object size */ 4415 segment_size = rbd_obj_bytes(&rbd_dev->header); 4416 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 4417 q->limits.max_sectors = queue_max_hw_sectors(q); 4418 blk_queue_max_segments(q, segment_size / SECTOR_SIZE); 4419 blk_queue_max_segment_size(q, segment_size); 4420 blk_queue_io_min(q, segment_size); 4421 blk_queue_io_opt(q, segment_size); 4422 4423 /* enable the discard support */ 4424 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q); 4425 q->limits.discard_granularity = segment_size; 4426 q->limits.discard_alignment = segment_size; 4427 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE); 4428 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE); 4429 4430 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) 4431 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; 4432 4433 /* 4434 * disk_release() expects a queue ref from add_disk() and will 4435 * put it. Hold an extra ref until add_disk() is called. 4436 */ 4437 WARN_ON(!blk_get_queue(q)); 4438 disk->queue = q; 4439 q->queuedata = rbd_dev; 4440 4441 rbd_dev->disk = disk; 4442 4443 return 0; 4444 out_tag_set: 4445 blk_mq_free_tag_set(&rbd_dev->tag_set); 4446 out_disk: 4447 put_disk(disk); 4448 return err; 4449 } 4450 4451 /* 4452 sysfs 4453 */ 4454 4455 static struct rbd_device *dev_to_rbd_dev(struct device *dev) 4456 { 4457 return container_of(dev, struct rbd_device, dev); 4458 } 4459 4460 static ssize_t rbd_size_show(struct device *dev, 4461 struct device_attribute *attr, char *buf) 4462 { 4463 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4464 4465 return sprintf(buf, "%llu\n", 4466 (unsigned long long)rbd_dev->mapping.size); 4467 } 4468 4469 /* 4470 * Note this shows the features for whatever's mapped, which is not 4471 * necessarily the base image. 4472 */ 4473 static ssize_t rbd_features_show(struct device *dev, 4474 struct device_attribute *attr, char *buf) 4475 { 4476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4477 4478 return sprintf(buf, "0x%016llx\n", 4479 (unsigned long long)rbd_dev->mapping.features); 4480 } 4481 4482 static ssize_t rbd_major_show(struct device *dev, 4483 struct device_attribute *attr, char *buf) 4484 { 4485 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4486 4487 if (rbd_dev->major) 4488 return sprintf(buf, "%d\n", rbd_dev->major); 4489 4490 return sprintf(buf, "(none)\n"); 4491 } 4492 4493 static ssize_t rbd_minor_show(struct device *dev, 4494 struct device_attribute *attr, char *buf) 4495 { 4496 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4497 4498 return sprintf(buf, "%d\n", rbd_dev->minor); 4499 } 4500 4501 static ssize_t rbd_client_addr_show(struct device *dev, 4502 struct device_attribute *attr, char *buf) 4503 { 4504 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4505 struct ceph_entity_addr *client_addr = 4506 ceph_client_addr(rbd_dev->rbd_client->client); 4507 4508 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, 4509 le32_to_cpu(client_addr->nonce)); 4510 } 4511 4512 static ssize_t rbd_client_id_show(struct device *dev, 4513 struct device_attribute *attr, char *buf) 4514 { 4515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4516 4517 return sprintf(buf, "client%lld\n", 4518 ceph_client_gid(rbd_dev->rbd_client->client)); 4519 } 4520 4521 static ssize_t rbd_cluster_fsid_show(struct device *dev, 4522 struct device_attribute *attr, char *buf) 4523 { 4524 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4525 4526 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); 4527 } 4528 4529 static ssize_t rbd_config_info_show(struct device *dev, 4530 struct device_attribute *attr, char *buf) 4531 { 4532 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4533 4534 return sprintf(buf, "%s\n", rbd_dev->config_info); 4535 } 4536 4537 static ssize_t rbd_pool_show(struct device *dev, 4538 struct device_attribute *attr, char *buf) 4539 { 4540 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4541 4542 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 4543 } 4544 4545 static ssize_t rbd_pool_id_show(struct device *dev, 4546 struct device_attribute *attr, char *buf) 4547 { 4548 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4549 4550 return sprintf(buf, "%llu\n", 4551 (unsigned long long) rbd_dev->spec->pool_id); 4552 } 4553 4554 static ssize_t rbd_name_show(struct device *dev, 4555 struct device_attribute *attr, char *buf) 4556 { 4557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4558 4559 if (rbd_dev->spec->image_name) 4560 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 4561 4562 return sprintf(buf, "(unknown)\n"); 4563 } 4564 4565 static ssize_t rbd_image_id_show(struct device *dev, 4566 struct device_attribute *attr, char *buf) 4567 { 4568 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4569 4570 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 4571 } 4572 4573 /* 4574 * Shows the name of the currently-mapped snapshot (or 4575 * RBD_SNAP_HEAD_NAME for the base image). 4576 */ 4577 static ssize_t rbd_snap_show(struct device *dev, 4578 struct device_attribute *attr, 4579 char *buf) 4580 { 4581 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4582 4583 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 4584 } 4585 4586 static ssize_t rbd_snap_id_show(struct device *dev, 4587 struct device_attribute *attr, char *buf) 4588 { 4589 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4590 4591 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); 4592 } 4593 4594 /* 4595 * For a v2 image, shows the chain of parent images, separated by empty 4596 * lines. For v1 images or if there is no parent, shows "(no parent 4597 * image)". 4598 */ 4599 static ssize_t rbd_parent_show(struct device *dev, 4600 struct device_attribute *attr, 4601 char *buf) 4602 { 4603 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4604 ssize_t count = 0; 4605 4606 if (!rbd_dev->parent) 4607 return sprintf(buf, "(no parent image)\n"); 4608 4609 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) { 4610 struct rbd_spec *spec = rbd_dev->parent_spec; 4611 4612 count += sprintf(&buf[count], "%s" 4613 "pool_id %llu\npool_name %s\n" 4614 "image_id %s\nimage_name %s\n" 4615 "snap_id %llu\nsnap_name %s\n" 4616 "overlap %llu\n", 4617 !count ? "" : "\n", /* first? */ 4618 spec->pool_id, spec->pool_name, 4619 spec->image_id, spec->image_name ?: "(unknown)", 4620 spec->snap_id, spec->snap_name, 4621 rbd_dev->parent_overlap); 4622 } 4623 4624 return count; 4625 } 4626 4627 static ssize_t rbd_image_refresh(struct device *dev, 4628 struct device_attribute *attr, 4629 const char *buf, 4630 size_t size) 4631 { 4632 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4633 int ret; 4634 4635 ret = rbd_dev_refresh(rbd_dev); 4636 if (ret) 4637 return ret; 4638 4639 return size; 4640 } 4641 4642 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 4643 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 4644 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 4645 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); 4646 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); 4647 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 4648 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); 4649 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); 4650 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 4651 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 4652 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 4653 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 4654 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 4655 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 4656 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 4657 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 4658 4659 static struct attribute *rbd_attrs[] = { 4660 &dev_attr_size.attr, 4661 &dev_attr_features.attr, 4662 &dev_attr_major.attr, 4663 &dev_attr_minor.attr, 4664 &dev_attr_client_addr.attr, 4665 &dev_attr_client_id.attr, 4666 &dev_attr_cluster_fsid.attr, 4667 &dev_attr_config_info.attr, 4668 &dev_attr_pool.attr, 4669 &dev_attr_pool_id.attr, 4670 &dev_attr_name.attr, 4671 &dev_attr_image_id.attr, 4672 &dev_attr_current_snap.attr, 4673 &dev_attr_snap_id.attr, 4674 &dev_attr_parent.attr, 4675 &dev_attr_refresh.attr, 4676 NULL 4677 }; 4678 4679 static struct attribute_group rbd_attr_group = { 4680 .attrs = rbd_attrs, 4681 }; 4682 4683 static const struct attribute_group *rbd_attr_groups[] = { 4684 &rbd_attr_group, 4685 NULL 4686 }; 4687 4688 static void rbd_dev_release(struct device *dev); 4689 4690 static const struct device_type rbd_device_type = { 4691 .name = "rbd", 4692 .groups = rbd_attr_groups, 4693 .release = rbd_dev_release, 4694 }; 4695 4696 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 4697 { 4698 kref_get(&spec->kref); 4699 4700 return spec; 4701 } 4702 4703 static void rbd_spec_free(struct kref *kref); 4704 static void rbd_spec_put(struct rbd_spec *spec) 4705 { 4706 if (spec) 4707 kref_put(&spec->kref, rbd_spec_free); 4708 } 4709 4710 static struct rbd_spec *rbd_spec_alloc(void) 4711 { 4712 struct rbd_spec *spec; 4713 4714 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 4715 if (!spec) 4716 return NULL; 4717 4718 spec->pool_id = CEPH_NOPOOL; 4719 spec->snap_id = CEPH_NOSNAP; 4720 kref_init(&spec->kref); 4721 4722 return spec; 4723 } 4724 4725 static void rbd_spec_free(struct kref *kref) 4726 { 4727 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 4728 4729 kfree(spec->pool_name); 4730 kfree(spec->image_id); 4731 kfree(spec->image_name); 4732 kfree(spec->snap_name); 4733 kfree(spec); 4734 } 4735 4736 static void rbd_dev_free(struct rbd_device *rbd_dev) 4737 { 4738 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); 4739 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); 4740 4741 ceph_oid_destroy(&rbd_dev->header_oid); 4742 ceph_oloc_destroy(&rbd_dev->header_oloc); 4743 kfree(rbd_dev->config_info); 4744 4745 rbd_put_client(rbd_dev->rbd_client); 4746 rbd_spec_put(rbd_dev->spec); 4747 kfree(rbd_dev->opts); 4748 kfree(rbd_dev); 4749 } 4750 4751 static void rbd_dev_release(struct device *dev) 4752 { 4753 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4754 bool need_put = !!rbd_dev->opts; 4755 4756 if (need_put) { 4757 destroy_workqueue(rbd_dev->task_wq); 4758 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4759 } 4760 4761 rbd_dev_free(rbd_dev); 4762 4763 /* 4764 * This is racy, but way better than putting module outside of 4765 * the release callback. The race window is pretty small, so 4766 * doing something similar to dm (dm-builtin.c) is overkill. 4767 */ 4768 if (need_put) 4769 module_put(THIS_MODULE); 4770 } 4771 4772 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, 4773 struct rbd_spec *spec) 4774 { 4775 struct rbd_device *rbd_dev; 4776 4777 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 4778 if (!rbd_dev) 4779 return NULL; 4780 4781 spin_lock_init(&rbd_dev->lock); 4782 INIT_LIST_HEAD(&rbd_dev->node); 4783 init_rwsem(&rbd_dev->header_rwsem); 4784 4785 rbd_dev->header.data_pool_id = CEPH_NOPOOL; 4786 ceph_oid_init(&rbd_dev->header_oid); 4787 rbd_dev->header_oloc.pool = spec->pool_id; 4788 4789 mutex_init(&rbd_dev->watch_mutex); 4790 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 4791 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); 4792 4793 init_rwsem(&rbd_dev->lock_rwsem); 4794 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; 4795 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); 4796 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); 4797 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); 4798 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); 4799 init_waitqueue_head(&rbd_dev->lock_waitq); 4800 4801 rbd_dev->dev.bus = &rbd_bus_type; 4802 rbd_dev->dev.type = &rbd_device_type; 4803 rbd_dev->dev.parent = &rbd_root_dev; 4804 device_initialize(&rbd_dev->dev); 4805 4806 rbd_dev->rbd_client = rbdc; 4807 rbd_dev->spec = spec; 4808 4809 return rbd_dev; 4810 } 4811 4812 /* 4813 * Create a mapping rbd_dev. 4814 */ 4815 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4816 struct rbd_spec *spec, 4817 struct rbd_options *opts) 4818 { 4819 struct rbd_device *rbd_dev; 4820 4821 rbd_dev = __rbd_dev_create(rbdc, spec); 4822 if (!rbd_dev) 4823 return NULL; 4824 4825 rbd_dev->opts = opts; 4826 4827 /* get an id and fill in device name */ 4828 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 4829 minor_to_rbd_dev_id(1 << MINORBITS), 4830 GFP_KERNEL); 4831 if (rbd_dev->dev_id < 0) 4832 goto fail_rbd_dev; 4833 4834 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); 4835 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, 4836 rbd_dev->name); 4837 if (!rbd_dev->task_wq) 4838 goto fail_dev_id; 4839 4840 /* we have a ref from do_rbd_add() */ 4841 __module_get(THIS_MODULE); 4842 4843 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); 4844 return rbd_dev; 4845 4846 fail_dev_id: 4847 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); 4848 fail_rbd_dev: 4849 rbd_dev_free(rbd_dev); 4850 return NULL; 4851 } 4852 4853 static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4854 { 4855 if (rbd_dev) 4856 put_device(&rbd_dev->dev); 4857 } 4858 4859 /* 4860 * Get the size and object order for an image snapshot, or if 4861 * snap_id is CEPH_NOSNAP, gets this information for the base 4862 * image. 4863 */ 4864 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 4865 u8 *order, u64 *snap_size) 4866 { 4867 __le64 snapid = cpu_to_le64(snap_id); 4868 int ret; 4869 struct { 4870 u8 order; 4871 __le64 size; 4872 } __attribute__ ((packed)) size_buf = { 0 }; 4873 4874 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4875 &rbd_dev->header_oloc, "get_size", 4876 &snapid, sizeof(snapid), 4877 &size_buf, sizeof(size_buf)); 4878 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4879 if (ret < 0) 4880 return ret; 4881 if (ret < sizeof (size_buf)) 4882 return -ERANGE; 4883 4884 if (order) { 4885 *order = size_buf.order; 4886 dout(" order %u", (unsigned int)*order); 4887 } 4888 *snap_size = le64_to_cpu(size_buf.size); 4889 4890 dout(" snap_id 0x%016llx snap_size = %llu\n", 4891 (unsigned long long)snap_id, 4892 (unsigned long long)*snap_size); 4893 4894 return 0; 4895 } 4896 4897 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 4898 { 4899 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 4900 &rbd_dev->header.obj_order, 4901 &rbd_dev->header.image_size); 4902 } 4903 4904 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 4905 { 4906 void *reply_buf; 4907 int ret; 4908 void *p; 4909 4910 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 4911 if (!reply_buf) 4912 return -ENOMEM; 4913 4914 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4915 &rbd_dev->header_oloc, "get_object_prefix", 4916 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); 4917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4918 if (ret < 0) 4919 goto out; 4920 4921 p = reply_buf; 4922 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 4923 p + ret, NULL, GFP_NOIO); 4924 ret = 0; 4925 4926 if (IS_ERR(rbd_dev->header.object_prefix)) { 4927 ret = PTR_ERR(rbd_dev->header.object_prefix); 4928 rbd_dev->header.object_prefix = NULL; 4929 } else { 4930 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 4931 } 4932 out: 4933 kfree(reply_buf); 4934 4935 return ret; 4936 } 4937 4938 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 4939 u64 *snap_features) 4940 { 4941 __le64 snapid = cpu_to_le64(snap_id); 4942 struct { 4943 __le64 features; 4944 __le64 incompat; 4945 } __attribute__ ((packed)) features_buf = { 0 }; 4946 u64 unsup; 4947 int ret; 4948 4949 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4950 &rbd_dev->header_oloc, "get_features", 4951 &snapid, sizeof(snapid), 4952 &features_buf, sizeof(features_buf)); 4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4954 if (ret < 0) 4955 return ret; 4956 if (ret < sizeof (features_buf)) 4957 return -ERANGE; 4958 4959 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED; 4960 if (unsup) { 4961 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx", 4962 unsup); 4963 return -ENXIO; 4964 } 4965 4966 *snap_features = le64_to_cpu(features_buf.features); 4967 4968 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 4969 (unsigned long long)snap_id, 4970 (unsigned long long)*snap_features, 4971 (unsigned long long)le64_to_cpu(features_buf.incompat)); 4972 4973 return 0; 4974 } 4975 4976 static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 4977 { 4978 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 4979 &rbd_dev->header.features); 4980 } 4981 4982 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 4983 { 4984 struct rbd_spec *parent_spec; 4985 size_t size; 4986 void *reply_buf = NULL; 4987 __le64 snapid; 4988 void *p; 4989 void *end; 4990 u64 pool_id; 4991 char *image_id; 4992 u64 snap_id; 4993 u64 overlap; 4994 int ret; 4995 4996 parent_spec = rbd_spec_alloc(); 4997 if (!parent_spec) 4998 return -ENOMEM; 4999 5000 size = sizeof (__le64) + /* pool_id */ 5001 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 5002 sizeof (__le64) + /* snap_id */ 5003 sizeof (__le64); /* overlap */ 5004 reply_buf = kmalloc(size, GFP_KERNEL); 5005 if (!reply_buf) { 5006 ret = -ENOMEM; 5007 goto out_err; 5008 } 5009 5010 snapid = cpu_to_le64(rbd_dev->spec->snap_id); 5011 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5012 &rbd_dev->header_oloc, "get_parent", 5013 &snapid, sizeof(snapid), reply_buf, size); 5014 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5015 if (ret < 0) 5016 goto out_err; 5017 5018 p = reply_buf; 5019 end = reply_buf + ret; 5020 ret = -ERANGE; 5021 ceph_decode_64_safe(&p, end, pool_id, out_err); 5022 if (pool_id == CEPH_NOPOOL) { 5023 /* 5024 * Either the parent never existed, or we have 5025 * record of it but the image got flattened so it no 5026 * longer has a parent. When the parent of a 5027 * layered image disappears we immediately set the 5028 * overlap to 0. The effect of this is that all new 5029 * requests will be treated as if the image had no 5030 * parent. 5031 */ 5032 if (rbd_dev->parent_overlap) { 5033 rbd_dev->parent_overlap = 0; 5034 rbd_dev_parent_put(rbd_dev); 5035 pr_info("%s: clone image has been flattened\n", 5036 rbd_dev->disk->disk_name); 5037 } 5038 5039 goto out; /* No parent? No problem. */ 5040 } 5041 5042 /* The ceph file layout needs to fit pool id in 32 bits */ 5043 5044 ret = -EIO; 5045 if (pool_id > (u64)U32_MAX) { 5046 rbd_warn(NULL, "parent pool id too large (%llu > %u)", 5047 (unsigned long long)pool_id, U32_MAX); 5048 goto out_err; 5049 } 5050 5051 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 5052 if (IS_ERR(image_id)) { 5053 ret = PTR_ERR(image_id); 5054 goto out_err; 5055 } 5056 ceph_decode_64_safe(&p, end, snap_id, out_err); 5057 ceph_decode_64_safe(&p, end, overlap, out_err); 5058 5059 /* 5060 * The parent won't change (except when the clone is 5061 * flattened, already handled that). So we only need to 5062 * record the parent spec we have not already done so. 5063 */ 5064 if (!rbd_dev->parent_spec) { 5065 parent_spec->pool_id = pool_id; 5066 parent_spec->image_id = image_id; 5067 parent_spec->snap_id = snap_id; 5068 rbd_dev->parent_spec = parent_spec; 5069 parent_spec = NULL; /* rbd_dev now owns this */ 5070 } else { 5071 kfree(image_id); 5072 } 5073 5074 /* 5075 * We always update the parent overlap. If it's zero we issue 5076 * a warning, as we will proceed as if there was no parent. 5077 */ 5078 if (!overlap) { 5079 if (parent_spec) { 5080 /* refresh, careful to warn just once */ 5081 if (rbd_dev->parent_overlap) 5082 rbd_warn(rbd_dev, 5083 "clone now standalone (overlap became 0)"); 5084 } else { 5085 /* initial probe */ 5086 rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); 5087 } 5088 } 5089 rbd_dev->parent_overlap = overlap; 5090 5091 out: 5092 ret = 0; 5093 out_err: 5094 kfree(reply_buf); 5095 rbd_spec_put(parent_spec); 5096 5097 return ret; 5098 } 5099 5100 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 5101 { 5102 struct { 5103 __le64 stripe_unit; 5104 __le64 stripe_count; 5105 } __attribute__ ((packed)) striping_info_buf = { 0 }; 5106 size_t size = sizeof (striping_info_buf); 5107 void *p; 5108 u64 obj_size; 5109 u64 stripe_unit; 5110 u64 stripe_count; 5111 int ret; 5112 5113 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5114 &rbd_dev->header_oloc, "get_stripe_unit_count", 5115 NULL, 0, &striping_info_buf, size); 5116 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5117 if (ret < 0) 5118 return ret; 5119 if (ret < size) 5120 return -ERANGE; 5121 5122 /* 5123 * We don't actually support the "fancy striping" feature 5124 * (STRIPINGV2) yet, but if the striping sizes are the 5125 * defaults the behavior is the same as before. So find 5126 * out, and only fail if the image has non-default values. 5127 */ 5128 ret = -EINVAL; 5129 obj_size = rbd_obj_bytes(&rbd_dev->header); 5130 p = &striping_info_buf; 5131 stripe_unit = ceph_decode_64(&p); 5132 if (stripe_unit != obj_size) { 5133 rbd_warn(rbd_dev, "unsupported stripe unit " 5134 "(got %llu want %llu)", 5135 stripe_unit, obj_size); 5136 return -EINVAL; 5137 } 5138 stripe_count = ceph_decode_64(&p); 5139 if (stripe_count != 1) { 5140 rbd_warn(rbd_dev, "unsupported stripe count " 5141 "(got %llu want 1)", stripe_count); 5142 return -EINVAL; 5143 } 5144 rbd_dev->header.stripe_unit = stripe_unit; 5145 rbd_dev->header.stripe_count = stripe_count; 5146 5147 return 0; 5148 } 5149 5150 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) 5151 { 5152 __le64 data_pool_id; 5153 int ret; 5154 5155 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5156 &rbd_dev->header_oloc, "get_data_pool", 5157 NULL, 0, &data_pool_id, sizeof(data_pool_id)); 5158 if (ret < 0) 5159 return ret; 5160 if (ret < sizeof(data_pool_id)) 5161 return -EBADMSG; 5162 5163 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); 5164 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); 5165 return 0; 5166 } 5167 5168 static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 5169 { 5170 CEPH_DEFINE_OID_ONSTACK(oid); 5171 size_t image_id_size; 5172 char *image_id; 5173 void *p; 5174 void *end; 5175 size_t size; 5176 void *reply_buf = NULL; 5177 size_t len = 0; 5178 char *image_name = NULL; 5179 int ret; 5180 5181 rbd_assert(!rbd_dev->spec->image_name); 5182 5183 len = strlen(rbd_dev->spec->image_id); 5184 image_id_size = sizeof (__le32) + len; 5185 image_id = kmalloc(image_id_size, GFP_KERNEL); 5186 if (!image_id) 5187 return NULL; 5188 5189 p = image_id; 5190 end = image_id + image_id_size; 5191 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 5192 5193 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 5194 reply_buf = kmalloc(size, GFP_KERNEL); 5195 if (!reply_buf) 5196 goto out; 5197 5198 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); 5199 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5200 "dir_get_name", image_id, image_id_size, 5201 reply_buf, size); 5202 if (ret < 0) 5203 goto out; 5204 p = reply_buf; 5205 end = reply_buf + ret; 5206 5207 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 5208 if (IS_ERR(image_name)) 5209 image_name = NULL; 5210 else 5211 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 5212 out: 5213 kfree(reply_buf); 5214 kfree(image_id); 5215 5216 return image_name; 5217 } 5218 5219 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5220 { 5221 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5222 const char *snap_name; 5223 u32 which = 0; 5224 5225 /* Skip over names until we find the one we are looking for */ 5226 5227 snap_name = rbd_dev->header.snap_names; 5228 while (which < snapc->num_snaps) { 5229 if (!strcmp(name, snap_name)) 5230 return snapc->snaps[which]; 5231 snap_name += strlen(snap_name) + 1; 5232 which++; 5233 } 5234 return CEPH_NOSNAP; 5235 } 5236 5237 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5238 { 5239 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 5240 u32 which; 5241 bool found = false; 5242 u64 snap_id; 5243 5244 for (which = 0; !found && which < snapc->num_snaps; which++) { 5245 const char *snap_name; 5246 5247 snap_id = snapc->snaps[which]; 5248 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); 5249 if (IS_ERR(snap_name)) { 5250 /* ignore no-longer existing snapshots */ 5251 if (PTR_ERR(snap_name) == -ENOENT) 5252 continue; 5253 else 5254 break; 5255 } 5256 found = !strcmp(name, snap_name); 5257 kfree(snap_name); 5258 } 5259 return found ? snap_id : CEPH_NOSNAP; 5260 } 5261 5262 /* 5263 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if 5264 * no snapshot by that name is found, or if an error occurs. 5265 */ 5266 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) 5267 { 5268 if (rbd_dev->image_format == 1) 5269 return rbd_v1_snap_id_by_name(rbd_dev, name); 5270 5271 return rbd_v2_snap_id_by_name(rbd_dev, name); 5272 } 5273 5274 /* 5275 * An image being mapped will have everything but the snap id. 5276 */ 5277 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev) 5278 { 5279 struct rbd_spec *spec = rbd_dev->spec; 5280 5281 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name); 5282 rbd_assert(spec->image_id && spec->image_name); 5283 rbd_assert(spec->snap_name); 5284 5285 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { 5286 u64 snap_id; 5287 5288 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); 5289 if (snap_id == CEPH_NOSNAP) 5290 return -ENOENT; 5291 5292 spec->snap_id = snap_id; 5293 } else { 5294 spec->snap_id = CEPH_NOSNAP; 5295 } 5296 5297 return 0; 5298 } 5299 5300 /* 5301 * A parent image will have all ids but none of the names. 5302 * 5303 * All names in an rbd spec are dynamically allocated. It's OK if we 5304 * can't figure out the name for an image id. 5305 */ 5306 static int rbd_spec_fill_names(struct rbd_device *rbd_dev) 5307 { 5308 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 5309 struct rbd_spec *spec = rbd_dev->spec; 5310 const char *pool_name; 5311 const char *image_name; 5312 const char *snap_name; 5313 int ret; 5314 5315 rbd_assert(spec->pool_id != CEPH_NOPOOL); 5316 rbd_assert(spec->image_id); 5317 rbd_assert(spec->snap_id != CEPH_NOSNAP); 5318 5319 /* Get the pool name; we have to make our own copy of this */ 5320 5321 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); 5322 if (!pool_name) { 5323 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); 5324 return -EIO; 5325 } 5326 pool_name = kstrdup(pool_name, GFP_KERNEL); 5327 if (!pool_name) 5328 return -ENOMEM; 5329 5330 /* Fetch the image name; tolerate failure here */ 5331 5332 image_name = rbd_dev_image_name(rbd_dev); 5333 if (!image_name) 5334 rbd_warn(rbd_dev, "unable to get image name"); 5335 5336 /* Fetch the snapshot name */ 5337 5338 snap_name = rbd_snap_name(rbd_dev, spec->snap_id); 5339 if (IS_ERR(snap_name)) { 5340 ret = PTR_ERR(snap_name); 5341 goto out_err; 5342 } 5343 5344 spec->pool_name = pool_name; 5345 spec->image_name = image_name; 5346 spec->snap_name = snap_name; 5347 5348 return 0; 5349 5350 out_err: 5351 kfree(image_name); 5352 kfree(pool_name); 5353 return ret; 5354 } 5355 5356 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) 5357 { 5358 size_t size; 5359 int ret; 5360 void *reply_buf; 5361 void *p; 5362 void *end; 5363 u64 seq; 5364 u32 snap_count; 5365 struct ceph_snap_context *snapc; 5366 u32 i; 5367 5368 /* 5369 * We'll need room for the seq value (maximum snapshot id), 5370 * snapshot count, and array of that many snapshot ids. 5371 * For now we have a fixed upper limit on the number we're 5372 * prepared to receive. 5373 */ 5374 size = sizeof (__le64) + sizeof (__le32) + 5375 RBD_MAX_SNAP_COUNT * sizeof (__le64); 5376 reply_buf = kzalloc(size, GFP_KERNEL); 5377 if (!reply_buf) 5378 return -ENOMEM; 5379 5380 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5381 &rbd_dev->header_oloc, "get_snapcontext", 5382 NULL, 0, reply_buf, size); 5383 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5384 if (ret < 0) 5385 goto out; 5386 5387 p = reply_buf; 5388 end = reply_buf + ret; 5389 ret = -ERANGE; 5390 ceph_decode_64_safe(&p, end, seq, out); 5391 ceph_decode_32_safe(&p, end, snap_count, out); 5392 5393 /* 5394 * Make sure the reported number of snapshot ids wouldn't go 5395 * beyond the end of our buffer. But before checking that, 5396 * make sure the computed size of the snapshot context we 5397 * allocate is representable in a size_t. 5398 */ 5399 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 5400 / sizeof (u64)) { 5401 ret = -EINVAL; 5402 goto out; 5403 } 5404 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 5405 goto out; 5406 ret = 0; 5407 5408 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); 5409 if (!snapc) { 5410 ret = -ENOMEM; 5411 goto out; 5412 } 5413 snapc->seq = seq; 5414 for (i = 0; i < snap_count; i++) 5415 snapc->snaps[i] = ceph_decode_64(&p); 5416 5417 ceph_put_snap_context(rbd_dev->header.snapc); 5418 rbd_dev->header.snapc = snapc; 5419 5420 dout(" snap context seq = %llu, snap_count = %u\n", 5421 (unsigned long long)seq, (unsigned int)snap_count); 5422 out: 5423 kfree(reply_buf); 5424 5425 return ret; 5426 } 5427 5428 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, 5429 u64 snap_id) 5430 { 5431 size_t size; 5432 void *reply_buf; 5433 __le64 snapid; 5434 int ret; 5435 void *p; 5436 void *end; 5437 char *snap_name; 5438 5439 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 5440 reply_buf = kmalloc(size, GFP_KERNEL); 5441 if (!reply_buf) 5442 return ERR_PTR(-ENOMEM); 5443 5444 snapid = cpu_to_le64(snap_id); 5445 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 5446 &rbd_dev->header_oloc, "get_snapshot_name", 5447 &snapid, sizeof(snapid), reply_buf, size); 5448 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5449 if (ret < 0) { 5450 snap_name = ERR_PTR(ret); 5451 goto out; 5452 } 5453 5454 p = reply_buf; 5455 end = reply_buf + ret; 5456 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 5457 if (IS_ERR(snap_name)) 5458 goto out; 5459 5460 dout(" snap_id 0x%016llx snap_name = %s\n", 5461 (unsigned long long)snap_id, snap_name); 5462 out: 5463 kfree(reply_buf); 5464 5465 return snap_name; 5466 } 5467 5468 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) 5469 { 5470 bool first_time = rbd_dev->header.object_prefix == NULL; 5471 int ret; 5472 5473 ret = rbd_dev_v2_image_size(rbd_dev); 5474 if (ret) 5475 return ret; 5476 5477 if (first_time) { 5478 ret = rbd_dev_v2_header_onetime(rbd_dev); 5479 if (ret) 5480 return ret; 5481 } 5482 5483 ret = rbd_dev_v2_snap_context(rbd_dev); 5484 if (ret && first_time) { 5485 kfree(rbd_dev->header.object_prefix); 5486 rbd_dev->header.object_prefix = NULL; 5487 } 5488 5489 return ret; 5490 } 5491 5492 static int rbd_dev_header_info(struct rbd_device *rbd_dev) 5493 { 5494 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 5495 5496 if (rbd_dev->image_format == 1) 5497 return rbd_dev_v1_header_info(rbd_dev); 5498 5499 return rbd_dev_v2_header_info(rbd_dev); 5500 } 5501 5502 /* 5503 * Skips over white space at *buf, and updates *buf to point to the 5504 * first found non-space character (if any). Returns the length of 5505 * the token (string of non-white space characters) found. Note 5506 * that *buf must be terminated with '\0'. 5507 */ 5508 static inline size_t next_token(const char **buf) 5509 { 5510 /* 5511 * These are the characters that produce nonzero for 5512 * isspace() in the "C" and "POSIX" locales. 5513 */ 5514 const char *spaces = " \f\n\r\t\v"; 5515 5516 *buf += strspn(*buf, spaces); /* Find start of token */ 5517 5518 return strcspn(*buf, spaces); /* Return token length */ 5519 } 5520 5521 /* 5522 * Finds the next token in *buf, dynamically allocates a buffer big 5523 * enough to hold a copy of it, and copies the token into the new 5524 * buffer. The copy is guaranteed to be terminated with '\0'. Note 5525 * that a duplicate buffer is created even for a zero-length token. 5526 * 5527 * Returns a pointer to the newly-allocated duplicate, or a null 5528 * pointer if memory for the duplicate was not available. If 5529 * the lenp argument is a non-null pointer, the length of the token 5530 * (not including the '\0') is returned in *lenp. 5531 * 5532 * If successful, the *buf pointer will be updated to point beyond 5533 * the end of the found token. 5534 * 5535 * Note: uses GFP_KERNEL for allocation. 5536 */ 5537 static inline char *dup_token(const char **buf, size_t *lenp) 5538 { 5539 char *dup; 5540 size_t len; 5541 5542 len = next_token(buf); 5543 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 5544 if (!dup) 5545 return NULL; 5546 *(dup + len) = '\0'; 5547 *buf += len; 5548 5549 if (lenp) 5550 *lenp = len; 5551 5552 return dup; 5553 } 5554 5555 /* 5556 * Parse the options provided for an "rbd add" (i.e., rbd image 5557 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 5558 * and the data written is passed here via a NUL-terminated buffer. 5559 * Returns 0 if successful or an error code otherwise. 5560 * 5561 * The information extracted from these options is recorded in 5562 * the other parameters which return dynamically-allocated 5563 * structures: 5564 * ceph_opts 5565 * The address of a pointer that will refer to a ceph options 5566 * structure. Caller must release the returned pointer using 5567 * ceph_destroy_options() when it is no longer needed. 5568 * rbd_opts 5569 * Address of an rbd options pointer. Fully initialized by 5570 * this function; caller must release with kfree(). 5571 * spec 5572 * Address of an rbd image specification pointer. Fully 5573 * initialized by this function based on parsed options. 5574 * Caller must release with rbd_spec_put(). 5575 * 5576 * The options passed take this form: 5577 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 5578 * where: 5579 * <mon_addrs> 5580 * A comma-separated list of one or more monitor addresses. 5581 * A monitor address is an ip address, optionally followed 5582 * by a port number (separated by a colon). 5583 * I.e.: ip1[:port1][,ip2[:port2]...] 5584 * <options> 5585 * A comma-separated list of ceph and/or rbd options. 5586 * <pool_name> 5587 * The name of the rados pool containing the rbd image. 5588 * <image_name> 5589 * The name of the image in that pool to map. 5590 * <snap_id> 5591 * An optional snapshot id. If provided, the mapping will 5592 * present data from the image at the time that snapshot was 5593 * created. The image head is used if no snapshot id is 5594 * provided. Snapshot mappings are always read-only. 5595 */ 5596 static int rbd_add_parse_args(const char *buf, 5597 struct ceph_options **ceph_opts, 5598 struct rbd_options **opts, 5599 struct rbd_spec **rbd_spec) 5600 { 5601 size_t len; 5602 char *options; 5603 const char *mon_addrs; 5604 char *snap_name; 5605 size_t mon_addrs_size; 5606 struct rbd_spec *spec = NULL; 5607 struct rbd_options *rbd_opts = NULL; 5608 struct ceph_options *copts; 5609 int ret; 5610 5611 /* The first four tokens are required */ 5612 5613 len = next_token(&buf); 5614 if (!len) { 5615 rbd_warn(NULL, "no monitor address(es) provided"); 5616 return -EINVAL; 5617 } 5618 mon_addrs = buf; 5619 mon_addrs_size = len + 1; 5620 buf += len; 5621 5622 ret = -EINVAL; 5623 options = dup_token(&buf, NULL); 5624 if (!options) 5625 return -ENOMEM; 5626 if (!*options) { 5627 rbd_warn(NULL, "no options provided"); 5628 goto out_err; 5629 } 5630 5631 spec = rbd_spec_alloc(); 5632 if (!spec) 5633 goto out_mem; 5634 5635 spec->pool_name = dup_token(&buf, NULL); 5636 if (!spec->pool_name) 5637 goto out_mem; 5638 if (!*spec->pool_name) { 5639 rbd_warn(NULL, "no pool name provided"); 5640 goto out_err; 5641 } 5642 5643 spec->image_name = dup_token(&buf, NULL); 5644 if (!spec->image_name) 5645 goto out_mem; 5646 if (!*spec->image_name) { 5647 rbd_warn(NULL, "no image name provided"); 5648 goto out_err; 5649 } 5650 5651 /* 5652 * Snapshot name is optional; default is to use "-" 5653 * (indicating the head/no snapshot). 5654 */ 5655 len = next_token(&buf); 5656 if (!len) { 5657 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 5658 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 5659 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 5660 ret = -ENAMETOOLONG; 5661 goto out_err; 5662 } 5663 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 5664 if (!snap_name) 5665 goto out_mem; 5666 *(snap_name + len) = '\0'; 5667 spec->snap_name = snap_name; 5668 5669 /* Initialize all rbd options to the defaults */ 5670 5671 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 5672 if (!rbd_opts) 5673 goto out_mem; 5674 5675 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 5676 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 5677 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; 5678 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT; 5679 5680 copts = ceph_parse_options(options, mon_addrs, 5681 mon_addrs + mon_addrs_size - 1, 5682 parse_rbd_opts_token, rbd_opts); 5683 if (IS_ERR(copts)) { 5684 ret = PTR_ERR(copts); 5685 goto out_err; 5686 } 5687 kfree(options); 5688 5689 *ceph_opts = copts; 5690 *opts = rbd_opts; 5691 *rbd_spec = spec; 5692 5693 return 0; 5694 out_mem: 5695 ret = -ENOMEM; 5696 out_err: 5697 kfree(rbd_opts); 5698 rbd_spec_put(spec); 5699 kfree(options); 5700 5701 return ret; 5702 } 5703 5704 /* 5705 * Return pool id (>= 0) or a negative error code. 5706 */ 5707 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) 5708 { 5709 struct ceph_options *opts = rbdc->client->options; 5710 u64 newest_epoch; 5711 int tries = 0; 5712 int ret; 5713 5714 again: 5715 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); 5716 if (ret == -ENOENT && tries++ < 1) { 5717 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", 5718 &newest_epoch); 5719 if (ret < 0) 5720 return ret; 5721 5722 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { 5723 ceph_osdc_maybe_request_map(&rbdc->client->osdc); 5724 (void) ceph_monc_wait_osdmap(&rbdc->client->monc, 5725 newest_epoch, 5726 opts->mount_timeout); 5727 goto again; 5728 } else { 5729 /* the osdmap we have is new enough */ 5730 return -ENOENT; 5731 } 5732 } 5733 5734 return ret; 5735 } 5736 5737 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 5738 { 5739 down_write(&rbd_dev->lock_rwsem); 5740 if (__rbd_is_lock_owner(rbd_dev)) 5741 rbd_unlock(rbd_dev); 5742 up_write(&rbd_dev->lock_rwsem); 5743 } 5744 5745 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) 5746 { 5747 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { 5748 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); 5749 return -EINVAL; 5750 } 5751 5752 /* FIXME: "rbd map --exclusive" should be in interruptible */ 5753 down_read(&rbd_dev->lock_rwsem); 5754 rbd_wait_state_locked(rbd_dev); 5755 up_read(&rbd_dev->lock_rwsem); 5756 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { 5757 rbd_warn(rbd_dev, "failed to acquire exclusive lock"); 5758 return -EROFS; 5759 } 5760 5761 return 0; 5762 } 5763 5764 /* 5765 * An rbd format 2 image has a unique identifier, distinct from the 5766 * name given to it by the user. Internally, that identifier is 5767 * what's used to specify the names of objects related to the image. 5768 * 5769 * A special "rbd id" object is used to map an rbd image name to its 5770 * id. If that object doesn't exist, then there is no v2 rbd image 5771 * with the supplied name. 5772 * 5773 * This function will record the given rbd_dev's image_id field if 5774 * it can be determined, and in that case will return 0. If any 5775 * errors occur a negative errno will be returned and the rbd_dev's 5776 * image_id field will be unchanged (and should be NULL). 5777 */ 5778 static int rbd_dev_image_id(struct rbd_device *rbd_dev) 5779 { 5780 int ret; 5781 size_t size; 5782 CEPH_DEFINE_OID_ONSTACK(oid); 5783 void *response; 5784 char *image_id; 5785 5786 /* 5787 * When probing a parent image, the image id is already 5788 * known (and the image name likely is not). There's no 5789 * need to fetch the image id again in this case. We 5790 * do still need to set the image format though. 5791 */ 5792 if (rbd_dev->spec->image_id) { 5793 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 5794 5795 return 0; 5796 } 5797 5798 /* 5799 * First, see if the format 2 image id file exists, and if 5800 * so, get the image's persistent id from it. 5801 */ 5802 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, 5803 rbd_dev->spec->image_name); 5804 if (ret) 5805 return ret; 5806 5807 dout("rbd id object name is %s\n", oid.name); 5808 5809 /* Response will be an encoded string, which includes a length */ 5810 5811 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 5812 response = kzalloc(size, GFP_NOIO); 5813 if (!response) { 5814 ret = -ENOMEM; 5815 goto out; 5816 } 5817 5818 /* If it doesn't exist we'll assume it's a format 1 image */ 5819 5820 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, 5821 "get_id", NULL, 0, 5822 response, RBD_IMAGE_ID_LEN_MAX); 5823 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 5824 if (ret == -ENOENT) { 5825 image_id = kstrdup("", GFP_KERNEL); 5826 ret = image_id ? 0 : -ENOMEM; 5827 if (!ret) 5828 rbd_dev->image_format = 1; 5829 } else if (ret >= 0) { 5830 void *p = response; 5831 5832 image_id = ceph_extract_encoded_string(&p, p + ret, 5833 NULL, GFP_NOIO); 5834 ret = PTR_ERR_OR_ZERO(image_id); 5835 if (!ret) 5836 rbd_dev->image_format = 2; 5837 } 5838 5839 if (!ret) { 5840 rbd_dev->spec->image_id = image_id; 5841 dout("image_id is %s\n", image_id); 5842 } 5843 out: 5844 kfree(response); 5845 ceph_oid_destroy(&oid); 5846 return ret; 5847 } 5848 5849 /* 5850 * Undo whatever state changes are made by v1 or v2 header info 5851 * call. 5852 */ 5853 static void rbd_dev_unprobe(struct rbd_device *rbd_dev) 5854 { 5855 struct rbd_image_header *header; 5856 5857 rbd_dev_parent_put(rbd_dev); 5858 5859 /* Free dynamic fields from the header, then zero it out */ 5860 5861 header = &rbd_dev->header; 5862 ceph_put_snap_context(header->snapc); 5863 kfree(header->snap_sizes); 5864 kfree(header->snap_names); 5865 kfree(header->object_prefix); 5866 memset(header, 0, sizeof (*header)); 5867 } 5868 5869 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) 5870 { 5871 int ret; 5872 5873 ret = rbd_dev_v2_object_prefix(rbd_dev); 5874 if (ret) 5875 goto out_err; 5876 5877 /* 5878 * Get the and check features for the image. Currently the 5879 * features are assumed to never change. 5880 */ 5881 ret = rbd_dev_v2_features(rbd_dev); 5882 if (ret) 5883 goto out_err; 5884 5885 /* If the image supports fancy striping, get its parameters */ 5886 5887 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 5888 ret = rbd_dev_v2_striping_info(rbd_dev); 5889 if (ret < 0) 5890 goto out_err; 5891 } 5892 5893 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { 5894 ret = rbd_dev_v2_data_pool(rbd_dev); 5895 if (ret) 5896 goto out_err; 5897 } 5898 5899 rbd_init_layout(rbd_dev); 5900 return 0; 5901 5902 out_err: 5903 rbd_dev->header.features = 0; 5904 kfree(rbd_dev->header.object_prefix); 5905 rbd_dev->header.object_prefix = NULL; 5906 return ret; 5907 } 5908 5909 /* 5910 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() -> 5911 * rbd_dev_image_probe() recursion depth, which means it's also the 5912 * length of the already discovered part of the parent chain. 5913 */ 5914 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) 5915 { 5916 struct rbd_device *parent = NULL; 5917 int ret; 5918 5919 if (!rbd_dev->parent_spec) 5920 return 0; 5921 5922 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) { 5923 pr_info("parent chain is too long (%d)\n", depth); 5924 ret = -EINVAL; 5925 goto out_err; 5926 } 5927 5928 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); 5929 if (!parent) { 5930 ret = -ENOMEM; 5931 goto out_err; 5932 } 5933 5934 /* 5935 * Images related by parent/child relationships always share 5936 * rbd_client and spec/parent_spec, so bump their refcounts. 5937 */ 5938 __rbd_get_client(rbd_dev->rbd_client); 5939 rbd_spec_get(rbd_dev->parent_spec); 5940 5941 ret = rbd_dev_image_probe(parent, depth); 5942 if (ret < 0) 5943 goto out_err; 5944 5945 rbd_dev->parent = parent; 5946 atomic_set(&rbd_dev->parent_ref, 1); 5947 return 0; 5948 5949 out_err: 5950 rbd_dev_unparent(rbd_dev); 5951 rbd_dev_destroy(parent); 5952 return ret; 5953 } 5954 5955 static void rbd_dev_device_release(struct rbd_device *rbd_dev) 5956 { 5957 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5958 rbd_dev_mapping_clear(rbd_dev); 5959 rbd_free_disk(rbd_dev); 5960 if (!single_major) 5961 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5962 } 5963 5964 /* 5965 * rbd_dev->header_rwsem must be locked for write and will be unlocked 5966 * upon return. 5967 */ 5968 static int rbd_dev_device_setup(struct rbd_device *rbd_dev) 5969 { 5970 int ret; 5971 5972 /* Record our major and minor device numbers. */ 5973 5974 if (!single_major) { 5975 ret = register_blkdev(0, rbd_dev->name); 5976 if (ret < 0) 5977 goto err_out_unlock; 5978 5979 rbd_dev->major = ret; 5980 rbd_dev->minor = 0; 5981 } else { 5982 rbd_dev->major = rbd_major; 5983 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); 5984 } 5985 5986 /* Set up the blkdev mapping. */ 5987 5988 ret = rbd_init_disk(rbd_dev); 5989 if (ret) 5990 goto err_out_blkdev; 5991 5992 ret = rbd_dev_mapping_set(rbd_dev); 5993 if (ret) 5994 goto err_out_disk; 5995 5996 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 5997 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); 5998 5999 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); 6000 if (ret) 6001 goto err_out_mapping; 6002 6003 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 6004 up_write(&rbd_dev->header_rwsem); 6005 return 0; 6006 6007 err_out_mapping: 6008 rbd_dev_mapping_clear(rbd_dev); 6009 err_out_disk: 6010 rbd_free_disk(rbd_dev); 6011 err_out_blkdev: 6012 if (!single_major) 6013 unregister_blkdev(rbd_dev->major, rbd_dev->name); 6014 err_out_unlock: 6015 up_write(&rbd_dev->header_rwsem); 6016 return ret; 6017 } 6018 6019 static int rbd_dev_header_name(struct rbd_device *rbd_dev) 6020 { 6021 struct rbd_spec *spec = rbd_dev->spec; 6022 int ret; 6023 6024 /* Record the header object name for this rbd image. */ 6025 6026 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 6027 if (rbd_dev->image_format == 1) 6028 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 6029 spec->image_name, RBD_SUFFIX); 6030 else 6031 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", 6032 RBD_HEADER_PREFIX, spec->image_id); 6033 6034 return ret; 6035 } 6036 6037 static void rbd_dev_image_release(struct rbd_device *rbd_dev) 6038 { 6039 rbd_dev_unprobe(rbd_dev); 6040 if (rbd_dev->opts) 6041 rbd_unregister_watch(rbd_dev); 6042 rbd_dev->image_format = 0; 6043 kfree(rbd_dev->spec->image_id); 6044 rbd_dev->spec->image_id = NULL; 6045 } 6046 6047 /* 6048 * Probe for the existence of the header object for the given rbd 6049 * device. If this image is the one being mapped (i.e., not a 6050 * parent), initiate a watch on its header object before using that 6051 * object to get detailed information about the rbd image. 6052 */ 6053 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) 6054 { 6055 int ret; 6056 6057 /* 6058 * Get the id from the image id object. Unless there's an 6059 * error, rbd_dev->spec->image_id will be filled in with 6060 * a dynamically-allocated string, and rbd_dev->image_format 6061 * will be set to either 1 or 2. 6062 */ 6063 ret = rbd_dev_image_id(rbd_dev); 6064 if (ret) 6065 return ret; 6066 6067 ret = rbd_dev_header_name(rbd_dev); 6068 if (ret) 6069 goto err_out_format; 6070 6071 if (!depth) { 6072 ret = rbd_register_watch(rbd_dev); 6073 if (ret) { 6074 if (ret == -ENOENT) 6075 pr_info("image %s/%s does not exist\n", 6076 rbd_dev->spec->pool_name, 6077 rbd_dev->spec->image_name); 6078 goto err_out_format; 6079 } 6080 } 6081 6082 ret = rbd_dev_header_info(rbd_dev); 6083 if (ret) 6084 goto err_out_watch; 6085 6086 /* 6087 * If this image is the one being mapped, we have pool name and 6088 * id, image name and id, and snap name - need to fill snap id. 6089 * Otherwise this is a parent image, identified by pool, image 6090 * and snap ids - need to fill in names for those ids. 6091 */ 6092 if (!depth) 6093 ret = rbd_spec_fill_snap_id(rbd_dev); 6094 else 6095 ret = rbd_spec_fill_names(rbd_dev); 6096 if (ret) { 6097 if (ret == -ENOENT) 6098 pr_info("snap %s/%s@%s does not exist\n", 6099 rbd_dev->spec->pool_name, 6100 rbd_dev->spec->image_name, 6101 rbd_dev->spec->snap_name); 6102 goto err_out_probe; 6103 } 6104 6105 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 6106 ret = rbd_dev_v2_parent_info(rbd_dev); 6107 if (ret) 6108 goto err_out_probe; 6109 6110 /* 6111 * Need to warn users if this image is the one being 6112 * mapped and has a parent. 6113 */ 6114 if (!depth && rbd_dev->parent_spec) 6115 rbd_warn(rbd_dev, 6116 "WARNING: kernel layering is EXPERIMENTAL!"); 6117 } 6118 6119 ret = rbd_dev_probe_parent(rbd_dev, depth); 6120 if (ret) 6121 goto err_out_probe; 6122 6123 dout("discovered format %u image, header name is %s\n", 6124 rbd_dev->image_format, rbd_dev->header_oid.name); 6125 return 0; 6126 6127 err_out_probe: 6128 rbd_dev_unprobe(rbd_dev); 6129 err_out_watch: 6130 if (!depth) 6131 rbd_unregister_watch(rbd_dev); 6132 err_out_format: 6133 rbd_dev->image_format = 0; 6134 kfree(rbd_dev->spec->image_id); 6135 rbd_dev->spec->image_id = NULL; 6136 return ret; 6137 } 6138 6139 static ssize_t do_rbd_add(struct bus_type *bus, 6140 const char *buf, 6141 size_t count) 6142 { 6143 struct rbd_device *rbd_dev = NULL; 6144 struct ceph_options *ceph_opts = NULL; 6145 struct rbd_options *rbd_opts = NULL; 6146 struct rbd_spec *spec = NULL; 6147 struct rbd_client *rbdc; 6148 bool read_only; 6149 int rc; 6150 6151 if (!try_module_get(THIS_MODULE)) 6152 return -ENODEV; 6153 6154 /* parse add command */ 6155 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 6156 if (rc < 0) 6157 goto out; 6158 6159 rbdc = rbd_get_client(ceph_opts); 6160 if (IS_ERR(rbdc)) { 6161 rc = PTR_ERR(rbdc); 6162 goto err_out_args; 6163 } 6164 6165 /* pick the pool */ 6166 rc = rbd_add_get_pool_id(rbdc, spec->pool_name); 6167 if (rc < 0) { 6168 if (rc == -ENOENT) 6169 pr_info("pool %s does not exist\n", spec->pool_name); 6170 goto err_out_client; 6171 } 6172 spec->pool_id = (u64)rc; 6173 6174 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); 6175 if (!rbd_dev) { 6176 rc = -ENOMEM; 6177 goto err_out_client; 6178 } 6179 rbdc = NULL; /* rbd_dev now owns this */ 6180 spec = NULL; /* rbd_dev now owns this */ 6181 rbd_opts = NULL; /* rbd_dev now owns this */ 6182 6183 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); 6184 if (!rbd_dev->config_info) { 6185 rc = -ENOMEM; 6186 goto err_out_rbd_dev; 6187 } 6188 6189 down_write(&rbd_dev->header_rwsem); 6190 rc = rbd_dev_image_probe(rbd_dev, 0); 6191 if (rc < 0) { 6192 up_write(&rbd_dev->header_rwsem); 6193 goto err_out_rbd_dev; 6194 } 6195 6196 /* If we are mapping a snapshot it must be marked read-only */ 6197 6198 read_only = rbd_dev->opts->read_only; 6199 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 6200 read_only = true; 6201 rbd_dev->mapping.read_only = read_only; 6202 6203 rc = rbd_dev_device_setup(rbd_dev); 6204 if (rc) 6205 goto err_out_image_probe; 6206 6207 if (rbd_dev->opts->exclusive) { 6208 rc = rbd_add_acquire_lock(rbd_dev); 6209 if (rc) 6210 goto err_out_device_setup; 6211 } 6212 6213 /* Everything's ready. Announce the disk to the world. */ 6214 6215 rc = device_add(&rbd_dev->dev); 6216 if (rc) 6217 goto err_out_image_lock; 6218 6219 add_disk(rbd_dev->disk); 6220 /* see rbd_init_disk() */ 6221 blk_put_queue(rbd_dev->disk->queue); 6222 6223 spin_lock(&rbd_dev_list_lock); 6224 list_add_tail(&rbd_dev->node, &rbd_dev_list); 6225 spin_unlock(&rbd_dev_list_lock); 6226 6227 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, 6228 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, 6229 rbd_dev->header.features); 6230 rc = count; 6231 out: 6232 module_put(THIS_MODULE); 6233 return rc; 6234 6235 err_out_image_lock: 6236 rbd_dev_image_unlock(rbd_dev); 6237 err_out_device_setup: 6238 rbd_dev_device_release(rbd_dev); 6239 err_out_image_probe: 6240 rbd_dev_image_release(rbd_dev); 6241 err_out_rbd_dev: 6242 rbd_dev_destroy(rbd_dev); 6243 err_out_client: 6244 rbd_put_client(rbdc); 6245 err_out_args: 6246 rbd_spec_put(spec); 6247 kfree(rbd_opts); 6248 goto out; 6249 } 6250 6251 static ssize_t rbd_add(struct bus_type *bus, 6252 const char *buf, 6253 size_t count) 6254 { 6255 if (single_major) 6256 return -EINVAL; 6257 6258 return do_rbd_add(bus, buf, count); 6259 } 6260 6261 static ssize_t rbd_add_single_major(struct bus_type *bus, 6262 const char *buf, 6263 size_t count) 6264 { 6265 return do_rbd_add(bus, buf, count); 6266 } 6267 6268 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 6269 { 6270 while (rbd_dev->parent) { 6271 struct rbd_device *first = rbd_dev; 6272 struct rbd_device *second = first->parent; 6273 struct rbd_device *third; 6274 6275 /* 6276 * Follow to the parent with no grandparent and 6277 * remove it. 6278 */ 6279 while (second && (third = second->parent)) { 6280 first = second; 6281 second = third; 6282 } 6283 rbd_assert(second); 6284 rbd_dev_image_release(second); 6285 rbd_dev_destroy(second); 6286 first->parent = NULL; 6287 first->parent_overlap = 0; 6288 6289 rbd_assert(first->parent_spec); 6290 rbd_spec_put(first->parent_spec); 6291 first->parent_spec = NULL; 6292 } 6293 } 6294 6295 static ssize_t do_rbd_remove(struct bus_type *bus, 6296 const char *buf, 6297 size_t count) 6298 { 6299 struct rbd_device *rbd_dev = NULL; 6300 struct list_head *tmp; 6301 int dev_id; 6302 char opt_buf[6]; 6303 bool already = false; 6304 bool force = false; 6305 int ret; 6306 6307 dev_id = -1; 6308 opt_buf[0] = '\0'; 6309 sscanf(buf, "%d %5s", &dev_id, opt_buf); 6310 if (dev_id < 0) { 6311 pr_err("dev_id out of range\n"); 6312 return -EINVAL; 6313 } 6314 if (opt_buf[0] != '\0') { 6315 if (!strcmp(opt_buf, "force")) { 6316 force = true; 6317 } else { 6318 pr_err("bad remove option at '%s'\n", opt_buf); 6319 return -EINVAL; 6320 } 6321 } 6322 6323 ret = -ENOENT; 6324 spin_lock(&rbd_dev_list_lock); 6325 list_for_each(tmp, &rbd_dev_list) { 6326 rbd_dev = list_entry(tmp, struct rbd_device, node); 6327 if (rbd_dev->dev_id == dev_id) { 6328 ret = 0; 6329 break; 6330 } 6331 } 6332 if (!ret) { 6333 spin_lock_irq(&rbd_dev->lock); 6334 if (rbd_dev->open_count && !force) 6335 ret = -EBUSY; 6336 else 6337 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, 6338 &rbd_dev->flags); 6339 spin_unlock_irq(&rbd_dev->lock); 6340 } 6341 spin_unlock(&rbd_dev_list_lock); 6342 if (ret < 0 || already) 6343 return ret; 6344 6345 if (force) { 6346 /* 6347 * Prevent new IO from being queued and wait for existing 6348 * IO to complete/fail. 6349 */ 6350 blk_mq_freeze_queue(rbd_dev->disk->queue); 6351 blk_set_queue_dying(rbd_dev->disk->queue); 6352 } 6353 6354 del_gendisk(rbd_dev->disk); 6355 spin_lock(&rbd_dev_list_lock); 6356 list_del_init(&rbd_dev->node); 6357 spin_unlock(&rbd_dev_list_lock); 6358 device_del(&rbd_dev->dev); 6359 6360 rbd_dev_image_unlock(rbd_dev); 6361 rbd_dev_device_release(rbd_dev); 6362 rbd_dev_image_release(rbd_dev); 6363 rbd_dev_destroy(rbd_dev); 6364 return count; 6365 } 6366 6367 static ssize_t rbd_remove(struct bus_type *bus, 6368 const char *buf, 6369 size_t count) 6370 { 6371 if (single_major) 6372 return -EINVAL; 6373 6374 return do_rbd_remove(bus, buf, count); 6375 } 6376 6377 static ssize_t rbd_remove_single_major(struct bus_type *bus, 6378 const char *buf, 6379 size_t count) 6380 { 6381 return do_rbd_remove(bus, buf, count); 6382 } 6383 6384 /* 6385 * create control files in sysfs 6386 * /sys/bus/rbd/... 6387 */ 6388 static int rbd_sysfs_init(void) 6389 { 6390 int ret; 6391 6392 ret = device_register(&rbd_root_dev); 6393 if (ret < 0) 6394 return ret; 6395 6396 ret = bus_register(&rbd_bus_type); 6397 if (ret < 0) 6398 device_unregister(&rbd_root_dev); 6399 6400 return ret; 6401 } 6402 6403 static void rbd_sysfs_cleanup(void) 6404 { 6405 bus_unregister(&rbd_bus_type); 6406 device_unregister(&rbd_root_dev); 6407 } 6408 6409 static int rbd_slab_init(void) 6410 { 6411 rbd_assert(!rbd_img_request_cache); 6412 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); 6413 if (!rbd_img_request_cache) 6414 return -ENOMEM; 6415 6416 rbd_assert(!rbd_obj_request_cache); 6417 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0); 6418 if (!rbd_obj_request_cache) 6419 goto out_err; 6420 6421 rbd_assert(!rbd_bio_clone); 6422 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0); 6423 if (!rbd_bio_clone) 6424 goto out_err_clone; 6425 6426 return 0; 6427 6428 out_err_clone: 6429 kmem_cache_destroy(rbd_obj_request_cache); 6430 rbd_obj_request_cache = NULL; 6431 out_err: 6432 kmem_cache_destroy(rbd_img_request_cache); 6433 rbd_img_request_cache = NULL; 6434 return -ENOMEM; 6435 } 6436 6437 static void rbd_slab_exit(void) 6438 { 6439 rbd_assert(rbd_obj_request_cache); 6440 kmem_cache_destroy(rbd_obj_request_cache); 6441 rbd_obj_request_cache = NULL; 6442 6443 rbd_assert(rbd_img_request_cache); 6444 kmem_cache_destroy(rbd_img_request_cache); 6445 rbd_img_request_cache = NULL; 6446 6447 rbd_assert(rbd_bio_clone); 6448 bioset_free(rbd_bio_clone); 6449 rbd_bio_clone = NULL; 6450 } 6451 6452 static int __init rbd_init(void) 6453 { 6454 int rc; 6455 6456 if (!libceph_compatible(NULL)) { 6457 rbd_warn(NULL, "libceph incompatibility (quitting)"); 6458 return -EINVAL; 6459 } 6460 6461 rc = rbd_slab_init(); 6462 if (rc) 6463 return rc; 6464 6465 /* 6466 * The number of active work items is limited by the number of 6467 * rbd devices * queue depth, so leave @max_active at default. 6468 */ 6469 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0); 6470 if (!rbd_wq) { 6471 rc = -ENOMEM; 6472 goto err_out_slab; 6473 } 6474 6475 if (single_major) { 6476 rbd_major = register_blkdev(0, RBD_DRV_NAME); 6477 if (rbd_major < 0) { 6478 rc = rbd_major; 6479 goto err_out_wq; 6480 } 6481 } 6482 6483 rc = rbd_sysfs_init(); 6484 if (rc) 6485 goto err_out_blkdev; 6486 6487 if (single_major) 6488 pr_info("loaded (major %d)\n", rbd_major); 6489 else 6490 pr_info("loaded\n"); 6491 6492 return 0; 6493 6494 err_out_blkdev: 6495 if (single_major) 6496 unregister_blkdev(rbd_major, RBD_DRV_NAME); 6497 err_out_wq: 6498 destroy_workqueue(rbd_wq); 6499 err_out_slab: 6500 rbd_slab_exit(); 6501 return rc; 6502 } 6503 6504 static void __exit rbd_exit(void) 6505 { 6506 ida_destroy(&rbd_dev_id_ida); 6507 rbd_sysfs_cleanup(); 6508 if (single_major) 6509 unregister_blkdev(rbd_major, RBD_DRV_NAME); 6510 destroy_workqueue(rbd_wq); 6511 rbd_slab_exit(); 6512 } 6513 6514 module_init(rbd_init); 6515 module_exit(rbd_exit); 6516 6517 MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); 6518 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 6519 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 6520 /* following authorship retained from original osdblk.c */ 6521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 6522 6523 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); 6524 MODULE_LICENSE("GPL"); 6525