1 /* 2 rbd.c -- Export ceph rados objects as a Linux block device 3 4 5 based on drivers/block/osdblk.c: 6 7 Copyright 2009 Red Hat, Inc. 8 9 This program is free software; you can redistribute it and/or modify 10 it under the terms of the GNU General Public License as published by 11 the Free Software Foundation. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program; see the file COPYING. If not, write to 20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 21 22 23 24 For usage instructions, please refer to: 25 26 Documentation/ABI/testing/sysfs-bus-rbd 27 28 */ 29 30 #include <linux/ceph/libceph.h> 31 #include <linux/ceph/osd_client.h> 32 #include <linux/ceph/mon_client.h> 33 #include <linux/ceph/decode.h> 34 #include <linux/parser.h> 35 36 #include <linux/kernel.h> 37 #include <linux/device.h> 38 #include <linux/module.h> 39 #include <linux/fs.h> 40 #include <linux/blkdev.h> 41 42 #include "rbd_types.h" 43 44 #define DRV_NAME "rbd" 45 #define DRV_NAME_LONG "rbd (rados block device)" 46 47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 48 49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX)) 50 #define RBD_MAX_POOL_NAME_LEN 64 51 #define RBD_MAX_SNAP_NAME_LEN 32 52 #define RBD_MAX_OPT_LEN 1024 53 54 #define RBD_SNAP_HEAD_NAME "-" 55 56 #define DEV_NAME_LEN 32 57 58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10 59 60 /* 61 * block device image metadata (in-memory version) 62 */ 63 struct rbd_image_header { 64 u64 image_size; 65 char block_name[32]; 66 __u8 obj_order; 67 __u8 crypt_type; 68 __u8 comp_type; 69 struct rw_semaphore snap_rwsem; 70 struct ceph_snap_context *snapc; 71 size_t snap_names_len; 72 u64 snap_seq; 73 u32 total_snaps; 74 75 char 
*snap_names; 76 u64 *snap_sizes; 77 78 u64 obj_version; 79 }; 80 81 struct rbd_options { 82 int notify_timeout; 83 }; 84 85 /* 86 * an instance of the client. multiple devices may share a client. 87 */ 88 struct rbd_client { 89 struct ceph_client *client; 90 struct rbd_options *rbd_opts; 91 struct kref kref; 92 struct list_head node; 93 }; 94 95 struct rbd_req_coll; 96 97 /* 98 * a single io request 99 */ 100 struct rbd_request { 101 struct request *rq; /* blk layer request */ 102 struct bio *bio; /* cloned bio */ 103 struct page **pages; /* list of used pages */ 104 u64 len; 105 int coll_index; 106 struct rbd_req_coll *coll; 107 }; 108 109 struct rbd_req_status { 110 int done; 111 int rc; 112 u64 bytes; 113 }; 114 115 /* 116 * a collection of requests 117 */ 118 struct rbd_req_coll { 119 int total; 120 int num_done; 121 struct kref kref; 122 struct rbd_req_status status[0]; 123 }; 124 125 struct rbd_snap { 126 struct device dev; 127 const char *name; 128 size_t size; 129 struct list_head node; 130 u64 id; 131 }; 132 133 /* 134 * a single device 135 */ 136 struct rbd_device { 137 int id; /* blkdev unique id */ 138 139 int major; /* blkdev assigned major */ 140 struct gendisk *disk; /* blkdev's gendisk and rq */ 141 struct request_queue *q; 142 143 struct ceph_client *client; 144 struct rbd_client *rbd_client; 145 146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 147 148 spinlock_t lock; /* queue lock */ 149 150 struct rbd_image_header header; 151 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */ 152 int obj_len; 153 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. 
*/ 154 char pool_name[RBD_MAX_POOL_NAME_LEN]; 155 int poolid; 156 157 struct ceph_osd_event *watch_event; 158 struct ceph_osd_request *watch_request; 159 160 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 161 u32 cur_snap; /* index+1 of current snapshot within snap context 162 0 - for the head */ 163 int read_only; 164 165 struct list_head node; 166 167 /* list of snapshots */ 168 struct list_head snaps; 169 170 /* sysfs related */ 171 struct device dev; 172 }; 173 174 static struct bus_type rbd_bus_type = { 175 .name = "rbd", 176 }; 177 178 static spinlock_t node_lock; /* protects client get/put */ 179 180 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 181 static LIST_HEAD(rbd_dev_list); /* devices */ 182 static LIST_HEAD(rbd_client_list); /* clients */ 183 184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); 185 static void rbd_dev_release(struct device *dev); 186 static ssize_t rbd_snap_rollback(struct device *dev, 187 struct device_attribute *attr, 188 const char *buf, 189 size_t size); 190 static ssize_t rbd_snap_add(struct device *dev, 191 struct device_attribute *attr, 192 const char *buf, 193 size_t count); 194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 195 struct rbd_snap *snap);; 196 197 198 static struct rbd_device *dev_to_rbd(struct device *dev) 199 { 200 return container_of(dev, struct rbd_device, dev); 201 } 202 203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 204 { 205 return get_device(&rbd_dev->dev); 206 } 207 208 static void rbd_put_dev(struct rbd_device *rbd_dev) 209 { 210 put_device(&rbd_dev->dev); 211 } 212 213 static int __rbd_update_snaps(struct rbd_device *rbd_dev); 214 215 static int rbd_open(struct block_device *bdev, fmode_t mode) 216 { 217 struct gendisk *disk = bdev->bd_disk; 218 struct rbd_device *rbd_dev = disk->private_data; 219 220 rbd_get_dev(rbd_dev); 221 222 set_device_ro(bdev, rbd_dev->read_only); 223 224 if ((mode & FMODE_WRITE) && rbd_dev->read_only) 225 
return -EROFS; 226 227 return 0; 228 } 229 230 static int rbd_release(struct gendisk *disk, fmode_t mode) 231 { 232 struct rbd_device *rbd_dev = disk->private_data; 233 234 rbd_put_dev(rbd_dev); 235 236 return 0; 237 } 238 239 static const struct block_device_operations rbd_bd_ops = { 240 .owner = THIS_MODULE, 241 .open = rbd_open, 242 .release = rbd_release, 243 }; 244 245 /* 246 * Initialize an rbd client instance. 247 * We own *opt. 248 */ 249 static struct rbd_client *rbd_client_create(struct ceph_options *opt, 250 struct rbd_options *rbd_opts) 251 { 252 struct rbd_client *rbdc; 253 int ret = -ENOMEM; 254 255 dout("rbd_client_create\n"); 256 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 257 if (!rbdc) 258 goto out_opt; 259 260 kref_init(&rbdc->kref); 261 INIT_LIST_HEAD(&rbdc->node); 262 263 rbdc->client = ceph_create_client(opt, rbdc); 264 if (IS_ERR(rbdc->client)) 265 goto out_rbdc; 266 opt = NULL; /* Now rbdc->client is responsible for opt */ 267 268 ret = ceph_open_session(rbdc->client); 269 if (ret < 0) 270 goto out_err; 271 272 rbdc->rbd_opts = rbd_opts; 273 274 spin_lock(&node_lock); 275 list_add_tail(&rbdc->node, &rbd_client_list); 276 spin_unlock(&node_lock); 277 278 dout("rbd_client_create created %p\n", rbdc); 279 return rbdc; 280 281 out_err: 282 ceph_destroy_client(rbdc->client); 283 out_rbdc: 284 kfree(rbdc); 285 out_opt: 286 if (opt) 287 ceph_destroy_options(opt); 288 return ERR_PTR(ret); 289 } 290 291 /* 292 * Find a ceph client with specific addr and configuration. 
293 */ 294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt) 295 { 296 struct rbd_client *client_node; 297 298 if (opt->flags & CEPH_OPT_NOSHARE) 299 return NULL; 300 301 list_for_each_entry(client_node, &rbd_client_list, node) 302 if (ceph_compare_options(opt, client_node->client) == 0) 303 return client_node; 304 return NULL; 305 } 306 307 /* 308 * mount options 309 */ 310 enum { 311 Opt_notify_timeout, 312 Opt_last_int, 313 /* int args above */ 314 Opt_last_string, 315 /* string args above */ 316 }; 317 318 static match_table_t rbdopt_tokens = { 319 {Opt_notify_timeout, "notify_timeout=%d"}, 320 /* int args above */ 321 /* string args above */ 322 {-1, NULL} 323 }; 324 325 static int parse_rbd_opts_token(char *c, void *private) 326 { 327 struct rbd_options *rbdopt = private; 328 substring_t argstr[MAX_OPT_ARGS]; 329 int token, intval, ret; 330 331 token = match_token((char *)c, rbdopt_tokens, argstr); 332 if (token < 0) 333 return -EINVAL; 334 335 if (token < Opt_last_int) { 336 ret = match_int(&argstr[0], &intval); 337 if (ret < 0) { 338 pr_err("bad mount option arg (not int) " 339 "at '%s'\n", c); 340 return ret; 341 } 342 dout("got int token %d val %d\n", token, intval); 343 } else if (token > Opt_last_int && token < Opt_last_string) { 344 dout("got string token %d val %s\n", token, 345 argstr[0].from); 346 } else { 347 dout("got token %d\n", token); 348 } 349 350 switch (token) { 351 case Opt_notify_timeout: 352 rbdopt->notify_timeout = intval; 353 break; 354 default: 355 BUG_ON(token); 356 } 357 return 0; 358 } 359 360 /* 361 * Get a ceph client with specific addr and configuration, if one does 362 * not exist create it. 
363 */ 364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 365 char *options) 366 { 367 struct rbd_client *rbdc; 368 struct ceph_options *opt; 369 int ret; 370 struct rbd_options *rbd_opts; 371 372 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); 373 if (!rbd_opts) 374 return -ENOMEM; 375 376 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; 377 378 ret = ceph_parse_options(&opt, options, mon_addr, 379 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); 380 if (ret < 0) 381 goto done_err; 382 383 spin_lock(&node_lock); 384 rbdc = __rbd_client_find(opt); 385 if (rbdc) { 386 ceph_destroy_options(opt); 387 388 /* using an existing client */ 389 kref_get(&rbdc->kref); 390 rbd_dev->rbd_client = rbdc; 391 rbd_dev->client = rbdc->client; 392 spin_unlock(&node_lock); 393 return 0; 394 } 395 spin_unlock(&node_lock); 396 397 rbdc = rbd_client_create(opt, rbd_opts); 398 if (IS_ERR(rbdc)) { 399 ret = PTR_ERR(rbdc); 400 goto done_err; 401 } 402 403 rbd_dev->rbd_client = rbdc; 404 rbd_dev->client = rbdc->client; 405 return 0; 406 done_err: 407 kfree(rbd_opts); 408 return ret; 409 } 410 411 /* 412 * Destroy ceph client 413 */ 414 static void rbd_client_release(struct kref *kref) 415 { 416 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 417 418 dout("rbd_release_client %p\n", rbdc); 419 spin_lock(&node_lock); 420 list_del(&rbdc->node); 421 spin_unlock(&node_lock); 422 423 ceph_destroy_client(rbdc->client); 424 kfree(rbdc->rbd_opts); 425 kfree(rbdc); 426 } 427 428 /* 429 * Drop reference to ceph client node. If it's not referenced anymore, release 430 * it. 
431 */ 432 static void rbd_put_client(struct rbd_device *rbd_dev) 433 { 434 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 435 rbd_dev->rbd_client = NULL; 436 rbd_dev->client = NULL; 437 } 438 439 /* 440 * Destroy requests collection 441 */ 442 static void rbd_coll_release(struct kref *kref) 443 { 444 struct rbd_req_coll *coll = 445 container_of(kref, struct rbd_req_coll, kref); 446 447 dout("rbd_coll_release %p\n", coll); 448 kfree(coll); 449 } 450 451 /* 452 * Create a new header structure, translate header format from the on-disk 453 * header. 454 */ 455 static int rbd_header_from_disk(struct rbd_image_header *header, 456 struct rbd_image_header_ondisk *ondisk, 457 int allocated_snaps, 458 gfp_t gfp_flags) 459 { 460 int i; 461 u32 snap_count = le32_to_cpu(ondisk->snap_count); 462 int ret = -ENOMEM; 463 464 init_rwsem(&header->snap_rwsem); 465 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); 466 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 467 snap_count * 468 sizeof(struct rbd_image_snap_ondisk), 469 gfp_flags); 470 if (!header->snapc) 471 return -ENOMEM; 472 if (snap_count) { 473 header->snap_names = kmalloc(header->snap_names_len, 474 GFP_KERNEL); 475 if (!header->snap_names) 476 goto err_snapc; 477 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 478 GFP_KERNEL); 479 if (!header->snap_sizes) 480 goto err_names; 481 } else { 482 header->snap_names = NULL; 483 header->snap_sizes = NULL; 484 } 485 memcpy(header->block_name, ondisk->block_name, 486 sizeof(ondisk->block_name)); 487 488 header->image_size = le64_to_cpu(ondisk->image_size); 489 header->obj_order = ondisk->options.order; 490 header->crypt_type = ondisk->options.crypt_type; 491 header->comp_type = ondisk->options.comp_type; 492 493 atomic_set(&header->snapc->nref, 1); 494 header->snap_seq = le64_to_cpu(ondisk->snap_seq); 495 header->snapc->num_snaps = snap_count; 496 header->total_snaps = snap_count; 497 498 if (snap_count && 499 allocated_snaps == 
snap_count) { 500 for (i = 0; i < snap_count; i++) { 501 header->snapc->snaps[i] = 502 le64_to_cpu(ondisk->snaps[i].id); 503 header->snap_sizes[i] = 504 le64_to_cpu(ondisk->snaps[i].image_size); 505 } 506 507 /* copy snapshot names */ 508 memcpy(header->snap_names, &ondisk->snaps[i], 509 header->snap_names_len); 510 } 511 512 return 0; 513 514 err_names: 515 kfree(header->snap_names); 516 err_snapc: 517 kfree(header->snapc); 518 return ret; 519 } 520 521 static int snap_index(struct rbd_image_header *header, int snap_num) 522 { 523 return header->total_snaps - snap_num; 524 } 525 526 static u64 cur_snap_id(struct rbd_device *rbd_dev) 527 { 528 struct rbd_image_header *header = &rbd_dev->header; 529 530 if (!rbd_dev->cur_snap) 531 return 0; 532 533 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; 534 } 535 536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name, 537 u64 *seq, u64 *size) 538 { 539 int i; 540 char *p = header->snap_names; 541 542 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { 543 if (strcmp(snap_name, p) == 0) 544 break; 545 } 546 if (i == header->total_snaps) 547 return -ENOENT; 548 if (seq) 549 *seq = header->snapc->snaps[i]; 550 551 if (size) 552 *size = header->snap_sizes[i]; 553 554 return i; 555 } 556 557 static int rbd_header_set_snap(struct rbd_device *dev, 558 const char *snap_name, 559 u64 *size) 560 { 561 struct rbd_image_header *header = &dev->header; 562 struct ceph_snap_context *snapc = header->snapc; 563 int ret = -ENOENT; 564 565 down_write(&header->snap_rwsem); 566 567 if (!snap_name || 568 !*snap_name || 569 strcmp(snap_name, "-") == 0 || 570 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { 571 if (header->total_snaps) 572 snapc->seq = header->snap_seq; 573 else 574 snapc->seq = 0; 575 dev->cur_snap = 0; 576 dev->read_only = 0; 577 if (size) 578 *size = header->image_size; 579 } else { 580 ret = snap_by_name(header, snap_name, &snapc->seq, size); 581 if (ret < 0) 582 goto 
done; 583 584 dev->cur_snap = header->total_snaps - ret; 585 dev->read_only = 1; 586 } 587 588 ret = 0; 589 done: 590 up_write(&header->snap_rwsem); 591 return ret; 592 } 593 594 static void rbd_header_free(struct rbd_image_header *header) 595 { 596 kfree(header->snapc); 597 kfree(header->snap_names); 598 kfree(header->snap_sizes); 599 } 600 601 /* 602 * get the actual striped segment name, offset and length 603 */ 604 static u64 rbd_get_segment(struct rbd_image_header *header, 605 const char *block_name, 606 u64 ofs, u64 len, 607 char *seg_name, u64 *segofs) 608 { 609 u64 seg = ofs >> header->obj_order; 610 611 if (seg_name) 612 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, 613 "%s.%012llx", block_name, seg); 614 615 ofs = ofs & ((1 << header->obj_order) - 1); 616 len = min_t(u64, len, (1 << header->obj_order) - ofs); 617 618 if (segofs) 619 *segofs = ofs; 620 621 return len; 622 } 623 624 static int rbd_get_num_segments(struct rbd_image_header *header, 625 u64 ofs, u64 len) 626 { 627 u64 start_seg = ofs >> header->obj_order; 628 u64 end_seg = (ofs + len - 1) >> header->obj_order; 629 return end_seg - start_seg + 1; 630 } 631 632 /* 633 * bio helpers 634 */ 635 636 static void bio_chain_put(struct bio *chain) 637 { 638 struct bio *tmp; 639 640 while (chain) { 641 tmp = chain; 642 chain = chain->bi_next; 643 bio_put(tmp); 644 } 645 } 646 647 /* 648 * zeros a bio chain, starting at specific offset 649 */ 650 static void zero_bio_chain(struct bio *chain, int start_ofs) 651 { 652 struct bio_vec *bv; 653 unsigned long flags; 654 void *buf; 655 int i; 656 int pos = 0; 657 658 while (chain) { 659 bio_for_each_segment(bv, chain, i) { 660 if (pos + bv->bv_len > start_ofs) { 661 int remainder = max(start_ofs - pos, 0); 662 buf = bvec_kmap_irq(bv, &flags); 663 memset(buf + remainder, 0, 664 bv->bv_len - remainder); 665 bvec_kunmap_irq(buf, &flags); 666 } 667 pos += bv->bv_len; 668 } 669 670 chain = chain->bi_next; 671 } 672 } 673 674 /* 675 * bio_chain_clone - clone a chain of 
bios up to a certain length. 676 * might return a bio_pair that will need to be released. 677 */ 678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 679 struct bio_pair **bp, 680 int len, gfp_t gfpmask) 681 { 682 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL; 683 int total = 0; 684 685 if (*bp) { 686 bio_pair_release(*bp); 687 *bp = NULL; 688 } 689 690 while (old_chain && (total < len)) { 691 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 692 if (!tmp) 693 goto err_out; 694 695 if (total + old_chain->bi_size > len) { 696 struct bio_pair *bp; 697 698 /* 699 * this split can only happen with a single paged bio, 700 * split_bio will BUG_ON if this is not the case 701 */ 702 dout("bio_chain_clone split! total=%d remaining=%d" 703 "bi_size=%d\n", 704 (int)total, (int)len-total, 705 (int)old_chain->bi_size); 706 707 /* split the bio. We'll release it either in the next 708 call, or it will have to be released outside */ 709 bp = bio_split(old_chain, (len - total) / 512ULL); 710 if (!bp) 711 goto err_out; 712 713 __bio_clone(tmp, &bp->bio1); 714 715 *next = &bp->bio2; 716 } else { 717 __bio_clone(tmp, old_chain); 718 *next = old_chain->bi_next; 719 } 720 721 tmp->bi_bdev = NULL; 722 gfpmask &= ~__GFP_WAIT; 723 tmp->bi_next = NULL; 724 725 if (!new_chain) { 726 new_chain = tail = tmp; 727 } else { 728 tail->bi_next = tmp; 729 tail = tmp; 730 } 731 old_chain = old_chain->bi_next; 732 733 total += tmp->bi_size; 734 } 735 736 BUG_ON(total < len); 737 738 if (tail) 739 tail->bi_next = NULL; 740 741 *old = old_chain; 742 743 return new_chain; 744 745 err_out: 746 dout("bio_chain_clone with err\n"); 747 bio_chain_put(new_chain); 748 return NULL; 749 } 750 751 /* 752 * helpers for osd request op vectors. 
753 */ 754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops, 755 int num_ops, 756 int opcode, 757 u32 payload_len) 758 { 759 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1), 760 GFP_NOIO); 761 if (!*ops) 762 return -ENOMEM; 763 (*ops)[0].op = opcode; 764 /* 765 * op extent offset and length will be set later on 766 * in calc_raw_layout() 767 */ 768 (*ops)[0].payload_len = payload_len; 769 return 0; 770 } 771 772 static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 773 { 774 kfree(ops); 775 } 776 777 static void rbd_coll_end_req_index(struct request *rq, 778 struct rbd_req_coll *coll, 779 int index, 780 int ret, u64 len) 781 { 782 struct request_queue *q; 783 int min, max, i; 784 785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n", 786 coll, index, ret, len); 787 788 if (!rq) 789 return; 790 791 if (!coll) { 792 blk_end_request(rq, ret, len); 793 return; 794 } 795 796 q = rq->q; 797 798 spin_lock_irq(q->queue_lock); 799 coll->status[index].done = 1; 800 coll->status[index].rc = ret; 801 coll->status[index].bytes = len; 802 max = min = coll->num_done; 803 while (max < coll->total && coll->status[max].done) 804 max++; 805 806 for (i = min; i<max; i++) { 807 __blk_end_request(rq, coll->status[i].rc, 808 coll->status[i].bytes); 809 coll->num_done++; 810 kref_put(&coll->kref, rbd_coll_release); 811 } 812 spin_unlock_irq(q->queue_lock); 813 } 814 815 static void rbd_coll_end_req(struct rbd_request *req, 816 int ret, u64 len) 817 { 818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); 819 } 820 821 /* 822 * Send ceph osd request 823 */ 824 static int rbd_do_request(struct request *rq, 825 struct rbd_device *dev, 826 struct ceph_snap_context *snapc, 827 u64 snapid, 828 const char *obj, u64 ofs, u64 len, 829 struct bio *bio, 830 struct page **pages, 831 int num_pages, 832 int flags, 833 struct ceph_osd_req_op *ops, 834 int num_reply, 835 struct rbd_req_coll *coll, 836 int coll_index, 837 void (*rbd_cb)(struct 
ceph_osd_request *req, 838 struct ceph_msg *msg), 839 struct ceph_osd_request **linger_req, 840 u64 *ver) 841 { 842 struct ceph_osd_request *req; 843 struct ceph_file_layout *layout; 844 int ret; 845 u64 bno; 846 struct timespec mtime = CURRENT_TIME; 847 struct rbd_request *req_data; 848 struct ceph_osd_request_head *reqhead; 849 struct rbd_image_header *header = &dev->header; 850 851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 852 if (!req_data) { 853 if (coll) 854 rbd_coll_end_req_index(rq, coll, coll_index, 855 -ENOMEM, len); 856 return -ENOMEM; 857 } 858 859 if (coll) { 860 req_data->coll = coll; 861 req_data->coll_index = coll_index; 862 } 863 864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); 865 866 down_read(&header->snap_rwsem); 867 868 req = ceph_osdc_alloc_request(&dev->client->osdc, flags, 869 snapc, 870 ops, 871 false, 872 GFP_NOIO, pages, bio); 873 if (!req) { 874 up_read(&header->snap_rwsem); 875 ret = -ENOMEM; 876 goto done_pages; 877 } 878 879 req->r_callback = rbd_cb; 880 881 req_data->rq = rq; 882 req_data->bio = bio; 883 req_data->pages = pages; 884 req_data->len = len; 885 886 req->r_priv = req_data; 887 888 reqhead = req->r_request->front.iov_base; 889 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 890 891 strncpy(req->r_oid, obj, sizeof(req->r_oid)); 892 req->r_oid_len = strlen(req->r_oid); 893 894 layout = &req->r_file_layout; 895 memset(layout, 0, sizeof(*layout)); 896 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 897 layout->fl_stripe_count = cpu_to_le32(1); 898 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 899 layout->fl_pg_preferred = cpu_to_le32(-1); 900 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 901 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, 902 ofs, &len, &bno, req, ops); 903 904 ceph_osdc_build_request(req, ofs, &len, 905 ops, 906 snapc, 907 &mtime, 908 req->r_oid, req->r_oid_len); 909 up_read(&header->snap_rwsem); 910 911 if (linger_req) { 912 
ceph_osdc_set_request_linger(&dev->client->osdc, req); 913 *linger_req = req; 914 } 915 916 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 917 if (ret < 0) 918 goto done_err; 919 920 if (!rbd_cb) { 921 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 922 if (ver) 923 *ver = le64_to_cpu(req->r_reassert_version.version); 924 dout("reassert_ver=%lld\n", 925 le64_to_cpu(req->r_reassert_version.version)); 926 ceph_osdc_put_request(req); 927 } 928 return ret; 929 930 done_err: 931 bio_chain_put(req_data->bio); 932 ceph_osdc_put_request(req); 933 done_pages: 934 rbd_coll_end_req(req_data, ret, len); 935 kfree(req_data); 936 return ret; 937 } 938 939 /* 940 * Ceph osd op callback 941 */ 942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 943 { 944 struct rbd_request *req_data = req->r_priv; 945 struct ceph_osd_reply_head *replyhead; 946 struct ceph_osd_op *op; 947 __s32 rc; 948 u64 bytes; 949 int read_op; 950 951 /* parse reply */ 952 replyhead = msg->front.iov_base; 953 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); 954 op = (void *)(replyhead + 1); 955 rc = le32_to_cpu(replyhead->result); 956 bytes = le64_to_cpu(op->extent.length); 957 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ); 958 959 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc); 960 961 if (rc == -ENOENT && read_op) { 962 zero_bio_chain(req_data->bio, 0); 963 rc = 0; 964 } else if (rc == 0 && read_op && bytes < req_data->len) { 965 zero_bio_chain(req_data->bio, bytes); 966 bytes = req_data->len; 967 } 968 969 rbd_coll_end_req(req_data, rc, bytes); 970 971 if (req_data->bio) 972 bio_chain_put(req_data->bio); 973 974 ceph_osdc_put_request(req); 975 kfree(req_data); 976 } 977 978 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 979 { 980 ceph_osdc_put_request(req); 981 } 982 983 /* 984 * Do a synchronous ceph osd operation 985 */ 986 static int rbd_req_sync_op(struct rbd_device *dev, 987 struct 
ceph_snap_context *snapc, 988 u64 snapid, 989 int opcode, 990 int flags, 991 struct ceph_osd_req_op *orig_ops, 992 int num_reply, 993 const char *obj, 994 u64 ofs, u64 len, 995 char *buf, 996 struct ceph_osd_request **linger_req, 997 u64 *ver) 998 { 999 int ret; 1000 struct page **pages; 1001 int num_pages; 1002 struct ceph_osd_req_op *ops = orig_ops; 1003 u32 payload_len; 1004 1005 num_pages = calc_pages_for(ofs , len); 1006 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1007 if (IS_ERR(pages)) 1008 return PTR_ERR(pages); 1009 1010 if (!orig_ops) { 1011 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0); 1012 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 1013 if (ret < 0) 1014 goto done; 1015 1016 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) { 1017 ret = ceph_copy_to_page_vector(pages, buf, ofs, len); 1018 if (ret < 0) 1019 goto done_ops; 1020 } 1021 } 1022 1023 ret = rbd_do_request(NULL, dev, snapc, snapid, 1024 obj, ofs, len, NULL, 1025 pages, num_pages, 1026 flags, 1027 ops, 1028 2, 1029 NULL, 0, 1030 NULL, 1031 linger_req, ver); 1032 if (ret < 0) 1033 goto done_ops; 1034 1035 if ((flags & CEPH_OSD_FLAG_READ) && buf) 1036 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); 1037 1038 done_ops: 1039 if (!orig_ops) 1040 rbd_destroy_ops(ops); 1041 done: 1042 ceph_release_page_vector(pages, num_pages); 1043 return ret; 1044 } 1045 1046 /* 1047 * Do an asynchronous ceph osd operation 1048 */ 1049 static int rbd_do_op(struct request *rq, 1050 struct rbd_device *rbd_dev , 1051 struct ceph_snap_context *snapc, 1052 u64 snapid, 1053 int opcode, int flags, int num_reply, 1054 u64 ofs, u64 len, 1055 struct bio *bio, 1056 struct rbd_req_coll *coll, 1057 int coll_index) 1058 { 1059 char *seg_name; 1060 u64 seg_ofs; 1061 u64 seg_len; 1062 int ret; 1063 struct ceph_osd_req_op *ops; 1064 u32 payload_len; 1065 1066 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 1067 if (!seg_name) 1068 return -ENOMEM; 1069 1070 seg_len = 
rbd_get_segment(&rbd_dev->header, 1071 rbd_dev->header.block_name, 1072 ofs, len, 1073 seg_name, &seg_ofs); 1074 1075 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 1076 1077 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 1078 if (ret < 0) 1079 goto done; 1080 1081 /* we've taken care of segment sizes earlier when we 1082 cloned the bios. We should never have a segment 1083 truncated at this point */ 1084 BUG_ON(seg_len < len); 1085 1086 ret = rbd_do_request(rq, rbd_dev, snapc, snapid, 1087 seg_name, seg_ofs, seg_len, 1088 bio, 1089 NULL, 0, 1090 flags, 1091 ops, 1092 num_reply, 1093 coll, coll_index, 1094 rbd_req_cb, 0, NULL); 1095 1096 rbd_destroy_ops(ops); 1097 done: 1098 kfree(seg_name); 1099 return ret; 1100 } 1101 1102 /* 1103 * Request async osd write 1104 */ 1105 static int rbd_req_write(struct request *rq, 1106 struct rbd_device *rbd_dev, 1107 struct ceph_snap_context *snapc, 1108 u64 ofs, u64 len, 1109 struct bio *bio, 1110 struct rbd_req_coll *coll, 1111 int coll_index) 1112 { 1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1114 CEPH_OSD_OP_WRITE, 1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1116 2, 1117 ofs, len, bio, coll, coll_index); 1118 } 1119 1120 /* 1121 * Request async osd read 1122 */ 1123 static int rbd_req_read(struct request *rq, 1124 struct rbd_device *rbd_dev, 1125 u64 snapid, 1126 u64 ofs, u64 len, 1127 struct bio *bio, 1128 struct rbd_req_coll *coll, 1129 int coll_index) 1130 { 1131 return rbd_do_op(rq, rbd_dev, NULL, 1132 (snapid ? snapid : CEPH_NOSNAP), 1133 CEPH_OSD_OP_READ, 1134 CEPH_OSD_FLAG_READ, 1135 2, 1136 ofs, len, bio, coll, coll_index); 1137 } 1138 1139 /* 1140 * Request sync osd read 1141 */ 1142 static int rbd_req_sync_read(struct rbd_device *dev, 1143 struct ceph_snap_context *snapc, 1144 u64 snapid, 1145 const char *obj, 1146 u64 ofs, u64 len, 1147 char *buf, 1148 u64 *ver) 1149 { 1150 return rbd_req_sync_op(dev, NULL, 1151 (snapid ? 
snapid : CEPH_NOSNAP), 1152 CEPH_OSD_OP_READ, 1153 CEPH_OSD_FLAG_READ, 1154 NULL, 1155 1, obj, ofs, len, buf, NULL, ver); 1156 } 1157 1158 /* 1159 * Request sync osd watch 1160 */ 1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev, 1162 u64 ver, 1163 u64 notify_id, 1164 const char *obj) 1165 { 1166 struct ceph_osd_req_op *ops; 1167 struct page **pages = NULL; 1168 int ret; 1169 1170 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); 1171 if (ret < 0) 1172 return ret; 1173 1174 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); 1175 ops[0].watch.cookie = notify_id; 1176 ops[0].watch.flag = 0; 1177 1178 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, 1179 obj, 0, 0, NULL, 1180 pages, 0, 1181 CEPH_OSD_FLAG_READ, 1182 ops, 1183 1, 1184 NULL, 0, 1185 rbd_simple_req_cb, 0, NULL); 1186 1187 rbd_destroy_ops(ops); 1188 return ret; 1189 } 1190 1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1192 { 1193 struct rbd_device *dev = (struct rbd_device *)data; 1194 int rc; 1195 1196 if (!dev) 1197 return; 1198 1199 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1200 notify_id, (int)opcode); 1201 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1202 rc = __rbd_update_snaps(dev); 1203 mutex_unlock(&ctl_mutex); 1204 if (rc) 1205 pr_warning(DRV_NAME "%d got notification but failed to update" 1206 " snaps: %d\n", dev->major, rc); 1207 1208 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); 1209 } 1210 1211 /* 1212 * Request sync osd watch 1213 */ 1214 static int rbd_req_sync_watch(struct rbd_device *dev, 1215 const char *obj, 1216 u64 ver) 1217 { 1218 struct ceph_osd_req_op *ops; 1219 struct ceph_osd_client *osdc = &dev->client->osdc; 1220 1221 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1222 if (ret < 0) 1223 return ret; 1224 1225 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, 1226 (void *)dev, &dev->watch_event); 1227 if (ret < 0) 1228 goto fail; 1229 1230 
ops[0].watch.ver = cpu_to_le64(ver); 1231 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); 1232 ops[0].watch.flag = 1; 1233 1234 ret = rbd_req_sync_op(dev, NULL, 1235 CEPH_NOSNAP, 1236 0, 1237 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1238 ops, 1239 1, obj, 0, 0, NULL, 1240 &dev->watch_request, NULL); 1241 1242 if (ret < 0) 1243 goto fail_event; 1244 1245 rbd_destroy_ops(ops); 1246 return 0; 1247 1248 fail_event: 1249 ceph_osdc_cancel_event(dev->watch_event); 1250 dev->watch_event = NULL; 1251 fail: 1252 rbd_destroy_ops(ops); 1253 return ret; 1254 } 1255 1256 struct rbd_notify_info { 1257 struct rbd_device *dev; 1258 }; 1259 1260 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1261 { 1262 struct rbd_device *dev = (struct rbd_device *)data; 1263 if (!dev) 1264 return; 1265 1266 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1267 notify_id, (int)opcode); 1268 } 1269 1270 /* 1271 * Request sync osd notify 1272 */ 1273 static int rbd_req_sync_notify(struct rbd_device *dev, 1274 const char *obj) 1275 { 1276 struct ceph_osd_req_op *ops; 1277 struct ceph_osd_client *osdc = &dev->client->osdc; 1278 struct ceph_osd_event *event; 1279 struct rbd_notify_info info; 1280 int payload_len = sizeof(u32) + sizeof(u32); 1281 int ret; 1282 1283 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); 1284 if (ret < 0) 1285 return ret; 1286 1287 info.dev = dev; 1288 1289 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, 1290 (void *)&info, &event); 1291 if (ret < 0) 1292 goto fail; 1293 1294 ops[0].watch.ver = 1; 1295 ops[0].watch.flag = 1; 1296 ops[0].watch.cookie = event->cookie; 1297 ops[0].watch.prot_ver = RADOS_NOTIFY_VER; 1298 ops[0].watch.timeout = 12; 1299 1300 ret = rbd_req_sync_op(dev, NULL, 1301 CEPH_NOSNAP, 1302 0, 1303 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1304 ops, 1305 1, obj, 0, 0, NULL, NULL, NULL); 1306 if (ret < 0) 1307 goto fail_event; 1308 1309 ret = ceph_osdc_wait_event(event, 
				  CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	/* NOTE(review): the wait result is logged but discarded (we return
	 * 0 below), and the event is not cancelled on this path -- confirm
	 * against ceph_osdc_wait_event()/one-shot event semantics that a
	 * timed-out event does not leak here. */
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd rollback: roll one object back to a given snapshot.
 */
static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
				     u64 snapid,
				     const char *obj)
{
	struct ceph_osd_req_op *ops;
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
	if (ret < 0)
		return ret;

	ops[0].snap.snapid = snapid;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd call: execute a server-side class method (cls/method)
 * on the given object, passing len bytes of input data.
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	/* class/method lengths are transmitted as single bytes */
	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

/*
 * Allocate a request collection tracking the status of num_reqs
 * per-segment sub-requests of one block-layer request.
 */
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) *
				num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback: split each block-layer request into
 * per-object-segment OSD requests.  Called with q->queue_lock held;
 * the lock is dropped around the OSD submission and re-taken before
 * fetching the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* drop the queue lock while we talk to the OSDs */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		/* GFP_ATOMIC: we may be called from the block softirq path */
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			/* op_size = bytes of this request inside one object */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one coll ref per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* fail just this segment; keep going so the
				   collection still completes the request */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop our own ref; segment completions hold the rest */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects.
 One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	/* object size in sectors: objects are 1 << obj_order bytes */
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;
	int max;

	/* bytes left in the current object after this bio */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << 9;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* allow a single-vec bio to cross the boundary (split later) */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}

