1 /* 2 rbd.c -- Export ceph rados objects as a Linux block device 3 4 5 based on drivers/block/osdblk.c: 6 7 Copyright 2009 Red Hat, Inc. 8 9 This program is free software; you can redistribute it and/or modify 10 it under the terms of the GNU General Public License as published by 11 the Free Software Foundation. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program; see the file COPYING. If not, write to 20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 21 22 23 24 For usage instructions, please refer to: 25 26 Documentation/ABI/testing/sysfs-bus-rbd 27 28 */ 29 30 #include <linux/ceph/libceph.h> 31 #include <linux/ceph/osd_client.h> 32 #include <linux/ceph/mon_client.h> 33 #include <linux/ceph/decode.h> 34 #include <linux/parser.h> 35 36 #include <linux/kernel.h> 37 #include <linux/device.h> 38 #include <linux/module.h> 39 #include <linux/fs.h> 40 #include <linux/blkdev.h> 41 42 #include "rbd_types.h" 43 44 #define DRV_NAME "rbd" 45 #define DRV_NAME_LONG "rbd (rados block device)" 46 47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 48 49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX)) 50 #define RBD_MAX_POOL_NAME_LEN 64 51 #define RBD_MAX_SNAP_NAME_LEN 32 52 #define RBD_MAX_OPT_LEN 1024 53 54 #define RBD_SNAP_HEAD_NAME "-" 55 56 #define DEV_NAME_LEN 32 57 58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10 59 60 /* 61 * block device image metadata (in-memory version) 62 */ 63 struct rbd_image_header { 64 u64 image_size; 65 char block_name[32]; 66 __u8 obj_order; 67 __u8 crypt_type; 68 __u8 comp_type; 69 struct rw_semaphore snap_rwsem; 70 struct ceph_snap_context *snapc; 71 size_t snap_names_len; 72 u64 snap_seq; 73 u32 total_snaps; 74 75 char *snap_names; 76 u64 *snap_sizes; 77 78 u64 obj_version; 79 }; 80 81 struct rbd_options { 82 int notify_timeout; 83 }; 84 85 /* 86 * an instance of the client. multiple devices may share a client. 87 */ 88 struct rbd_client { 89 struct ceph_client *client; 90 struct rbd_options *rbd_opts; 91 struct kref kref; 92 struct list_head node; 93 }; 94 95 /* 96 * a single io request 97 */ 98 struct rbd_request { 99 struct request *rq; /* blk layer request */ 100 struct bio *bio; /* cloned bio */ 101 struct page **pages; /* list of used pages */ 102 u64 len; 103 }; 104 105 struct rbd_snap { 106 struct device dev; 107 const char *name; 108 size_t size; 109 struct list_head node; 110 u64 id; 111 }; 112 113 /* 114 * a single device 115 */ 116 struct rbd_device { 117 int id; /* blkdev unique id */ 118 119 int major; /* blkdev assigned major */ 120 struct gendisk *disk; /* blkdev's gendisk and rq */ 121 struct request_queue *q; 122 123 struct ceph_client *client; 124 struct rbd_client *rbd_client; 125 126 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 127 128 spinlock_t lock; /* queue lock */ 129 130 struct rbd_image_header header; 131 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */ 132 int obj_len; 133 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */ 134 char pool_name[RBD_MAX_POOL_NAME_LEN]; 135 int poolid; 136 137 struct ceph_osd_event *watch_event; 138 struct ceph_osd_request *watch_request; 139 140 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 141 u32 cur_snap; /* index+1 of current snapshot within snap context 142 0 - for the head */ 143 int read_only; 144 145 struct list_head node; 146 147 /* list of snapshots */ 148 struct list_head snaps; 149 150 /* sysfs related */ 151 struct device dev; 152 }; 153 154 static struct bus_type rbd_bus_type = { 155 .name = "rbd", 156 }; 157 158 static spinlock_t node_lock; /* protects client get/put */ 159 160 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 161 static LIST_HEAD(rbd_dev_list); /* devices */ 162 static LIST_HEAD(rbd_client_list); /* clients */ 163 164 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); 165 static void rbd_dev_release(struct device *dev); 166 static ssize_t rbd_snap_rollback(struct device *dev, 167 struct device_attribute *attr, 168 const char *buf, 169 size_t size); 170 static ssize_t rbd_snap_add(struct device *dev, 171 struct device_attribute *attr, 172 const char *buf, 173 size_t count); 174 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 175 struct rbd_snap *snap);; 176 177 178 static struct rbd_device *dev_to_rbd(struct device *dev) 179 { 180 return container_of(dev, struct rbd_device, dev); 181 } 182 183 static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 184 { 185 return get_device(&rbd_dev->dev); 186 } 187 188 static void rbd_put_dev(struct rbd_device *rbd_dev) 189 { 190 put_device(&rbd_dev->dev); 191 } 192 193 static int __rbd_update_snaps(struct rbd_device *rbd_dev); 194 195 static int rbd_open(struct block_device *bdev, fmode_t mode) 196 { 197 struct gendisk *disk = bdev->bd_disk; 198 struct rbd_device *rbd_dev = disk->private_data; 199 200 rbd_get_dev(rbd_dev); 201 202 set_device_ro(bdev, rbd_dev->read_only); 203 204 if ((mode & FMODE_WRITE) && rbd_dev->read_only) 205 return -EROFS; 206 207 return 0; 208 } 209 210 static int rbd_release(struct gendisk *disk, fmode_t mode) 211 { 212 struct rbd_device *rbd_dev = disk->private_data; 213 214 rbd_put_dev(rbd_dev); 215 216 return 0; 217 } 218 219 static const struct block_device_operations rbd_bd_ops = { 220 .owner = THIS_MODULE, 221 .open = rbd_open, 222 .release = rbd_release, 223 }; 224 225 /* 226 * Initialize an rbd client instance. 227 * We own *opt. 228 */ 229 static struct rbd_client *rbd_client_create(struct ceph_options *opt, 230 struct rbd_options *rbd_opts) 231 { 232 struct rbd_client *rbdc; 233 int ret = -ENOMEM; 234 235 dout("rbd_client_create\n"); 236 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 237 if (!rbdc) 238 goto out_opt; 239 240 kref_init(&rbdc->kref); 241 INIT_LIST_HEAD(&rbdc->node); 242 243 rbdc->client = ceph_create_client(opt, rbdc); 244 if (IS_ERR(rbdc->client)) 245 goto out_rbdc; 246 opt = NULL; /* Now rbdc->client is responsible for opt */ 247 248 ret = ceph_open_session(rbdc->client); 249 if (ret < 0) 250 goto out_err; 251 252 rbdc->rbd_opts = rbd_opts; 253 254 spin_lock(&node_lock); 255 list_add_tail(&rbdc->node, &rbd_client_list); 256 spin_unlock(&node_lock); 257 258 dout("rbd_client_create created %p\n", rbdc); 259 return rbdc; 260 261 out_err: 262 ceph_destroy_client(rbdc->client); 263 out_rbdc: 264 kfree(rbdc); 265 out_opt: 266 if (opt) 267 ceph_destroy_options(opt); 268 return ERR_PTR(ret); 269 } 270 271 /* 272 * Find a ceph client with specific addr and configuration. 273 */ 274 static struct rbd_client *__rbd_client_find(struct ceph_options *opt) 275 { 276 struct rbd_client *client_node; 277 278 if (opt->flags & CEPH_OPT_NOSHARE) 279 return NULL; 280 281 list_for_each_entry(client_node, &rbd_client_list, node) 282 if (ceph_compare_options(opt, client_node->client) == 0) 283 return client_node; 284 return NULL; 285 } 286 287 /* 288 * mount options 289 */ 290 enum { 291 Opt_notify_timeout, 292 Opt_last_int, 293 /* int args above */ 294 Opt_last_string, 295 /* string args above */ 296 }; 297 298 static match_table_t rbdopt_tokens = { 299 {Opt_notify_timeout, "notify_timeout=%d"}, 300 /* int args above */ 301 /* string args above */ 302 {-1, NULL} 303 }; 304 305 static int parse_rbd_opts_token(char *c, void *private) 306 { 307 struct rbd_options *rbdopt = private; 308 substring_t argstr[MAX_OPT_ARGS]; 309 int token, intval, ret; 310 311 token = match_token((char *)c, rbdopt_tokens, argstr); 312 if (token < 0) 313 return -EINVAL; 314 315 if (token < Opt_last_int) { 316 ret = match_int(&argstr[0], &intval); 317 if (ret < 0) { 318 pr_err("bad mount option arg (not int) " 319 "at '%s'\n", c); 320 return ret; 321 } 322 dout("got int token %d val %d\n", token, intval); 323 } else if (token > Opt_last_int && token < Opt_last_string) { 324 dout("got string token %d val %s\n", token, 325 argstr[0].from); 326 } else { 327 dout("got token %d\n", token); 328 } 329 330 switch (token) { 331 case Opt_notify_timeout: 332 rbdopt->notify_timeout = intval; 333 break; 334 default: 335 BUG_ON(token); 336 } 337 return 0; 338 } 339 340 /* 341 * Get a ceph client with specific addr and configuration, if one does 342 * not exist create it. 343 */ 344 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 345 char *options) 346 { 347 struct rbd_client *rbdc; 348 struct ceph_options *opt; 349 int ret; 350 struct rbd_options *rbd_opts; 351 352 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); 353 if (!rbd_opts) 354 return -ENOMEM; 355 356 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; 357 358 ret = ceph_parse_options(&opt, options, mon_addr, 359 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); 360 if (ret < 0) 361 goto done_err; 362 363 spin_lock(&node_lock); 364 rbdc = __rbd_client_find(opt); 365 if (rbdc) { 366 ceph_destroy_options(opt); 367 368 /* using an existing client */ 369 kref_get(&rbdc->kref); 370 rbd_dev->rbd_client = rbdc; 371 rbd_dev->client = rbdc->client; 372 spin_unlock(&node_lock); 373 return 0; 374 } 375 spin_unlock(&node_lock); 376 377 rbdc = rbd_client_create(opt, rbd_opts); 378 if (IS_ERR(rbdc)) { 379 ret = PTR_ERR(rbdc); 380 goto done_err; 381 } 382 383 rbd_dev->rbd_client = rbdc; 384 rbd_dev->client = rbdc->client; 385 return 0; 386 done_err: 387 kfree(rbd_opts); 388 return ret; 389 } 390 391 /* 392 * Destroy ceph client 393 */ 394 static void rbd_client_release(struct kref *kref) 395 { 396 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 397 398 dout("rbd_release_client %p\n", rbdc); 399 spin_lock(&node_lock); 400 list_del(&rbdc->node); 401 spin_unlock(&node_lock); 402 403 ceph_destroy_client(rbdc->client); 404 kfree(rbdc->rbd_opts); 405 kfree(rbdc); 406 } 407 408 /* 409 * Drop reference to ceph client node. If it's not referenced anymore, release 410 * it. 411 */ 412 static void rbd_put_client(struct rbd_device *rbd_dev) 413 { 414 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 415 rbd_dev->rbd_client = NULL; 416 rbd_dev->client = NULL; 417 } 418 419 420 /* 421 * Create a new header structure, translate header format from the on-disk 422 * header. 423 */ 424 static int rbd_header_from_disk(struct rbd_image_header *header, 425 struct rbd_image_header_ondisk *ondisk, 426 int allocated_snaps, 427 gfp_t gfp_flags) 428 { 429 int i; 430 u32 snap_count = le32_to_cpu(ondisk->snap_count); 431 int ret = -ENOMEM; 432 433 init_rwsem(&header->snap_rwsem); 434 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); 435 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 436 snap_count * 437 sizeof(struct rbd_image_snap_ondisk), 438 gfp_flags); 439 if (!header->snapc) 440 return -ENOMEM; 441 if (snap_count) { 442 header->snap_names = kmalloc(header->snap_names_len, 443 GFP_KERNEL); 444 if (!header->snap_names) 445 goto err_snapc; 446 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 447 GFP_KERNEL); 448 if (!header->snap_sizes) 449 goto err_names; 450 } else { 451 header->snap_names = NULL; 452 header->snap_sizes = NULL; 453 } 454 memcpy(header->block_name, ondisk->block_name, 455 sizeof(ondisk->block_name)); 456 457 header->image_size = le64_to_cpu(ondisk->image_size); 458 header->obj_order = ondisk->options.order; 459 header->crypt_type = ondisk->options.crypt_type; 460 header->comp_type = ondisk->options.comp_type; 461 462 atomic_set(&header->snapc->nref, 1); 463 header->snap_seq = le64_to_cpu(ondisk->snap_seq); 464 header->snapc->num_snaps = snap_count; 465 header->total_snaps = snap_count; 466 467 if (snap_count && 468 allocated_snaps == snap_count) { 469 for (i = 0; i < snap_count; i++) { 470 header->snapc->snaps[i] = 471 le64_to_cpu(ondisk->snaps[i].id); 472 header->snap_sizes[i] = 473 le64_to_cpu(ondisk->snaps[i].image_size); 474 } 475 476 /* copy snapshot names */ 477 memcpy(header->snap_names, &ondisk->snaps[i], 478 header->snap_names_len); 479 } 480 481 return 0; 482 483 err_names: 484 kfree(header->snap_names); 485 err_snapc: 486 kfree(header->snapc); 487 return ret; 488 } 489 490 static int snap_index(struct rbd_image_header *header, int snap_num) 491 { 492 return header->total_snaps - snap_num; 493 } 494 495 static u64 cur_snap_id(struct rbd_device *rbd_dev) 496 { 497 struct rbd_image_header *header = &rbd_dev->header; 498 499 if (!rbd_dev->cur_snap) 500 return 0; 501 502 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; 503 } 504 505 static int snap_by_name(struct rbd_image_header *header, const char *snap_name, 506 u64 *seq, u64 *size) 507 { 508 int i; 509 char *p = header->snap_names; 510 511 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { 512 if (strcmp(snap_name, p) == 0) 513 break; 514 } 515 if (i == header->total_snaps) 516 return -ENOENT; 517 if (seq) 518 *seq = header->snapc->snaps[i]; 519 520 if (size) 521 *size = header->snap_sizes[i]; 522 523 return i; 524 } 525 526 static int rbd_header_set_snap(struct rbd_device *dev, 527 const char *snap_name, 528 u64 *size) 529 { 530 struct rbd_image_header *header = &dev->header; 531 struct ceph_snap_context *snapc = header->snapc; 532 int ret = -ENOENT; 533 534 down_write(&header->snap_rwsem); 535 536 if (!snap_name || 537 !*snap_name || 538 strcmp(snap_name, "-") == 0 || 539 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { 540 if (header->total_snaps) 541 snapc->seq = header->snap_seq; 542 else 543 snapc->seq = 0; 544 dev->cur_snap = 0; 545 dev->read_only = 0; 546 if (size) 547 *size = header->image_size; 548 } else { 549 ret = snap_by_name(header, snap_name, &snapc->seq, size); 550 if (ret < 0) 551 goto done; 552 553 dev->cur_snap = header->total_snaps - ret; 554 dev->read_only = 1; 555 } 556 557 ret = 0; 558 done: 559 up_write(&header->snap_rwsem); 560 return ret; 561 } 562 563 static void rbd_header_free(struct rbd_image_header *header) 564 { 565 kfree(header->snapc); 566 kfree(header->snap_names); 567 kfree(header->snap_sizes); 568 } 569 570 /* 571 * get the actual striped segment name, offset and length 572 */ 573 static u64 rbd_get_segment(struct rbd_image_header *header, 574 const char *block_name, 575 u64 ofs, u64 len, 576 char *seg_name, u64 *segofs) 577 { 578 u64 seg = ofs >> header->obj_order; 579 580 if (seg_name) 581 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, 582 "%s.%012llx", block_name, seg); 583 584 ofs = ofs & ((1 << header->obj_order) - 1); 585 len = min_t(u64, len, (1 << header->obj_order) - ofs); 586 587 if (segofs) 588 *segofs = ofs; 589 590 return len; 591 } 592 593 /* 594 * bio helpers 595 */ 596 597 static void bio_chain_put(struct bio *chain) 598 { 599 struct bio *tmp; 600 601 while (chain) { 602 tmp = chain; 603 chain = chain->bi_next; 604 bio_put(tmp); 605 } 606 } 607 608 /* 609 * zeros a bio chain, starting at specific offset 610 */ 611 static void zero_bio_chain(struct bio *chain, int start_ofs) 612 { 613 struct bio_vec *bv; 614 unsigned long flags; 615 void *buf; 616 int i; 617 int pos = 0; 618 619 while (chain) { 620 bio_for_each_segment(bv, chain, i) { 621 if (pos + bv->bv_len > start_ofs) { 622 int remainder = max(start_ofs - pos, 0); 623 buf = bvec_kmap_irq(bv, &flags); 624 memset(buf + remainder, 0, 625 bv->bv_len - remainder); 626 bvec_kunmap_irq(buf, &flags); 627 } 628 pos += bv->bv_len; 629 } 630 631 chain = chain->bi_next; 632 } 633 } 634 635 /* 636 * bio_chain_clone - clone a chain of bios up to a certain length. 637 * might return a bio_pair that will need to be released. 638 */ 639 static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 640 struct bio_pair **bp, 641 int len, gfp_t gfpmask) 642 { 643 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL; 644 int total = 0; 645 646 if (*bp) { 647 bio_pair_release(*bp); 648 *bp = NULL; 649 } 650 651 while (old_chain && (total < len)) { 652 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 653 if (!tmp) 654 goto err_out; 655 656 if (total + old_chain->bi_size > len) { 657 struct bio_pair *bp; 658 659 /* 660 * this split can only happen with a single paged bio, 661 * split_bio will BUG_ON if this is not the case 662 */ 663 dout("bio_chain_clone split! total=%d remaining=%d" 664 "bi_size=%d\n", 665 (int)total, (int)len-total, 666 (int)old_chain->bi_size); 667 668 /* split the bio. We'll release it either in the next 669 call, or it will have to be released outside */ 670 bp = bio_split(old_chain, (len - total) / 512ULL); 671 if (!bp) 672 goto err_out; 673 674 __bio_clone(tmp, &bp->bio1); 675 676 *next = &bp->bio2; 677 } else { 678 __bio_clone(tmp, old_chain); 679 *next = old_chain->bi_next; 680 } 681 682 tmp->bi_bdev = NULL; 683 gfpmask &= ~__GFP_WAIT; 684 tmp->bi_next = NULL; 685 686 if (!new_chain) { 687 new_chain = tail = tmp; 688 } else { 689 tail->bi_next = tmp; 690 tail = tmp; 691 } 692 old_chain = old_chain->bi_next; 693 694 total += tmp->bi_size; 695 } 696 697 BUG_ON(total < len); 698 699 if (tail) 700 tail->bi_next = NULL; 701 702 *old = old_chain; 703 704 return new_chain; 705 706 err_out: 707 dout("bio_chain_clone with err\n"); 708 bio_chain_put(new_chain); 709 return NULL; 710 } 711 712 /* 713 * helpers for osd request op vectors. 714 */ 715 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops, 716 int num_ops, 717 int opcode, 718 u32 payload_len) 719 { 720 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1), 721 GFP_NOIO); 722 if (!*ops) 723 return -ENOMEM; 724 (*ops)[0].op = opcode; 725 /* 726 * op extent offset and length will be set later on 727 * in calc_raw_layout() 728 */ 729 (*ops)[0].payload_len = payload_len; 730 return 0; 731 } 732 733 static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 734 { 735 kfree(ops); 736 } 737 738 /* 739 * Send ceph osd request 740 */ 741 static int rbd_do_request(struct request *rq, 742 struct rbd_device *dev, 743 struct ceph_snap_context *snapc, 744 u64 snapid, 745 const char *obj, u64 ofs, u64 len, 746 struct bio *bio, 747 struct page **pages, 748 int num_pages, 749 int flags, 750 struct ceph_osd_req_op *ops, 751 int num_reply, 752 void (*rbd_cb)(struct ceph_osd_request *req, 753 struct ceph_msg *msg), 754 struct ceph_osd_request **linger_req, 755 u64 *ver) 756 { 757 struct ceph_osd_request *req; 758 struct ceph_file_layout *layout; 759 int ret; 760 u64 bno; 761 struct timespec mtime = CURRENT_TIME; 762 struct rbd_request *req_data; 763 struct ceph_osd_request_head *reqhead; 764 struct rbd_image_header *header = &dev->header; 765 766 ret = -ENOMEM; 767 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 768 if (!req_data) 769 goto done; 770 771 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); 772 773 down_read(&header->snap_rwsem); 774 775 req = ceph_osdc_alloc_request(&dev->client->osdc, flags, 776 snapc, 777 ops, 778 false, 779 GFP_NOIO, pages, bio); 780 if (IS_ERR(req)) { 781 up_read(&header->snap_rwsem); 782 ret = PTR_ERR(req); 783 goto done_pages; 784 } 785 786 req->r_callback = rbd_cb; 787 788 req_data->rq = rq; 789 req_data->bio = bio; 790 req_data->pages = pages; 791 req_data->len = len; 792 793 req->r_priv = req_data; 794 795 reqhead = req->r_request->front.iov_base; 796 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 797 798 strncpy(req->r_oid, obj, sizeof(req->r_oid)); 799 req->r_oid_len = strlen(req->r_oid); 800 801 layout = &req->r_file_layout; 802 memset(layout, 0, sizeof(*layout)); 803 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 804 layout->fl_stripe_count = cpu_to_le32(1); 805 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 806 layout->fl_pg_preferred = cpu_to_le32(-1); 807 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 808 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, 809 ofs, &len, &bno, req, ops); 810 811 ceph_osdc_build_request(req, ofs, &len, 812 ops, 813 snapc, 814 &mtime, 815 req->r_oid, req->r_oid_len); 816 up_read(&header->snap_rwsem); 817 818 if (linger_req) { 819 ceph_osdc_set_request_linger(&dev->client->osdc, req); 820 *linger_req = req; 821 } 822 823 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 824 if (ret < 0) 825 goto done_err; 826 827 if (!rbd_cb) { 828 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 829 if (ver) 830 *ver = le64_to_cpu(req->r_reassert_version.version); 831 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version)); 832 ceph_osdc_put_request(req); 833 } 834 return ret; 835 836 done_err: 837 bio_chain_put(req_data->bio); 838 ceph_osdc_put_request(req); 839 done_pages: 840 kfree(req_data); 841 done: 842 if (rq) 843 blk_end_request(rq, ret, len); 844 return ret; 845 } 846 847 /* 848 * Ceph osd op callback 849 */ 850 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 851 { 852 struct rbd_request *req_data = req->r_priv; 853 struct ceph_osd_reply_head *replyhead; 854 struct ceph_osd_op *op; 855 __s32 rc; 856 u64 bytes; 857 int read_op; 858 859 /* parse reply */ 860 replyhead = msg->front.iov_base; 861 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); 862 op = (void *)(replyhead + 1); 863 rc = le32_to_cpu(replyhead->result); 864 bytes = le64_to_cpu(op->extent.length); 865 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ); 866 867 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc); 868 869 if (rc == -ENOENT && read_op) { 870 zero_bio_chain(req_data->bio, 0); 871 rc = 0; 872 } else if (rc == 0 && read_op && bytes < req_data->len) { 873 zero_bio_chain(req_data->bio, bytes); 874 bytes = req_data->len; 875 } 876 877 blk_end_request(req_data->rq, rc, bytes); 878 879 if (req_data->bio) 880 bio_chain_put(req_data->bio); 881 882 ceph_osdc_put_request(req); 883 kfree(req_data); 884 } 885 886 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 887 { 888 ceph_osdc_put_request(req); 889 } 890 891 /* 892 * Do a synchronous ceph osd operation 893 */ 894 static int rbd_req_sync_op(struct rbd_device *dev, 895 struct ceph_snap_context *snapc, 896 u64 snapid, 897 int opcode, 898 int flags, 899 struct ceph_osd_req_op *orig_ops, 900 int num_reply, 901 const char *obj, 902 u64 ofs, u64 len, 903 char *buf, 904 struct ceph_osd_request **linger_req, 905 u64 *ver) 906 { 907 int ret; 908 struct page **pages; 909 int num_pages; 910 struct ceph_osd_req_op *ops = orig_ops; 911 u32 payload_len; 912 913 num_pages = calc_pages_for(ofs , len); 914 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 915 if (IS_ERR(pages)) 916 return PTR_ERR(pages); 917 918 if (!orig_ops) { 919 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0); 920 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 921 if (ret < 0) 922 goto done; 923 924 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) { 925 ret = ceph_copy_to_page_vector(pages, buf, ofs, len); 926 if (ret < 0) 927 goto done_ops; 928 } 929 } 930 931 ret = rbd_do_request(NULL, dev, snapc, snapid, 932 obj, ofs, len, NULL, 933 pages, num_pages, 934 flags, 935 ops, 936 2, 937 NULL, 938 linger_req, ver); 939 if (ret < 0) 940 goto done_ops; 941 942 if ((flags & CEPH_OSD_FLAG_READ) && buf) 943 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); 944 945 done_ops: 946 if (!orig_ops) 947 rbd_destroy_ops(ops); 948 done: 949 ceph_release_page_vector(pages, num_pages); 950 return ret; 951 } 952 953 /* 954 * Do an asynchronous ceph osd operation 955 */ 956 static int rbd_do_op(struct request *rq, 957 struct rbd_device *rbd_dev , 958 struct ceph_snap_context *snapc, 959 u64 snapid, 960 int opcode, int flags, int num_reply, 961 u64 ofs, u64 len, 962 struct bio *bio) 963 { 964 char *seg_name; 965 u64 seg_ofs; 966 u64 seg_len; 967 int ret; 968 struct ceph_osd_req_op *ops; 969 u32 payload_len; 970 971 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 972 if (!seg_name) 973 return -ENOMEM; 974 975 seg_len = rbd_get_segment(&rbd_dev->header, 976 rbd_dev->header.block_name, 977 ofs, len, 978 seg_name, &seg_ofs); 979 980 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 981 982 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 983 if (ret < 0) 984 goto done; 985 986 /* we've taken care of segment sizes earlier when we 987 cloned the bios. We should never have a segment 988 truncated at this point */ 989 BUG_ON(seg_len < len); 990 991 ret = rbd_do_request(rq, rbd_dev, snapc, snapid, 992 seg_name, seg_ofs, seg_len, 993 bio, 994 NULL, 0, 995 flags, 996 ops, 997 num_reply, 998 rbd_req_cb, 0, NULL); 999 done: 1000 kfree(seg_name); 1001 return ret; 1002 } 1003 1004 /* 1005 * Request async osd write 1006 */ 1007 static int rbd_req_write(struct request *rq, 1008 struct rbd_device *rbd_dev, 1009 struct ceph_snap_context *snapc, 1010 u64 ofs, u64 len, 1011 struct bio *bio) 1012 { 1013 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1014 CEPH_OSD_OP_WRITE, 1015 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1016 2, 1017 ofs, len, bio); 1018 } 1019 1020 /* 1021 * Request async osd read 1022 */ 1023 static int rbd_req_read(struct request *rq, 1024 struct rbd_device *rbd_dev, 1025 u64 snapid, 1026 u64 ofs, u64 len, 1027 struct bio *bio) 1028 { 1029 return rbd_do_op(rq, rbd_dev, NULL, 1030 (snapid ? snapid : CEPH_NOSNAP), 1031 CEPH_OSD_OP_READ, 1032 CEPH_OSD_FLAG_READ, 1033 2, 1034 ofs, len, bio); 1035 } 1036 1037 /* 1038 * Request sync osd read 1039 */ 1040 static int rbd_req_sync_read(struct rbd_device *dev, 1041 struct ceph_snap_context *snapc, 1042 u64 snapid, 1043 const char *obj, 1044 u64 ofs, u64 len, 1045 char *buf, 1046 u64 *ver) 1047 { 1048 return rbd_req_sync_op(dev, NULL, 1049 (snapid ? snapid : CEPH_NOSNAP), 1050 CEPH_OSD_OP_READ, 1051 CEPH_OSD_FLAG_READ, 1052 NULL, 1053 1, obj, ofs, len, buf, NULL, ver); 1054 } 1055 1056 /* 1057 * Request sync osd watch 1058 */ 1059 static int rbd_req_sync_notify_ack(struct rbd_device *dev, 1060 u64 ver, 1061 u64 notify_id, 1062 const char *obj) 1063 { 1064 struct ceph_osd_req_op *ops; 1065 struct page **pages = NULL; 1066 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); 1067 if (ret < 0) 1068 return ret; 1069 1070 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); 1071 ops[0].watch.cookie = notify_id; 1072 ops[0].watch.flag = 0; 1073 1074 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, 1075 obj, 0, 0, NULL, 1076 pages, 0, 1077 CEPH_OSD_FLAG_READ, 1078 ops, 1079 1, 1080 rbd_simple_req_cb, 0, NULL); 1081 1082 rbd_destroy_ops(ops); 1083 return ret; 1084 } 1085 1086 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1087 { 1088 struct rbd_device *dev = (struct rbd_device *)data; 1089 if (!dev) 1090 return; 1091 1092 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1093 notify_id, (int)opcode); 1094 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1095 __rbd_update_snaps(dev); 1096 mutex_unlock(&ctl_mutex); 1097 1098 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); 1099 } 1100 1101 /* 1102 * Request sync osd watch 1103 */ 1104 static int rbd_req_sync_watch(struct rbd_device *dev, 1105 const char *obj, 1106 u64 ver) 1107 { 1108 struct ceph_osd_req_op *ops; 1109 struct ceph_osd_client *osdc = &dev->client->osdc; 1110 1111 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1112 if (ret < 0) 1113 return ret; 1114 1115 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, 1116 (void *)dev, &dev->watch_event); 1117 if (ret < 0) 1118 goto fail; 1119 1120 ops[0].watch.ver = cpu_to_le64(ver); 1121 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); 1122 ops[0].watch.flag = 1; 1123 1124 ret = rbd_req_sync_op(dev, NULL, 1125 CEPH_NOSNAP, 1126 0, 1127 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1128 ops, 1129 1, obj, 0, 0, NULL, 1130 &dev->watch_request, NULL); 1131 1132 if (ret < 0) 1133 goto fail_event; 1134 1135 rbd_destroy_ops(ops); 1136 return 0; 1137 1138 fail_event: 1139 ceph_osdc_cancel_event(dev->watch_event); 1140 dev->watch_event = NULL; 1141 fail: 1142 rbd_destroy_ops(ops); 1143 return ret; 1144 } 1145 1146 struct rbd_notify_info { 1147 struct rbd_device *dev; 1148 }; 1149 1150 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1151 { 1152 struct rbd_device *dev = (struct rbd_device *)data; 1153 if (!dev) 1154 return; 1155 1156 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1157 notify_id, (int)opcode); 1158 } 1159 1160 /* 1161 * Request sync osd notify 1162 */ 1163 static int rbd_req_sync_notify(struct rbd_device *dev, 1164 const char *obj) 1165 { 1166 struct ceph_osd_req_op *ops; 1167 struct ceph_osd_client *osdc = &dev->client->osdc; 1168 struct ceph_osd_event *event; 1169 struct rbd_notify_info info; 1170 int payload_len = sizeof(u32) + sizeof(u32); 1171 int ret; 1172 1173 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); 1174 if (ret < 0) 1175 return ret; 1176 1177 info.dev = dev; 1178 1179 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, 1180 (void *)&info, &event); 1181 if (ret < 0) 1182 goto fail; 1183 1184 ops[0].watch.ver = 1; 1185 ops[0].watch.flag = 1; 1186 ops[0].watch.cookie = event->cookie; 1187 ops[0].watch.prot_ver = RADOS_NOTIFY_VER; 1188 ops[0].watch.timeout = 12; 1189 1190 ret = rbd_req_sync_op(dev, NULL, 1191 CEPH_NOSNAP, 1192 0, 1193 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1194 ops, 1195 1, obj, 0, 0, NULL, NULL, NULL); 1196 if (ret < 0) 1197 goto fail_event; 1198 1199 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT); 1200 dout("ceph_osdc_wait_event returned %d\n", ret); 1201 rbd_destroy_ops(ops); 1202 return 0; 1203 1204 fail_event: 1205 ceph_osdc_cancel_event(event); 1206 fail: 1207 rbd_destroy_ops(ops); 1208 return ret; 1209 } 1210 1211 /* 1212 * Request sync osd rollback 1213 */ 1214 static int rbd_req_sync_rollback_obj(struct rbd_device *dev, 1215 u64 snapid, 1216 const char *obj) 1217 { 1218 struct ceph_osd_req_op *ops; 1219 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0); 1220 if (ret < 0) 1221 return ret; 1222 1223 ops[0].snap.snapid = snapid; 1224 1225 ret = rbd_req_sync_op(dev, NULL, 1226 CEPH_NOSNAP, 1227 0, 1228 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1229 ops, 1230 1, obj, 0, 0, NULL, NULL, NULL); 1231 1232 rbd_destroy_ops(ops); 1233 1234 return ret; 1235 } 1236 1237 /* 1238 * Request sync osd read 1239 */ 1240 static int rbd_req_sync_exec(struct rbd_device *dev, 1241 const char *obj, 1242 const char *cls, 1243 const char *method, 1244 const char *data, 1245 int len, 1246 u64 *ver) 1247 { 1248 struct ceph_osd_req_op *ops; 1249 int cls_len = strlen(cls); 1250 int method_len = strlen(method); 1251 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL, 1252 cls_len + method_len + len); 1253 if (ret < 0) 1254 return ret; 1255 1256 ops[0].cls.class_name = cls; 1257 ops[0].cls.class_len = (__u8)cls_len; 1258 ops[0].cls.method_name = method; 1259 ops[0].cls.method_len = (__u8)method_len; 1260 ops[0].cls.argc = 0; 1261 ops[0].cls.indata = data; 1262 ops[0].cls.indata_len = len; 1263 1264 ret = rbd_req_sync_op(dev, NULL, 1265 CEPH_NOSNAP, 1266 0, 1267 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1268 ops, 1269 1, obj, 0, 0, NULL, NULL, ver); 1270 1271 rbd_destroy_ops(ops); 1272 1273 dout("cls_exec returned %d\n", ret); 1274 return ret; 1275 } 1276 1277 /* 1278 * block device queue callback 1279 */ 1280 static void rbd_rq_fn(struct request_queue *q) 1281 { 1282 struct rbd_device *rbd_dev = q->queuedata; 1283 struct request *rq; 1284 struct bio_pair *bp = NULL; 1285 1286 rq = blk_fetch_request(q); 1287 1288 while (1) { 1289 struct bio *bio; 1290 struct bio *rq_bio, *next_bio = NULL; 1291 bool do_write; 1292 int size, op_size = 0; 1293 u64 ofs; 1294 1295 /* peek at request from block layer */ 1296 if (!rq) 1297 break; 1298 1299 dout("fetched request\n"); 1300 1301 /* filter out block requests we don't understand */ 1302 if ((rq->cmd_type != REQ_TYPE_FS)) { 1303 __blk_end_request_all(rq, 0); 1304 goto next; 1305 } 1306 1307 /* deduce our operation (read, write) */ 1308 do_write = (rq_data_dir(rq) == WRITE); 1309 1310 size = blk_rq_bytes(rq); 1311 ofs = blk_rq_pos(rq) * 512ULL; 1312 rq_bio = rq->bio; 1313 if (do_write && rbd_dev->read_only) { 1314 __blk_end_request_all(rq, -EROFS); 1315 goto next; 1316 } 1317 1318 spin_unlock_irq(q->queue_lock); 1319 1320 dout("%s 0x%x bytes at 0x%llx\n", 1321 do_write ? "write" : "read", 1322 size, blk_rq_pos(rq) * 512ULL); 1323 1324 do { 1325 /* a bio clone to be passed down to OSD req */ 1326 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); 1327 op_size = rbd_get_segment(&rbd_dev->header, 1328 rbd_dev->header.block_name, 1329 ofs, size, 1330 NULL, NULL); 1331 bio = bio_chain_clone(&rq_bio, &next_bio, &bp, 1332 op_size, GFP_ATOMIC); 1333 if (!bio) { 1334 spin_lock_irq(q->queue_lock); 1335 __blk_end_request_all(rq, -ENOMEM); 1336 goto next; 1337 } 1338 1339 /* init OSD command: write or read */ 1340 if (do_write) 1341 rbd_req_write(rq, rbd_dev, 1342 rbd_dev->header.snapc, 1343 ofs, 1344 op_size, bio); 1345 else 1346 rbd_req_read(rq, rbd_dev, 1347 cur_snap_id(rbd_dev), 1348 ofs, 1349 op_size, bio); 1350 1351 size -= op_size; 1352 ofs += op_size; 1353 1354 rq_bio = next_bio; 1355 } while (size > 0); 1356 1357 if (bp) 1358 bio_pair_release(bp); 1359 1360 spin_lock_irq(q->queue_lock); 1361 next: 1362 rq = blk_fetch_request(q); 1363 } 1364 } 1365 1366 /* 1367 * a queue callback. Makes sure that we don't create a bio that spans across 1368 * multiple osd objects. One exception would be with a single page bios, 1369 * which we handle later at bio_chain_clone 1370 */ 1371 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1372 struct bio_vec *bvec) 1373 { 1374 struct rbd_device *rbd_dev = q->queuedata; 1375 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9); 1376 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1377 unsigned int bio_sectors = bmd->bi_size >> 9; 1378 int max; 1379 1380 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1381 + bio_sectors)) << 9; 1382 if (max < 0) 1383 max = 0; /* bio_add cannot handle a negative return */ 1384 if (max <= bvec->bv_len && bio_sectors == 0) 1385 return bvec->bv_len; 1386 return max; 1387 } 1388 1389 static void rbd_free_disk(struct rbd_device *rbd_dev) 1390 { 1391 struct gendisk *disk = rbd_dev->disk; 1392 1393 if (!disk) 1394 return; 1395 1396 rbd_header_free(&rbd_dev->header); 1397 1398 if (disk->flags & GENHD_FL_UP) 1399 del_gendisk(disk); 1400 if (disk->queue) 1401 blk_cleanup_queue(disk->queue); 1402 put_disk(disk); 1403 } 1404 1405 /* 1406 * reload the ondisk the header 1407 */ 1408 static int rbd_read_header(struct rbd_device *rbd_dev, 1409 struct rbd_image_header *header) 1410 { 1411 ssize_t rc; 1412 struct rbd_image_header_ondisk *dh; 1413 int snap_count = 0; 1414 u64 snap_names_len = 0; 1415 u64 ver; 1416 1417 while (1) { 1418 int len = sizeof(*dh) + 1419 snap_count * sizeof(struct rbd_image_snap_ondisk) + 1420 snap_names_len; 1421 1422 rc = -ENOMEM; 1423 dh = kmalloc(len, GFP_KERNEL); 1424 if (!dh) 1425 return -ENOMEM; 1426 1427 rc = rbd_req_sync_read(rbd_dev, 1428 NULL, CEPH_NOSNAP, 1429 rbd_dev->obj_md_name, 1430 0, len, 1431 (char *)dh, &ver); 1432 if (rc < 0) 1433 goto out_dh; 1434 1435 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); 1436 if (rc < 0) 1437 goto out_dh; 1438 1439 if (snap_count != header->total_snaps) { 1440 snap_count = header->total_snaps; 1441 snap_names_len = header->snap_names_len; 1442 rbd_header_free(header); 1443 kfree(dh); 1444 continue; 1445 } 1446 break; 1447 } 1448 header->obj_version = ver; 1449 1450 out_dh: 1451 kfree(dh); 1452 return rc; 1453 } 1454 1455 /* 1456 * create a snapshot 1457 */ 1458 static int rbd_header_add_snap(struct rbd_device *dev, 1459 const char *snap_name, 1460 gfp_t gfp_flags) 1461 { 1462 int name_len = strlen(snap_name); 1463 u64 new_snapid; 1464 int ret; 1465 void *data, *data_start, *data_end; 1466 u64 ver; 1467 1468 /* we should create a snapshot only if we're pointing at the head */ 1469 if (dev->cur_snap) 1470 return -EINVAL; 1471 1472 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, 1473 &new_snapid); 1474 dout("created snapid=%lld\n", new_snapid); 1475 if (ret < 0) 1476 return ret; 1477 1478 data = kmalloc(name_len + 16, gfp_flags); 1479 if (!data) 1480 return -ENOMEM; 1481 1482 data_start = data; 1483 data_end = data + name_len + 16; 1484 1485 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad); 1486 ceph_encode_64_safe(&data, data_end, new_snapid, bad); 1487 1488 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", 1489 data_start, data - data_start, &ver); 1490 1491 kfree(data_start); 1492 1493 if (ret < 0) 1494 return ret; 1495 1496 dev->header.snapc->seq = new_snapid; 1497 1498 return 0; 1499 bad: 1500 return -ERANGE; 1501 } 1502 1503 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1504 { 1505 struct rbd_snap *snap; 1506 1507 while (!list_empty(&rbd_dev->snaps)) { 1508 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node); 1509 __rbd_remove_snap_dev(rbd_dev, snap); 1510 } 1511 } 1512 1513 /* 1514 * only read the first part of the ondisk header, without the snaps info 1515 */ 1516 static int __rbd_update_snaps(struct rbd_device *rbd_dev) 1517 { 1518 int ret; 1519 struct rbd_image_header h; 1520 u64 snap_seq; 1521 int follow_seq = 0; 1522 1523 ret = rbd_read_header(rbd_dev, &h); 1524 if (ret < 0) 1525 return ret; 1526 1527 down_write(&rbd_dev->header.snap_rwsem); 1528 1529 snap_seq = rbd_dev->header.snapc->seq; 1530 if (rbd_dev->header.total_snaps && 1531 rbd_dev->header.snapc->snaps[0] == snap_seq) 1532 /* pointing at the head, will need to follow that 1533 if head moves */ 1534 follow_seq = 1; 1535 1536 kfree(rbd_dev->header.snapc); 1537 kfree(rbd_dev->header.snap_names); 1538 kfree(rbd_dev->header.snap_sizes); 1539 1540 rbd_dev->header.total_snaps = h.total_snaps; 1541 rbd_dev->header.snapc = h.snapc; 1542 rbd_dev->header.snap_names = h.snap_names; 1543 rbd_dev->header.snap_names_len = h.snap_names_len; 1544 rbd_dev->header.snap_sizes = h.snap_sizes; 1545 if (follow_seq) 1546 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0]; 1547 else 1548 rbd_dev->header.snapc->seq = snap_seq; 1549 1550 ret = __rbd_init_snaps_header(rbd_dev); 1551 1552 up_write(&rbd_dev->header.snap_rwsem); 1553 1554 return ret; 1555 } 1556 1557 static int rbd_init_disk(struct rbd_device *rbd_dev) 1558 { 1559 struct gendisk *disk; 1560 struct request_queue *q; 1561 int rc; 1562 u64 total_size = 0; 1563 1564 /* contact OSD, request size info about the object being mapped */ 1565 rc = rbd_read_header(rbd_dev, &rbd_dev->header); 1566 if (rc) 1567 return rc; 1568 1569 /* no need to lock here, as rbd_dev is not registered yet */ 1570 rc = __rbd_init_snaps_header(rbd_dev); 1571 if (rc) 1572 return rc; 1573 1574 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size); 1575 if (rc) 1576 return rc; 1577 1578 /* create gendisk info */ 1579 rc = -ENOMEM; 1580 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 1581 if (!disk) 1582 goto out; 1583 1584 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id); 1585 disk->major = rbd_dev->major; 1586 disk->first_minor = 0; 1587 disk->fops = &rbd_bd_ops; 1588 disk->private_data = rbd_dev; 1589 1590 /* init rq */ 1591 rc = -ENOMEM; 1592 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 1593 if (!q) 1594 goto out_disk; 1595 blk_queue_merge_bvec(q, rbd_merge_bvec); 1596 disk->queue = q; 1597 1598 q->queuedata = rbd_dev; 1599 1600 rbd_dev->disk = disk; 1601 rbd_dev->q = q; 1602 1603 /* finally, announce the disk to the world */ 1604 set_capacity(disk, total_size / 512ULL); 1605 add_disk(disk); 1606 1607 pr_info("%s: added with size 0x%llx\n", 1608 disk->disk_name, (unsigned long long)total_size); 1609 return 0; 1610 1611 out_disk: 1612 put_disk(disk); 1613 out: 1614 return rc; 1615 } 1616 1617 /* 1618 sysfs 1619 */ 1620 1621 static ssize_t rbd_size_show(struct device *dev, 1622 struct device_attribute *attr, char *buf) 1623 { 1624 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1625 1626 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); 1627 } 1628 1629 static ssize_t rbd_major_show(struct device *dev, 1630 struct device_attribute *attr, char *buf) 1631 { 1632 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1633 1634 return sprintf(buf, "%d\n", rbd_dev->major); 1635 } 1636 1637 static ssize_t rbd_client_id_show(struct device *dev, 1638 struct device_attribute *attr, char *buf) 1639 { 1640 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1641 1642 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client)); 1643 } 1644 1645 static ssize_t rbd_pool_show(struct device *dev, 1646 struct device_attribute *attr, char *buf) 1647 { 1648 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1649 1650 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1651 } 1652 1653 static ssize_t rbd_name_show(struct device *dev, 1654 struct device_attribute *attr, char *buf) 1655 { 1656 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1657 1658 return sprintf(buf, "%s\n", rbd_dev->obj); 1659 } 1660 1661 static ssize_t rbd_snap_show(struct device *dev, 1662 struct device_attribute *attr, 1663 char *buf) 1664 { 1665 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1666 1667 return sprintf(buf, "%s\n", rbd_dev->snap_name); 1668 } 1669 1670 static ssize_t rbd_image_refresh(struct device *dev, 1671 struct device_attribute *attr, 1672 const char *buf, 1673 size_t size) 1674 { 1675 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1676 int rc; 1677 int ret = size; 1678 1679 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1680 1681 rc = __rbd_update_snaps(rbd_dev); 1682 if (rc < 0) 1683 ret = rc; 1684 1685 mutex_unlock(&ctl_mutex); 1686 return ret; 1687 } 1688 1689 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 1690 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 1691 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 1692 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 1693 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 1694 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 1695 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 1696 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add); 1697 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback); 1698 1699 static struct attribute *rbd_attrs[] = { 1700 &dev_attr_size.attr, 1701 &dev_attr_major.attr, 1702 &dev_attr_client_id.attr, 1703 &dev_attr_pool.attr, 1704 &dev_attr_name.attr, 1705 &dev_attr_current_snap.attr, 1706 &dev_attr_refresh.attr, 1707 &dev_attr_create_snap.attr, 1708 &dev_attr_rollback_snap.attr, 1709 NULL 1710 }; 1711 1712 static struct attribute_group rbd_attr_group = { 1713 .attrs = rbd_attrs, 1714 }; 1715 1716 static const struct attribute_group *rbd_attr_groups[] = { 1717 &rbd_attr_group, 1718 NULL 1719 }; 1720 1721 static void rbd_sysfs_dev_release(struct device *dev) 1722 { 1723 } 1724 1725 static struct device_type rbd_device_type = { 1726 .name = "rbd", 1727 .groups = rbd_attr_groups, 1728 .release = rbd_sysfs_dev_release, 1729 }; 1730 1731 1732 /* 1733 sysfs - snapshots 1734 */ 1735 1736 static ssize_t rbd_snap_size_show(struct device *dev, 1737 struct device_attribute *attr, 1738 char *buf) 1739 { 1740 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1741 1742 return sprintf(buf, "%lld\n", (long long)snap->size); 1743 } 1744 1745 static ssize_t rbd_snap_id_show(struct device *dev, 1746 struct device_attribute *attr, 1747 char *buf) 1748 { 1749 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1750 1751 return sprintf(buf, "%lld\n", (long long)snap->id); 1752 } 1753 1754 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 1755 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 1756 1757 static struct attribute *rbd_snap_attrs[] = { 1758 &dev_attr_snap_size.attr, 1759 &dev_attr_snap_id.attr, 1760 NULL, 1761 }; 1762 1763 static struct attribute_group rbd_snap_attr_group = { 1764 .attrs = rbd_snap_attrs, 1765 }; 1766 1767 static void rbd_snap_dev_release(struct device *dev) 1768 { 1769 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1770 kfree(snap->name); 1771 kfree(snap); 1772 } 1773 1774 static const struct attribute_group *rbd_snap_attr_groups[] = { 1775 &rbd_snap_attr_group, 1776 NULL 1777 }; 1778 1779 static struct device_type rbd_snap_device_type = { 1780 .groups = rbd_snap_attr_groups, 1781 .release = rbd_snap_dev_release, 1782 }; 1783 1784 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 1785 struct rbd_snap *snap) 1786 { 1787 list_del(&snap->node); 1788 device_unregister(&snap->dev); 1789 } 1790 1791 static int rbd_register_snap_dev(struct rbd_device *rbd_dev, 1792 struct rbd_snap *snap, 1793 struct device *parent) 1794 { 1795 struct device *dev = &snap->dev; 1796 int ret; 1797 1798 dev->type = &rbd_snap_device_type; 1799 dev->parent = parent; 1800 dev->release = rbd_snap_dev_release; 1801 dev_set_name(dev, "snap_%s", snap->name); 1802 ret = device_register(dev); 1803 1804 return ret; 1805 } 1806 1807 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev, 1808 int i, const char *name, 1809 struct rbd_snap **snapp) 1810 { 1811 int ret; 1812 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL); 1813 if (!snap) 1814 return -ENOMEM; 1815 snap->name = kstrdup(name, GFP_KERNEL); 1816 snap->size = rbd_dev->header.snap_sizes[i]; 1817 snap->id = rbd_dev->header.snapc->snaps[i]; 1818 if (device_is_registered(&rbd_dev->dev)) { 1819 ret = rbd_register_snap_dev(rbd_dev, snap, 1820 &rbd_dev->dev); 1821 if (ret < 0) 1822 goto err; 1823 } 1824 *snapp = snap; 1825 return 0; 1826 err: 1827 kfree(snap->name); 1828 kfree(snap); 1829 return ret; 1830 } 1831 1832 /* 1833 * search for the previous snap in a null delimited string list 1834 */ 1835 const char *rbd_prev_snap_name(const char *name, const char *start) 1836 { 1837 if (name < start + 2) 1838 return NULL; 1839 1840 name -= 2; 1841 while (*name) { 1842 if (name == start) 1843 return start; 1844 name--; 1845 } 1846 return name + 1; 1847 } 1848 1849 /* 1850 * compare the old list of snapshots that we have to what's in the header 1851 * and update it accordingly. Note that the header holds the snapshots 1852 * in a reverse order (from newest to oldest) and we need to go from 1853 * older to new so that we don't get a duplicate snap name when 1854 * doing the process (e.g., removed snapshot and recreated a new 1855 * one with the same name. 1856 */ 1857 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev) 1858 { 1859 const char *name, *first_name; 1860 int i = rbd_dev->header.total_snaps; 1861 struct rbd_snap *snap, *old_snap = NULL; 1862 int ret; 1863 struct list_head *p, *n; 1864 1865 first_name = rbd_dev->header.snap_names; 1866 name = first_name + rbd_dev->header.snap_names_len; 1867 1868 list_for_each_prev_safe(p, n, &rbd_dev->snaps) { 1869 u64 cur_id; 1870 1871 old_snap = list_entry(p, struct rbd_snap, node); 1872 1873 if (i) 1874 cur_id = rbd_dev->header.snapc->snaps[i - 1]; 1875 1876 if (!i || old_snap->id < cur_id) { 1877 /* old_snap->id was skipped, thus was removed */ 1878 __rbd_remove_snap_dev(rbd_dev, old_snap); 1879 continue; 1880 } 1881 if (old_snap->id == cur_id) { 1882 /* we have this snapshot already */ 1883 i--; 1884 name = rbd_prev_snap_name(name, first_name); 1885 continue; 1886 } 1887 for (; i > 0; 1888 i--, name = rbd_prev_snap_name(name, first_name)) { 1889 if (!name) { 1890 WARN_ON(1); 1891 return -EINVAL; 1892 } 1893 cur_id = rbd_dev->header.snapc->snaps[i]; 1894 /* snapshot removal? handle it above */ 1895 if (cur_id >= old_snap->id) 1896 break; 1897 /* a new snapshot */ 1898 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 1899 if (ret < 0) 1900 return ret; 1901 1902 /* note that we add it backward so using n and not p */ 1903 list_add(&snap->node, n); 1904 p = &snap->node; 1905 } 1906 } 1907 /* we're done going over the old snap list, just add what's left */ 1908 for (; i > 0; i--) { 1909 name = rbd_prev_snap_name(name, first_name); 1910 if (!name) { 1911 WARN_ON(1); 1912 return -EINVAL; 1913 } 1914 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 1915 if (ret < 0) 1916 return ret; 1917 list_add(&snap->node, &rbd_dev->snaps); 1918 } 1919 1920 return 0; 1921 } 1922 1923 1924 static void rbd_root_dev_release(struct device *dev) 1925 { 1926 } 1927 1928 static struct device rbd_root_dev = { 1929 .init_name = "rbd", 1930 .release = rbd_root_dev_release, 1931 }; 1932 1933 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 1934 { 1935 int ret = -ENOMEM; 1936 struct device *dev; 1937 struct rbd_snap *snap; 1938 1939 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1940 dev = &rbd_dev->dev; 1941 1942 dev->bus = &rbd_bus_type; 1943 dev->type = &rbd_device_type; 1944 dev->parent = &rbd_root_dev; 1945 dev->release = rbd_dev_release; 1946 dev_set_name(dev, "%d", rbd_dev->id); 1947 ret = device_register(dev); 1948 if (ret < 0) 1949 goto done_free; 1950 1951 list_for_each_entry(snap, &rbd_dev->snaps, node) { 1952 ret = rbd_register_snap_dev(rbd_dev, snap, 1953 &rbd_dev->dev); 1954 if (ret < 0) 1955 break; 1956 } 1957 1958 mutex_unlock(&ctl_mutex); 1959 return 0; 1960 done_free: 1961 mutex_unlock(&ctl_mutex); 1962 return ret; 1963 } 1964 1965 static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 1966 { 1967 device_unregister(&rbd_dev->dev); 1968 } 1969 1970 static int rbd_init_watch_dev(struct rbd_device *rbd_dev) 1971 { 1972 int ret, rc; 1973 1974 do { 1975 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, 1976 rbd_dev->header.obj_version); 1977 if (ret == -ERANGE) { 1978 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1979 rc = __rbd_update_snaps(rbd_dev); 1980 mutex_unlock(&ctl_mutex); 1981 if (rc < 0) 1982 return rc; 1983 } 1984 } while (ret == -ERANGE); 1985 1986 return ret; 1987 } 1988 1989 static ssize_t rbd_add(struct bus_type *bus, 1990 const char *buf, 1991 size_t count) 1992 { 1993 struct ceph_osd_client *osdc; 1994 struct rbd_device *rbd_dev; 1995 ssize_t rc = -ENOMEM; 1996 int irc, new_id = 0; 1997 struct list_head *tmp; 1998 char *mon_dev_name; 1999 char *options; 2000 2001 if (!try_module_get(THIS_MODULE)) 2002 return -ENODEV; 2003 2004 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 2005 if (!mon_dev_name) 2006 goto err_out_mod; 2007 2008 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 2009 if (!options) 2010 goto err_mon_dev; 2011 2012 /* new rbd_device object */ 2013 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 2014 if (!rbd_dev) 2015 goto err_out_opt; 2016 2017 /* static rbd_device initialization */ 2018 spin_lock_init(&rbd_dev->lock); 2019 INIT_LIST_HEAD(&rbd_dev->node); 2020 INIT_LIST_HEAD(&rbd_dev->snaps); 2021 2022 /* generate unique id: find highest unique id, add one */ 2023 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2024 2025 list_for_each(tmp, &rbd_dev_list) { 2026 struct rbd_device *rbd_dev; 2027 2028 rbd_dev = list_entry(tmp, struct rbd_device, node); 2029 if (rbd_dev->id >= new_id) 2030 new_id = rbd_dev->id + 1; 2031 } 2032 2033 rbd_dev->id = new_id; 2034 2035 /* add to global list */ 2036 list_add_tail(&rbd_dev->node, &rbd_dev_list); 2037 2038 /* parse add command */ 2039 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " 2040 "%" __stringify(RBD_MAX_OPT_LEN) "s " 2041 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s " 2042 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s" 2043 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s", 2044 mon_dev_name, options, rbd_dev->pool_name, 2045 rbd_dev->obj, rbd_dev->snap_name) < 4) { 2046 rc = -EINVAL; 2047 goto err_out_slot; 2048 } 2049 2050 if (rbd_dev->snap_name[0] == 0) 2051 rbd_dev->snap_name[0] = '-'; 2052 2053 rbd_dev->obj_len = strlen(rbd_dev->obj); 2054 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s", 2055 rbd_dev->obj, RBD_SUFFIX); 2056 2057 /* initialize rest of new object */ 2058 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id); 2059 rc = rbd_get_client(rbd_dev, mon_dev_name, options); 2060 if (rc < 0) 2061 goto err_out_slot; 2062 2063 mutex_unlock(&ctl_mutex); 2064 2065 /* pick the pool */ 2066 osdc = &rbd_dev->client->osdc; 2067 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 2068 if (rc < 0) 2069 goto err_out_client; 2070 rbd_dev->poolid = rc; 2071 2072 /* register our block device */ 2073 irc = register_blkdev(0, rbd_dev->name); 2074 if (irc < 0) { 2075 rc = irc; 2076 goto err_out_client; 2077 } 2078 rbd_dev->major = irc; 2079 2080 rc = rbd_bus_add_dev(rbd_dev); 2081 if (rc) 2082 goto err_out_blkdev; 2083 2084 /* set up and announce blkdev mapping */ 2085 rc = rbd_init_disk(rbd_dev); 2086 if (rc) 2087 goto err_out_bus; 2088 2089 rc = rbd_init_watch_dev(rbd_dev); 2090 if (rc) 2091 goto err_out_bus; 2092 2093 return count; 2094 2095 err_out_bus: 2096 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2097 list_del_init(&rbd_dev->node); 2098 mutex_unlock(&ctl_mutex); 2099 2100 /* this will also clean up rest of rbd_dev stuff */ 2101 2102 rbd_bus_del_dev(rbd_dev); 2103 kfree(options); 2104 kfree(mon_dev_name); 2105 return rc; 2106 2107 err_out_blkdev: 2108 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2109 err_out_client: 2110 rbd_put_client(rbd_dev); 2111 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2112 err_out_slot: 2113 list_del_init(&rbd_dev->node); 2114 mutex_unlock(&ctl_mutex); 2115 2116 kfree(rbd_dev); 2117 err_out_opt: 2118 kfree(options); 2119 err_mon_dev: 2120 kfree(mon_dev_name); 2121 err_out_mod: 2122 dout("Error adding device %s\n", buf); 2123 module_put(THIS_MODULE); 2124 return rc; 2125 } 2126 2127 static struct rbd_device *__rbd_get_dev(unsigned long id) 2128 { 2129 struct list_head *tmp; 2130 struct rbd_device *rbd_dev; 2131 2132 list_for_each(tmp, &rbd_dev_list) { 2133 rbd_dev = list_entry(tmp, struct rbd_device, node); 2134 if (rbd_dev->id == id) 2135 return rbd_dev; 2136 } 2137 return NULL; 2138 } 2139 2140 static void rbd_dev_release(struct device *dev) 2141 { 2142 struct rbd_device *rbd_dev = 2143 container_of(dev, struct rbd_device, dev); 2144 2145 if (rbd_dev->watch_request) 2146 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, 2147 rbd_dev->watch_request); 2148 if (rbd_dev->watch_event) 2149 ceph_osdc_cancel_event(rbd_dev->watch_event); 2150 2151 rbd_put_client(rbd_dev); 2152 2153 /* clean up and free blkdev */ 2154 rbd_free_disk(rbd_dev); 2155 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2156 kfree(rbd_dev); 2157 2158 /* release module ref */ 2159 module_put(THIS_MODULE); 2160 } 2161 2162 static ssize_t rbd_remove(struct bus_type *bus, 2163 const char *buf, 2164 size_t count) 2165 { 2166 struct rbd_device *rbd_dev = NULL; 2167 int target_id, rc; 2168 unsigned long ul; 2169 int ret = count; 2170 2171 rc = strict_strtoul(buf, 10, &ul); 2172 if (rc) 2173 return rc; 2174 2175 /* convert to int; abort if we lost anything in the conversion */ 2176 target_id = (int) ul; 2177 if (target_id != ul) 2178 return -EINVAL; 2179 2180 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2181 2182 rbd_dev = __rbd_get_dev(target_id); 2183 if (!rbd_dev) { 2184 ret = -ENOENT; 2185 goto done; 2186 } 2187 2188 list_del_init(&rbd_dev->node); 2189 2190 __rbd_remove_all_snaps(rbd_dev); 2191 rbd_bus_del_dev(rbd_dev); 2192 2193 done: 2194 mutex_unlock(&ctl_mutex); 2195 return ret; 2196 } 2197 2198 static ssize_t rbd_snap_add(struct device *dev, 2199 struct device_attribute *attr, 2200 const char *buf, 2201 size_t count) 2202 { 2203 struct rbd_device *rbd_dev = dev_to_rbd(dev); 2204 int ret; 2205 char *name = kmalloc(count + 1, GFP_KERNEL); 2206 if (!name) 2207 return -ENOMEM; 2208 2209 snprintf(name, count, "%s", buf); 2210 2211 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2212 2213 ret = rbd_header_add_snap(rbd_dev, 2214 name, GFP_KERNEL); 2215 if (ret < 0) 2216 goto err_unlock; 2217 2218 ret = __rbd_update_snaps(rbd_dev); 2219 if (ret < 0) 2220 goto err_unlock; 2221 2222 /* shouldn't hold ctl_mutex when notifying.. notify might 2223 trigger a watch callback that would need to get that mutex */ 2224 mutex_unlock(&ctl_mutex); 2225 2226 /* make a best effort, don't error if failed */ 2227 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name); 2228 2229 ret = count; 2230 kfree(name); 2231 return ret; 2232 2233 err_unlock: 2234 mutex_unlock(&ctl_mutex); 2235 kfree(name); 2236 return ret; 2237 } 2238 2239 static ssize_t rbd_snap_rollback(struct device *dev, 2240 struct device_attribute *attr, 2241 const char *buf, 2242 size_t count) 2243 { 2244 struct rbd_device *rbd_dev = dev_to_rbd(dev); 2245 int ret; 2246 u64 snapid; 2247 u64 cur_ofs; 2248 char *seg_name = NULL; 2249 char *snap_name = kmalloc(count + 1, GFP_KERNEL); 2250 ret = -ENOMEM; 2251 if (!snap_name) 2252 return ret; 2253 2254 /* parse snaps add command */ 2255 snprintf(snap_name, count, "%s", buf); 2256 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 2257 if (!seg_name) 2258 goto done; 2259 2260 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2261 2262 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL); 2263 if (ret < 0) 2264 goto done_unlock; 2265 2266 dout("snapid=%lld\n", snapid); 2267 2268 cur_ofs = 0; 2269 while (cur_ofs < rbd_dev->header.image_size) { 2270 cur_ofs += rbd_get_segment(&rbd_dev->header, 2271 rbd_dev->obj, 2272 cur_ofs, (u64)-1, 2273 seg_name, NULL); 2274 dout("seg_name=%s\n", seg_name); 2275 2276 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name); 2277 if (ret < 0) 2278 pr_warning("could not roll back obj %s err=%d\n", 2279 seg_name, ret); 2280 } 2281 2282 ret = __rbd_update_snaps(rbd_dev); 2283 if (ret < 0) 2284 goto done_unlock; 2285 2286 ret = count; 2287 2288 done_unlock: 2289 mutex_unlock(&ctl_mutex); 2290 done: 2291 kfree(seg_name); 2292 kfree(snap_name); 2293 2294 return ret; 2295 } 2296 2297 static struct bus_attribute rbd_bus_attrs[] = { 2298 __ATTR(add, S_IWUSR, NULL, rbd_add), 2299 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 2300 __ATTR_NULL 2301 }; 2302 2303 /* 2304 * create control files in sysfs 2305 * /sys/bus/rbd/... 2306 */ 2307 static int rbd_sysfs_init(void) 2308 { 2309 int ret; 2310 2311 rbd_bus_type.bus_attrs = rbd_bus_attrs; 2312 2313 ret = bus_register(&rbd_bus_type); 2314 if (ret < 0) 2315 return ret; 2316 2317 ret = device_register(&rbd_root_dev); 2318 2319 return ret; 2320 } 2321 2322 static void rbd_sysfs_cleanup(void) 2323 { 2324 device_unregister(&rbd_root_dev); 2325 bus_unregister(&rbd_bus_type); 2326 } 2327 2328 int __init rbd_init(void) 2329 { 2330 int rc; 2331 2332 rc = rbd_sysfs_init(); 2333 if (rc) 2334 return rc; 2335 spin_lock_init(&node_lock); 2336 pr_info("loaded " DRV_NAME_LONG "\n"); 2337 return 0; 2338 } 2339 2340 void __exit rbd_exit(void) 2341 { 2342 rbd_sysfs_cleanup(); 2343 } 2344 2345 module_init(rbd_init); 2346 module_exit(rbd_exit); 2347 2348 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 2349 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 2350 MODULE_DESCRIPTION("rados block device"); 2351 2352 /* following authorship retained from original osdblk.c */ 2353 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 2354 2355 MODULE_LICENSE("GPL"); 2356