/*
 * Tear down the gendisk/queue and free the in-memory header.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * reload the on-disk header: read it, and retry with a larger buffer
 * whenever the snapshot count changed underneath us.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	int snap_count = 0;
	u64 snap_names_len = 0;
	u64 ver;

	while (1) {
		/* buffer sized for the snap count seen on the last pass */
		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
			  snap_names_len;

		rc = -ENOMEM;
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0)
			goto out_dh;

		if (snap_count != header->total_snaps) {
			/* snapshot set changed; resize and retry */
			snap_count = header->total_snaps;
			snap_names_len =
header->snap_names_len; 1582 rbd_header_free(header); 1583 kfree(dh); 1584 continue; 1585 } 1586 break; 1587 } 1588 header->obj_version = ver; 1589 1590 out_dh: 1591 kfree(dh); 1592 return rc; 1593 } 1594 1595 /* 1596 * create a snapshot 1597 */ 1598 static int rbd_header_add_snap(struct rbd_device *dev, 1599 const char *snap_name, 1600 gfp_t gfp_flags) 1601 { 1602 int name_len = strlen(snap_name); 1603 u64 new_snapid; 1604 int ret; 1605 void *data, *p, *e; 1606 u64 ver; 1607 1608 /* we should create a snapshot only if we're pointing at the head */ 1609 if (dev->cur_snap) 1610 return -EINVAL; 1611 1612 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, 1613 &new_snapid); 1614 dout("created snapid=%lld\n", new_snapid); 1615 if (ret < 0) 1616 return ret; 1617 1618 data = kmalloc(name_len + 16, gfp_flags); 1619 if (!data) 1620 return -ENOMEM; 1621 1622 p = data; 1623 e = data + name_len + 16; 1624 1625 ceph_encode_string_safe(&p, e, snap_name, name_len, bad); 1626 ceph_encode_64_safe(&p, e, new_snapid, bad); 1627 1628 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", 1629 data, p - data, &ver); 1630 1631 kfree(data); 1632 1633 if (ret < 0) 1634 return ret; 1635 1636 dev->header.snapc->seq = new_snapid; 1637 1638 return 0; 1639 bad: 1640 return -ERANGE; 1641 } 1642 1643 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1644 { 1645 struct rbd_snap *snap; 1646 1647 while (!list_empty(&rbd_dev->snaps)) { 1648 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node); 1649 __rbd_remove_snap_dev(rbd_dev, snap); 1650 } 1651 } 1652 1653 /* 1654 * only read the first part of the ondisk header, without the snaps info 1655 */ 1656 static int __rbd_update_snaps(struct rbd_device *rbd_dev) 1657 { 1658 int ret; 1659 struct rbd_image_header h; 1660 u64 snap_seq; 1661 int follow_seq = 0; 1662 1663 ret = rbd_read_header(rbd_dev, &h); 1664 if (ret < 0) 1665 return ret; 1666 1667 /* resized? 
	 */
	set_capacity(rbd_dev->disk, h.image_size / 512ULL);

	/* exclude readers while we swap in the freshly-read header */
	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* NOTE(review): snapc is freed with plain kfree here -- confirm
	 * ceph_snap_context is not refcounted in this tree (otherwise a
	 * put function should be used instead). */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	/* adopt the new header's snapshot state (ownership moves from h) */
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);

	return ret;
}

/*
 * Read the image header, resolve the mapped snapshot, and create and
 * announce the gendisk/request queue for this device.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn,
			   &rbd_dev->lock);
	if (!q)
		goto out_disk;
	/* keep bios within a single osd object (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}

/*
  sysfs: per-device read-only attributes under /sys/bus/rbd/devices/<id>/
*/

/* image size in bytes */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
}

/* block device major number */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

/* ceph client instance id */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
}

/* rados pool the image lives in */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

/* image (object) name */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->obj);
}

/* currently mapped snapshot name ("-" for the head) */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

/*
 * sysfs "refresh" store: re-read the on-disk header and resync the
 * snapshot device list.  Returns the write size on success.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	&dev_attr_rollback_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* empty release: rbd_dev itself is freed by rbd_dev_release() */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
  sysfs - snapshots: attributes of each snap_<name> child device
*/

/* snapshot image size in bytes */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap =
		container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->size);
}

/* snapshot id within the snap context */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* frees the rbd_snap when its device refcount drops to zero */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

/*
 * Unlink a snapshot from the device list and unregister its sysfs
 * device; the rbd_snap is freed via rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

/*
 * Register one snapshot as a sysfs child device (snap_<name>) of the
 * rbd device.
 */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	/* NOTE(review): dev->release is set although the device_type above
	 * already supplies a release -- confirm which one the core uses
	 * (dev->release takes precedence in the driver core). */
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}

/*
 * Allocate and (if the parent is registered) register snapshot i of the
 * current snap context under the given name.
 */
static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
			      int i, const char *name,
			      struct rbd_snap **snapp)
{
	int ret;
	struct rbd_snap *snap = kzalloc(sizeof(*snap),
GFP_KERNEL); 1957 if (!snap) 1958 return -ENOMEM; 1959 snap->name = kstrdup(name, GFP_KERNEL); 1960 snap->size = rbd_dev->header.snap_sizes[i]; 1961 snap->id = rbd_dev->header.snapc->snaps[i]; 1962 if (device_is_registered(&rbd_dev->dev)) { 1963 ret = rbd_register_snap_dev(rbd_dev, snap, 1964 &rbd_dev->dev); 1965 if (ret < 0) 1966 goto err; 1967 } 1968 *snapp = snap; 1969 return 0; 1970 err: 1971 kfree(snap->name); 1972 kfree(snap); 1973 return ret; 1974 } 1975 1976 /* 1977 * search for the previous snap in a null delimited string list 1978 */ 1979 const char *rbd_prev_snap_name(const char *name, const char *start) 1980 { 1981 if (name < start + 2) 1982 return NULL; 1983 1984 name -= 2; 1985 while (*name) { 1986 if (name == start) 1987 return start; 1988 name--; 1989 } 1990 return name + 1; 1991 } 1992 1993 /* 1994 * compare the old list of snapshots that we have to what's in the header 1995 * and update it accordingly. Note that the header holds the snapshots 1996 * in a reverse order (from newest to oldest) and we need to go from 1997 * older to new so that we don't get a duplicate snap name when 1998 * doing the process (e.g., removed snapshot and recreated a new 1999 * one with the same name. 
2000 */ 2001 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev) 2002 { 2003 const char *name, *first_name; 2004 int i = rbd_dev->header.total_snaps; 2005 struct rbd_snap *snap, *old_snap = NULL; 2006 int ret; 2007 struct list_head *p, *n; 2008 2009 first_name = rbd_dev->header.snap_names; 2010 name = first_name + rbd_dev->header.snap_names_len; 2011 2012 list_for_each_prev_safe(p, n, &rbd_dev->snaps) { 2013 u64 cur_id; 2014 2015 old_snap = list_entry(p, struct rbd_snap, node); 2016 2017 if (i) 2018 cur_id = rbd_dev->header.snapc->snaps[i - 1]; 2019 2020 if (!i || old_snap->id < cur_id) { 2021 /* old_snap->id was skipped, thus was removed */ 2022 __rbd_remove_snap_dev(rbd_dev, old_snap); 2023 continue; 2024 } 2025 if (old_snap->id == cur_id) { 2026 /* we have this snapshot already */ 2027 i--; 2028 name = rbd_prev_snap_name(name, first_name); 2029 continue; 2030 } 2031 for (; i > 0; 2032 i--, name = rbd_prev_snap_name(name, first_name)) { 2033 if (!name) { 2034 WARN_ON(1); 2035 return -EINVAL; 2036 } 2037 cur_id = rbd_dev->header.snapc->snaps[i]; 2038 /* snapshot removal? 
handle it above */ 2039 if (cur_id >= old_snap->id) 2040 break; 2041 /* a new snapshot */ 2042 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 2043 if (ret < 0) 2044 return ret; 2045 2046 /* note that we add it backward so using n and not p */ 2047 list_add(&snap->node, n); 2048 p = &snap->node; 2049 } 2050 } 2051 /* we're done going over the old snap list, just add what's left */ 2052 for (; i > 0; i--) { 2053 name = rbd_prev_snap_name(name, first_name); 2054 if (!name) { 2055 WARN_ON(1); 2056 return -EINVAL; 2057 } 2058 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 2059 if (ret < 0) 2060 return ret; 2061 list_add(&snap->node, &rbd_dev->snaps); 2062 } 2063 2064 return 0; 2065 } 2066 2067 2068 static void rbd_root_dev_release(struct device *dev) 2069 { 2070 } 2071 2072 static struct device rbd_root_dev = { 2073 .init_name = "rbd", 2074 .release = rbd_root_dev_release, 2075 }; 2076 2077 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 2078 { 2079 int ret = -ENOMEM; 2080 struct device *dev; 2081 struct rbd_snap *snap; 2082 2083 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2084 dev = &rbd_dev->dev; 2085 2086 dev->bus = &rbd_bus_type; 2087 dev->type = &rbd_device_type; 2088 dev->parent = &rbd_root_dev; 2089 dev->release = rbd_dev_release; 2090 dev_set_name(dev, "%d", rbd_dev->id); 2091 ret = device_register(dev); 2092 if (ret < 0) 2093 goto done_free; 2094 2095 list_for_each_entry(snap, &rbd_dev->snaps, node) { 2096 ret = rbd_register_snap_dev(rbd_dev, snap, 2097 &rbd_dev->dev); 2098 if (ret < 0) 2099 break; 2100 } 2101 2102 mutex_unlock(&ctl_mutex); 2103 return 0; 2104 done_free: 2105 mutex_unlock(&ctl_mutex); 2106 return ret; 2107 } 2108 2109 static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 2110 { 2111 device_unregister(&rbd_dev->dev); 2112 } 2113 2114 static int rbd_init_watch_dev(struct rbd_device *rbd_dev) 2115 { 2116 int ret, rc; 2117 2118 do { 2119 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, 2120 
rbd_dev->header.obj_version); 2121 if (ret == -ERANGE) { 2122 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2123 rc = __rbd_update_snaps(rbd_dev); 2124 mutex_unlock(&ctl_mutex); 2125 if (rc < 0) 2126 return rc; 2127 } 2128 } while (ret == -ERANGE); 2129 2130 return ret; 2131 } 2132 2133 static ssize_t rbd_add(struct bus_type *bus, 2134 const char *buf, 2135 size_t count) 2136 { 2137 struct ceph_osd_client *osdc; 2138 struct rbd_device *rbd_dev; 2139 ssize_t rc = -ENOMEM; 2140 int irc, new_id = 0; 2141 struct list_head *tmp; 2142 char *mon_dev_name; 2143 char *options; 2144 2145 if (!try_module_get(THIS_MODULE)) 2146 return -ENODEV; 2147 2148 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 2149 if (!mon_dev_name) 2150 goto err_out_mod; 2151 2152 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 2153 if (!options) 2154 goto err_mon_dev; 2155 2156 /* new rbd_device object */ 2157 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 2158 if (!rbd_dev) 2159 goto err_out_opt; 2160 2161 /* static rbd_device initialization */ 2162 spin_lock_init(&rbd_dev->lock); 2163 INIT_LIST_HEAD(&rbd_dev->node); 2164 INIT_LIST_HEAD(&rbd_dev->snaps); 2165 2166 /* generate unique id: find highest unique id, add one */ 2167 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2168 2169 list_for_each(tmp, &rbd_dev_list) { 2170 struct rbd_device *rbd_dev; 2171 2172 rbd_dev = list_entry(tmp, struct rbd_device, node); 2173 if (rbd_dev->id >= new_id) 2174 new_id = rbd_dev->id + 1; 2175 } 2176 2177 rbd_dev->id = new_id; 2178 2179 /* add to global list */ 2180 list_add_tail(&rbd_dev->node, &rbd_dev_list); 2181 2182 /* parse add command */ 2183 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " 2184 "%" __stringify(RBD_MAX_OPT_LEN) "s " 2185 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s " 2186 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s" 2187 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s", 2188 mon_dev_name, options, rbd_dev->pool_name, 2189 rbd_dev->obj, rbd_dev->snap_name) < 4) { 2190 rc = 
-EINVAL; 2191 goto err_out_slot; 2192 } 2193 2194 if (rbd_dev->snap_name[0] == 0) 2195 rbd_dev->snap_name[0] = '-'; 2196 2197 rbd_dev->obj_len = strlen(rbd_dev->obj); 2198 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s", 2199 rbd_dev->obj, RBD_SUFFIX); 2200 2201 /* initialize rest of new object */ 2202 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id); 2203 rc = rbd_get_client(rbd_dev, mon_dev_name, options); 2204 if (rc < 0) 2205 goto err_out_slot; 2206 2207 mutex_unlock(&ctl_mutex); 2208 2209 /* pick the pool */ 2210 osdc = &rbd_dev->client->osdc; 2211 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 2212 if (rc < 0) 2213 goto err_out_client; 2214 rbd_dev->poolid = rc; 2215 2216 /* register our block device */ 2217 irc = register_blkdev(0, rbd_dev->name); 2218 if (irc < 0) { 2219 rc = irc; 2220 goto err_out_client; 2221 } 2222 rbd_dev->major = irc; 2223 2224 rc = rbd_bus_add_dev(rbd_dev); 2225 if (rc) 2226 goto err_out_blkdev; 2227 2228 /* set up and announce blkdev mapping */ 2229 rc = rbd_init_disk(rbd_dev); 2230 if (rc) 2231 goto err_out_bus; 2232 2233 rc = rbd_init_watch_dev(rbd_dev); 2234 if (rc) 2235 goto err_out_bus; 2236 2237 return count; 2238 2239 err_out_bus: 2240 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2241 list_del_init(&rbd_dev->node); 2242 mutex_unlock(&ctl_mutex); 2243 2244 /* this will also clean up rest of rbd_dev stuff */ 2245 2246 rbd_bus_del_dev(rbd_dev); 2247 kfree(options); 2248 kfree(mon_dev_name); 2249 return rc; 2250 2251 err_out_blkdev: 2252 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2253 err_out_client: 2254 rbd_put_client(rbd_dev); 2255 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2256 err_out_slot: 2257 list_del_init(&rbd_dev->node); 2258 mutex_unlock(&ctl_mutex); 2259 2260 kfree(rbd_dev); 2261 err_out_opt: 2262 kfree(options); 2263 err_mon_dev: 2264 kfree(mon_dev_name); 2265 err_out_mod: 2266 dout("Error adding device %s\n", buf); 2267 
	module_put(THIS_MODULE);
	return rc;
}

/*
 * Find a device by id in the global list; caller holds ctl_mutex.
 */
static struct rbd_device *__rbd_get_dev(unsigned long id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id == id)
			return rbd_dev;
	}
	return NULL;
}

/*
 * Final device release: cancel the watch, drop the ceph client, tear
 * down the disk, and free the rbd_device.  Also drops the module ref
 * taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	if (rbd_dev->watch_request)
		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		ceph_osdc_cancel_event(rbd_dev->watch_event);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

/*
 * sysfs "remove" store: parse a device id and tear the mapping down.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	list_del_init(&rbd_dev->node);

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}

/*
 * sysfs "create_snap" store: create a snapshot named by buf, refresh
 * the snapshot list, then notify other watchers.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	char
	     *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* NOTE(review): size `count` copies at most count-1 chars -- this
	 * strips a trailing newline from sysfs input, but also truncates
	 * the last character when none is present; confirm intent. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}

/*
 * sysfs "rollback_snap" store: roll every object of the image back to
 * the named snapshot, one segment at a time (best effort per segment).
 */
static ssize_t rbd_snap_rollback(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	u64 snapid;
	u64 cur_ofs;
	char *seg_name = NULL;
	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
	ret = -ENOMEM;
	if (!snap_name)
		return ret;

	/* parse snaps add command */
	snprintf(snap_name, count, "%s", buf);
	/* GFP_NOIO: we may be called while the block device is in use */
	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		goto done;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
	if (ret < 0)
		goto done_unlock;

	dout("snapid=%lld\n", snapid);

	cur_ofs = 0;
	while (cur_ofs < rbd_dev->header.image_size) {
		/* advance one object segment per iteration */
		cur_ofs += rbd_get_segment(&rbd_dev->header,
					   rbd_dev->obj,
					   cur_ofs, (u64)-1,
					   seg_name, NULL);
		dout("seg_name=%s\n", seg_name);

		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
		if (ret < 0)
			pr_warning("could not roll back obj %s err=%d\n",
				   seg_name, ret);
	}
2425 2426 ret = __rbd_update_snaps(rbd_dev); 2427 if (ret < 0) 2428 goto done_unlock; 2429 2430 ret = count; 2431 2432 done_unlock: 2433 mutex_unlock(&ctl_mutex); 2434 done: 2435 kfree(seg_name); 2436 kfree(snap_name); 2437 2438 return ret; 2439 } 2440 2441 static struct bus_attribute rbd_bus_attrs[] = { 2442 __ATTR(add, S_IWUSR, NULL, rbd_add), 2443 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 2444 __ATTR_NULL 2445 }; 2446 2447 /* 2448 * create control files in sysfs 2449 * /sys/bus/rbd/... 2450 */ 2451 static int rbd_sysfs_init(void) 2452 { 2453 int ret; 2454 2455 rbd_bus_type.bus_attrs = rbd_bus_attrs; 2456 2457 ret = bus_register(&rbd_bus_type); 2458 if (ret < 0) 2459 return ret; 2460 2461 ret = device_register(&rbd_root_dev); 2462 2463 return ret; 2464 } 2465 2466 static void rbd_sysfs_cleanup(void) 2467 { 2468 device_unregister(&rbd_root_dev); 2469 bus_unregister(&rbd_bus_type); 2470 } 2471 2472 int __init rbd_init(void) 2473 { 2474 int rc; 2475 2476 rc = rbd_sysfs_init(); 2477 if (rc) 2478 return rc; 2479 spin_lock_init(&node_lock); 2480 pr_info("loaded " DRV_NAME_LONG "\n"); 2481 return 0; 2482 } 2483 2484 void __exit rbd_exit(void) 2485 { 2486 rbd_sysfs_cleanup(); 2487 } 2488 2489 module_init(rbd_init); 2490 module_exit(rbd_exit); 2491 2492 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 2493 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 2494 MODULE_DESCRIPTION("rados block device"); 2495 2496 /* following authorship retained from original osdblk.c */ 2497 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 2498 2499 MODULE_LICENSE("GPL"); 2500