/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk(); multi-byte
 * on-disk fields are converted to CPU byte order there.
 */
struct rbd_image_header {
	u64 image_size;		/* size of the image head */
	char block_name[32];	/* prefix used to build segment object names */
	__u8 obj_order;		/* log2 of the segment (object) size */
	__u8 crypt_type;	/* copied verbatim from the on-disk options */
	__u8 comp_type;		/* copied verbatim from the on-disk options */
	struct rw_semaphore snap_rwsem;	/* taken around snapshot context use */
	struct ceph_snap_context *snapc; /* snapshot ids handed to the osd client */
	size_t snap_names_len;	/* total bytes in snap_names */
	u64 snap_seq;		/* snapshot sequence read from the on-disk header */
	u32 total_snaps;	/* number of snapshots described below */

	char *snap_names;	/* NUL-terminated names stored back to back */
	u64 *snap_sizes;	/* image size per snapshot, same order as snapc */
};

/*
 * an instance of the
client. multiple devices may share a client. 78 */ 79 struct rbd_client { 80 struct ceph_client *client; 81 struct kref kref; 82 struct list_head node; 83 }; 84 85 /* 86 * a single io request 87 */ 88 struct rbd_request { 89 struct request *rq; /* blk layer request */ 90 struct bio *bio; /* cloned bio */ 91 struct page **pages; /* list of used pages */ 92 u64 len; 93 }; 94 95 struct rbd_snap { 96 struct device dev; 97 const char *name; 98 size_t size; 99 struct list_head node; 100 u64 id; 101 }; 102 103 /* 104 * a single device 105 */ 106 struct rbd_device { 107 int id; /* blkdev unique id */ 108 109 int major; /* blkdev assigned major */ 110 struct gendisk *disk; /* blkdev's gendisk and rq */ 111 struct request_queue *q; 112 113 struct ceph_client *client; 114 struct rbd_client *rbd_client; 115 116 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 117 118 spinlock_t lock; /* queue lock */ 119 120 struct rbd_image_header header; 121 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */ 122 int obj_len; 123 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. 
*/ 124 char pool_name[RBD_MAX_POOL_NAME_LEN]; 125 int poolid; 126 127 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 128 u32 cur_snap; /* index+1 of current snapshot within snap context 129 0 - for the head */ 130 int read_only; 131 132 struct list_head node; 133 134 /* list of snapshots */ 135 struct list_head snaps; 136 137 /* sysfs related */ 138 struct device dev; 139 }; 140 141 static struct bus_type rbd_bus_type = { 142 .name = "rbd", 143 }; 144 145 static spinlock_t node_lock; /* protects client get/put */ 146 147 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 148 static LIST_HEAD(rbd_dev_list); /* devices */ 149 static LIST_HEAD(rbd_client_list); /* clients */ 150 151 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); 152 static void rbd_dev_release(struct device *dev); 153 static ssize_t rbd_snap_rollback(struct device *dev, 154 struct device_attribute *attr, 155 const char *buf, 156 size_t size); 157 static ssize_t rbd_snap_add(struct device *dev, 158 struct device_attribute *attr, 159 const char *buf, 160 size_t count); 161 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 162 struct rbd_snap *snap);; 163 164 165 static struct rbd_device *dev_to_rbd(struct device *dev) 166 { 167 return container_of(dev, struct rbd_device, dev); 168 } 169 170 static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 171 { 172 return get_device(&rbd_dev->dev); 173 } 174 175 static void rbd_put_dev(struct rbd_device *rbd_dev) 176 { 177 put_device(&rbd_dev->dev); 178 } 179 180 static int rbd_open(struct block_device *bdev, fmode_t mode) 181 { 182 struct gendisk *disk = bdev->bd_disk; 183 struct rbd_device *rbd_dev = disk->private_data; 184 185 rbd_get_dev(rbd_dev); 186 187 set_device_ro(bdev, rbd_dev->read_only); 188 189 if ((mode & FMODE_WRITE) && rbd_dev->read_only) 190 return -EROFS; 191 192 return 0; 193 } 194 195 static int rbd_release(struct gendisk *disk, fmode_t mode) 196 { 197 struct rbd_device *rbd_dev = 
disk->private_data; 198 199 rbd_put_dev(rbd_dev); 200 201 return 0; 202 } 203 204 static const struct block_device_operations rbd_bd_ops = { 205 .owner = THIS_MODULE, 206 .open = rbd_open, 207 .release = rbd_release, 208 }; 209 210 /* 211 * Initialize an rbd client instance. 212 * We own *opt. 213 */ 214 static struct rbd_client *rbd_client_create(struct ceph_options *opt) 215 { 216 struct rbd_client *rbdc; 217 int ret = -ENOMEM; 218 219 dout("rbd_client_create\n"); 220 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 221 if (!rbdc) 222 goto out_opt; 223 224 kref_init(&rbdc->kref); 225 INIT_LIST_HEAD(&rbdc->node); 226 227 rbdc->client = ceph_create_client(opt, rbdc); 228 if (IS_ERR(rbdc->client)) 229 goto out_rbdc; 230 opt = NULL; /* Now rbdc->client is responsible for opt */ 231 232 ret = ceph_open_session(rbdc->client); 233 if (ret < 0) 234 goto out_err; 235 236 spin_lock(&node_lock); 237 list_add_tail(&rbdc->node, &rbd_client_list); 238 spin_unlock(&node_lock); 239 240 dout("rbd_client_create created %p\n", rbdc); 241 return rbdc; 242 243 out_err: 244 ceph_destroy_client(rbdc->client); 245 out_rbdc: 246 kfree(rbdc); 247 out_opt: 248 if (opt) 249 ceph_destroy_options(opt); 250 return ERR_PTR(ret); 251 } 252 253 /* 254 * Find a ceph client with specific addr and configuration. 255 */ 256 static struct rbd_client *__rbd_client_find(struct ceph_options *opt) 257 { 258 struct rbd_client *client_node; 259 260 if (opt->flags & CEPH_OPT_NOSHARE) 261 return NULL; 262 263 list_for_each_entry(client_node, &rbd_client_list, node) 264 if (ceph_compare_options(opt, client_node->client) == 0) 265 return client_node; 266 return NULL; 267 } 268 269 /* 270 * Get a ceph client with specific addr and configuration, if one does 271 * not exist create it. 
272 */ 273 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 274 char *options) 275 { 276 struct rbd_client *rbdc; 277 struct ceph_options *opt; 278 int ret; 279 280 ret = ceph_parse_options(&opt, options, mon_addr, 281 mon_addr + strlen(mon_addr), NULL, NULL); 282 if (ret < 0) 283 return ret; 284 285 spin_lock(&node_lock); 286 rbdc = __rbd_client_find(opt); 287 if (rbdc) { 288 ceph_destroy_options(opt); 289 290 /* using an existing client */ 291 kref_get(&rbdc->kref); 292 rbd_dev->rbd_client = rbdc; 293 rbd_dev->client = rbdc->client; 294 spin_unlock(&node_lock); 295 return 0; 296 } 297 spin_unlock(&node_lock); 298 299 rbdc = rbd_client_create(opt); 300 if (IS_ERR(rbdc)) 301 return PTR_ERR(rbdc); 302 303 rbd_dev->rbd_client = rbdc; 304 rbd_dev->client = rbdc->client; 305 return 0; 306 } 307 308 /* 309 * Destroy ceph client 310 */ 311 static void rbd_client_release(struct kref *kref) 312 { 313 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 314 315 dout("rbd_release_client %p\n", rbdc); 316 spin_lock(&node_lock); 317 list_del(&rbdc->node); 318 spin_unlock(&node_lock); 319 320 ceph_destroy_client(rbdc->client); 321 kfree(rbdc); 322 } 323 324 /* 325 * Drop reference to ceph client node. If it's not referenced anymore, release 326 * it. 327 */ 328 static void rbd_put_client(struct rbd_device *rbd_dev) 329 { 330 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 331 rbd_dev->rbd_client = NULL; 332 rbd_dev->client = NULL; 333 } 334 335 336 /* 337 * Create a new header structure, translate header format from the on-disk 338 * header. 
339 */ 340 static int rbd_header_from_disk(struct rbd_image_header *header, 341 struct rbd_image_header_ondisk *ondisk, 342 int allocated_snaps, 343 gfp_t gfp_flags) 344 { 345 int i; 346 u32 snap_count = le32_to_cpu(ondisk->snap_count); 347 int ret = -ENOMEM; 348 349 init_rwsem(&header->snap_rwsem); 350 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); 351 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 352 snap_count * 353 sizeof(struct rbd_image_snap_ondisk), 354 gfp_flags); 355 if (!header->snapc) 356 return -ENOMEM; 357 if (snap_count) { 358 header->snap_names = kmalloc(header->snap_names_len, 359 GFP_KERNEL); 360 if (!header->snap_names) 361 goto err_snapc; 362 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 363 GFP_KERNEL); 364 if (!header->snap_sizes) 365 goto err_names; 366 } else { 367 header->snap_names = NULL; 368 header->snap_sizes = NULL; 369 } 370 memcpy(header->block_name, ondisk->block_name, 371 sizeof(ondisk->block_name)); 372 373 header->image_size = le64_to_cpu(ondisk->image_size); 374 header->obj_order = ondisk->options.order; 375 header->crypt_type = ondisk->options.crypt_type; 376 header->comp_type = ondisk->options.comp_type; 377 378 atomic_set(&header->snapc->nref, 1); 379 header->snap_seq = le64_to_cpu(ondisk->snap_seq); 380 header->snapc->num_snaps = snap_count; 381 header->total_snaps = snap_count; 382 383 if (snap_count && 384 allocated_snaps == snap_count) { 385 for (i = 0; i < snap_count; i++) { 386 header->snapc->snaps[i] = 387 le64_to_cpu(ondisk->snaps[i].id); 388 header->snap_sizes[i] = 389 le64_to_cpu(ondisk->snaps[i].image_size); 390 } 391 392 /* copy snapshot names */ 393 memcpy(header->snap_names, &ondisk->snaps[i], 394 header->snap_names_len); 395 } 396 397 return 0; 398 399 err_names: 400 kfree(header->snap_names); 401 err_snapc: 402 kfree(header->snapc); 403 return ret; 404 } 405 406 static int snap_index(struct rbd_image_header *header, int snap_num) 407 { 408 return header->total_snaps - 
snap_num; 409 } 410 411 static u64 cur_snap_id(struct rbd_device *rbd_dev) 412 { 413 struct rbd_image_header *header = &rbd_dev->header; 414 415 if (!rbd_dev->cur_snap) 416 return 0; 417 418 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; 419 } 420 421 static int snap_by_name(struct rbd_image_header *header, const char *snap_name, 422 u64 *seq, u64 *size) 423 { 424 int i; 425 char *p = header->snap_names; 426 427 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { 428 if (strcmp(snap_name, p) == 0) 429 break; 430 } 431 if (i == header->total_snaps) 432 return -ENOENT; 433 if (seq) 434 *seq = header->snapc->snaps[i]; 435 436 if (size) 437 *size = header->snap_sizes[i]; 438 439 return i; 440 } 441 442 static int rbd_header_set_snap(struct rbd_device *dev, 443 const char *snap_name, 444 u64 *size) 445 { 446 struct rbd_image_header *header = &dev->header; 447 struct ceph_snap_context *snapc = header->snapc; 448 int ret = -ENOENT; 449 450 down_write(&header->snap_rwsem); 451 452 if (!snap_name || 453 !*snap_name || 454 strcmp(snap_name, "-") == 0 || 455 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { 456 if (header->total_snaps) 457 snapc->seq = header->snap_seq; 458 else 459 snapc->seq = 0; 460 dev->cur_snap = 0; 461 dev->read_only = 0; 462 if (size) 463 *size = header->image_size; 464 } else { 465 ret = snap_by_name(header, snap_name, &snapc->seq, size); 466 if (ret < 0) 467 goto done; 468 469 dev->cur_snap = header->total_snaps - ret; 470 dev->read_only = 1; 471 } 472 473 ret = 0; 474 done: 475 up_write(&header->snap_rwsem); 476 return ret; 477 } 478 479 static void rbd_header_free(struct rbd_image_header *header) 480 { 481 kfree(header->snapc); 482 kfree(header->snap_names); 483 kfree(header->snap_sizes); 484 } 485 486 /* 487 * get the actual striped segment name, offset and length 488 */ 489 static u64 rbd_get_segment(struct rbd_image_header *header, 490 const char *block_name, 491 u64 ofs, u64 len, 492 char *seg_name, u64 *segofs) 493 
{ 494 u64 seg = ofs >> header->obj_order; 495 496 if (seg_name) 497 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, 498 "%s.%012llx", block_name, seg); 499 500 ofs = ofs & ((1 << header->obj_order) - 1); 501 len = min_t(u64, len, (1 << header->obj_order) - ofs); 502 503 if (segofs) 504 *segofs = ofs; 505 506 return len; 507 } 508 509 /* 510 * bio helpers 511 */ 512 513 static void bio_chain_put(struct bio *chain) 514 { 515 struct bio *tmp; 516 517 while (chain) { 518 tmp = chain; 519 chain = chain->bi_next; 520 bio_put(tmp); 521 } 522 } 523 524 /* 525 * zeros a bio chain, starting at specific offset 526 */ 527 static void zero_bio_chain(struct bio *chain, int start_ofs) 528 { 529 struct bio_vec *bv; 530 unsigned long flags; 531 void *buf; 532 int i; 533 int pos = 0; 534 535 while (chain) { 536 bio_for_each_segment(bv, chain, i) { 537 if (pos + bv->bv_len > start_ofs) { 538 int remainder = max(start_ofs - pos, 0); 539 buf = bvec_kmap_irq(bv, &flags); 540 memset(buf + remainder, 0, 541 bv->bv_len - remainder); 542 bvec_kunmap_irq(buf, &flags); 543 } 544 pos += bv->bv_len; 545 } 546 547 chain = chain->bi_next; 548 } 549 } 550 551 /* 552 * bio_chain_clone - clone a chain of bios up to a certain length. 553 * might return a bio_pair that will need to be released. 554 */ 555 static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 556 struct bio_pair **bp, 557 int len, gfp_t gfpmask) 558 { 559 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL; 560 int total = 0; 561 562 if (*bp) { 563 bio_pair_release(*bp); 564 *bp = NULL; 565 } 566 567 while (old_chain && (total < len)) { 568 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 569 if (!tmp) 570 goto err_out; 571 572 if (total + old_chain->bi_size > len) { 573 struct bio_pair *bp; 574 575 /* 576 * this split can only happen with a single paged bio, 577 * split_bio will BUG_ON if this is not the case 578 */ 579 dout("bio_chain_clone split! 
total=%d remaining=%d" 580 "bi_size=%d\n", 581 (int)total, (int)len-total, 582 (int)old_chain->bi_size); 583 584 /* split the bio. We'll release it either in the next 585 call, or it will have to be released outside */ 586 bp = bio_split(old_chain, (len - total) / 512ULL); 587 if (!bp) 588 goto err_out; 589 590 __bio_clone(tmp, &bp->bio1); 591 592 *next = &bp->bio2; 593 } else { 594 __bio_clone(tmp, old_chain); 595 *next = old_chain->bi_next; 596 } 597 598 tmp->bi_bdev = NULL; 599 gfpmask &= ~__GFP_WAIT; 600 tmp->bi_next = NULL; 601 602 if (!new_chain) { 603 new_chain = tail = tmp; 604 } else { 605 tail->bi_next = tmp; 606 tail = tmp; 607 } 608 old_chain = old_chain->bi_next; 609 610 total += tmp->bi_size; 611 } 612 613 BUG_ON(total < len); 614 615 if (tail) 616 tail->bi_next = NULL; 617 618 *old = old_chain; 619 620 return new_chain; 621 622 err_out: 623 dout("bio_chain_clone with err\n"); 624 bio_chain_put(new_chain); 625 return NULL; 626 } 627 628 /* 629 * helpers for osd request op vectors. 
630 */ 631 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops, 632 int num_ops, 633 int opcode, 634 u32 payload_len) 635 { 636 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1), 637 GFP_NOIO); 638 if (!*ops) 639 return -ENOMEM; 640 (*ops)[0].op = opcode; 641 /* 642 * op extent offset and length will be set later on 643 * in calc_raw_layout() 644 */ 645 (*ops)[0].payload_len = payload_len; 646 return 0; 647 } 648 649 static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 650 { 651 kfree(ops); 652 } 653 654 /* 655 * Send ceph osd request 656 */ 657 static int rbd_do_request(struct request *rq, 658 struct rbd_device *dev, 659 struct ceph_snap_context *snapc, 660 u64 snapid, 661 const char *obj, u64 ofs, u64 len, 662 struct bio *bio, 663 struct page **pages, 664 int num_pages, 665 int flags, 666 struct ceph_osd_req_op *ops, 667 int num_reply, 668 void (*rbd_cb)(struct ceph_osd_request *req, 669 struct ceph_msg *msg)) 670 { 671 struct ceph_osd_request *req; 672 struct ceph_file_layout *layout; 673 int ret; 674 u64 bno; 675 struct timespec mtime = CURRENT_TIME; 676 struct rbd_request *req_data; 677 struct ceph_osd_request_head *reqhead; 678 struct rbd_image_header *header = &dev->header; 679 680 ret = -ENOMEM; 681 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 682 if (!req_data) 683 goto done; 684 685 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); 686 687 down_read(&header->snap_rwsem); 688 689 req = ceph_osdc_alloc_request(&dev->client->osdc, flags, 690 snapc, 691 ops, 692 false, 693 GFP_NOIO, pages, bio); 694 if (IS_ERR(req)) { 695 up_read(&header->snap_rwsem); 696 ret = PTR_ERR(req); 697 goto done_pages; 698 } 699 700 req->r_callback = rbd_cb; 701 702 req_data->rq = rq; 703 req_data->bio = bio; 704 req_data->pages = pages; 705 req_data->len = len; 706 707 req->r_priv = req_data; 708 709 reqhead = req->r_request->front.iov_base; 710 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 711 712 strncpy(req->r_oid, obj, sizeof(req->r_oid)); 713 
req->r_oid_len = strlen(req->r_oid); 714 715 layout = &req->r_file_layout; 716 memset(layout, 0, sizeof(*layout)); 717 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 718 layout->fl_stripe_count = cpu_to_le32(1); 719 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 720 layout->fl_pg_preferred = cpu_to_le32(-1); 721 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 722 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, 723 ofs, &len, &bno, req, ops); 724 725 ceph_osdc_build_request(req, ofs, &len, 726 ops, 727 snapc, 728 &mtime, 729 req->r_oid, req->r_oid_len); 730 up_read(&header->snap_rwsem); 731 732 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 733 if (ret < 0) 734 goto done_err; 735 736 if (!rbd_cb) { 737 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 738 ceph_osdc_put_request(req); 739 } 740 return ret; 741 742 done_err: 743 bio_chain_put(req_data->bio); 744 ceph_osdc_put_request(req); 745 done_pages: 746 kfree(req_data); 747 done: 748 if (rq) 749 blk_end_request(rq, ret, len); 750 return ret; 751 } 752 753 /* 754 * Ceph osd op callback 755 */ 756 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) 757 { 758 struct rbd_request *req_data = req->r_priv; 759 struct ceph_osd_reply_head *replyhead; 760 struct ceph_osd_op *op; 761 __s32 rc; 762 u64 bytes; 763 int read_op; 764 765 /* parse reply */ 766 replyhead = msg->front.iov_base; 767 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); 768 op = (void *)(replyhead + 1); 769 rc = le32_to_cpu(replyhead->result); 770 bytes = le64_to_cpu(op->extent.length); 771 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ); 772 773 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc); 774 775 if (rc == -ENOENT && read_op) { 776 zero_bio_chain(req_data->bio, 0); 777 rc = 0; 778 } else if (rc == 0 && read_op && bytes < req_data->len) { 779 zero_bio_chain(req_data->bio, bytes); 780 bytes = req_data->len; 781 } 782 783 
blk_end_request(req_data->rq, rc, bytes); 784 785 if (req_data->bio) 786 bio_chain_put(req_data->bio); 787 788 ceph_osdc_put_request(req); 789 kfree(req_data); 790 } 791 792 /* 793 * Do a synchronous ceph osd operation 794 */ 795 static int rbd_req_sync_op(struct rbd_device *dev, 796 struct ceph_snap_context *snapc, 797 u64 snapid, 798 int opcode, 799 int flags, 800 struct ceph_osd_req_op *orig_ops, 801 int num_reply, 802 const char *obj, 803 u64 ofs, u64 len, 804 char *buf) 805 { 806 int ret; 807 struct page **pages; 808 int num_pages; 809 struct ceph_osd_req_op *ops = orig_ops; 810 u32 payload_len; 811 812 num_pages = calc_pages_for(ofs , len); 813 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 814 if (IS_ERR(pages)) 815 return PTR_ERR(pages); 816 817 if (!orig_ops) { 818 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0); 819 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 820 if (ret < 0) 821 goto done; 822 823 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) { 824 ret = ceph_copy_to_page_vector(pages, buf, ofs, len); 825 if (ret < 0) 826 goto done_ops; 827 } 828 } 829 830 ret = rbd_do_request(NULL, dev, snapc, snapid, 831 obj, ofs, len, NULL, 832 pages, num_pages, 833 flags, 834 ops, 835 2, 836 NULL); 837 if (ret < 0) 838 goto done_ops; 839 840 if ((flags & CEPH_OSD_FLAG_READ) && buf) 841 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); 842 843 done_ops: 844 if (!orig_ops) 845 rbd_destroy_ops(ops); 846 done: 847 ceph_release_page_vector(pages, num_pages); 848 return ret; 849 } 850 851 /* 852 * Do an asynchronous ceph osd operation 853 */ 854 static int rbd_do_op(struct request *rq, 855 struct rbd_device *rbd_dev , 856 struct ceph_snap_context *snapc, 857 u64 snapid, 858 int opcode, int flags, int num_reply, 859 u64 ofs, u64 len, 860 struct bio *bio) 861 { 862 char *seg_name; 863 u64 seg_ofs; 864 u64 seg_len; 865 int ret; 866 struct ceph_osd_req_op *ops; 867 u32 payload_len; 868 869 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 870 
if (!seg_name) 871 return -ENOMEM; 872 873 seg_len = rbd_get_segment(&rbd_dev->header, 874 rbd_dev->header.block_name, 875 ofs, len, 876 seg_name, &seg_ofs); 877 878 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 879 880 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 881 if (ret < 0) 882 goto done; 883 884 /* we've taken care of segment sizes earlier when we 885 cloned the bios. We should never have a segment 886 truncated at this point */ 887 BUG_ON(seg_len < len); 888 889 ret = rbd_do_request(rq, rbd_dev, snapc, snapid, 890 seg_name, seg_ofs, seg_len, 891 bio, 892 NULL, 0, 893 flags, 894 ops, 895 num_reply, 896 rbd_req_cb); 897 done: 898 kfree(seg_name); 899 return ret; 900 } 901 902 /* 903 * Request async osd write 904 */ 905 static int rbd_req_write(struct request *rq, 906 struct rbd_device *rbd_dev, 907 struct ceph_snap_context *snapc, 908 u64 ofs, u64 len, 909 struct bio *bio) 910 { 911 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 912 CEPH_OSD_OP_WRITE, 913 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 914 2, 915 ofs, len, bio); 916 } 917 918 /* 919 * Request async osd read 920 */ 921 static int rbd_req_read(struct request *rq, 922 struct rbd_device *rbd_dev, 923 u64 snapid, 924 u64 ofs, u64 len, 925 struct bio *bio) 926 { 927 return rbd_do_op(rq, rbd_dev, NULL, 928 (snapid ? snapid : CEPH_NOSNAP), 929 CEPH_OSD_OP_READ, 930 CEPH_OSD_FLAG_READ, 931 2, 932 ofs, len, bio); 933 } 934 935 /* 936 * Request sync osd read 937 */ 938 static int rbd_req_sync_read(struct rbd_device *dev, 939 struct ceph_snap_context *snapc, 940 u64 snapid, 941 const char *obj, 942 u64 ofs, u64 len, 943 char *buf) 944 { 945 return rbd_req_sync_op(dev, NULL, 946 (snapid ? 
snapid : CEPH_NOSNAP), 947 CEPH_OSD_OP_READ, 948 CEPH_OSD_FLAG_READ, 949 NULL, 950 1, obj, ofs, len, buf); 951 } 952 953 /* 954 * Request sync osd read 955 */ 956 static int rbd_req_sync_rollback_obj(struct rbd_device *dev, 957 u64 snapid, 958 const char *obj) 959 { 960 struct ceph_osd_req_op *ops; 961 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0); 962 if (ret < 0) 963 return ret; 964 965 ops[0].snap.snapid = snapid; 966 967 ret = rbd_req_sync_op(dev, NULL, 968 CEPH_NOSNAP, 969 0, 970 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 971 ops, 972 1, obj, 0, 0, NULL); 973 974 rbd_destroy_ops(ops); 975 976 if (ret < 0) 977 return ret; 978 979 return ret; 980 } 981 982 /* 983 * Request sync osd read 984 */ 985 static int rbd_req_sync_exec(struct rbd_device *dev, 986 const char *obj, 987 const char *cls, 988 const char *method, 989 const char *data, 990 int len) 991 { 992 struct ceph_osd_req_op *ops; 993 int cls_len = strlen(cls); 994 int method_len = strlen(method); 995 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL, 996 cls_len + method_len + len); 997 if (ret < 0) 998 return ret; 999 1000 ops[0].cls.class_name = cls; 1001 ops[0].cls.class_len = (__u8)cls_len; 1002 ops[0].cls.method_name = method; 1003 ops[0].cls.method_len = (__u8)method_len; 1004 ops[0].cls.argc = 0; 1005 ops[0].cls.indata = data; 1006 ops[0].cls.indata_len = len; 1007 1008 ret = rbd_req_sync_op(dev, NULL, 1009 CEPH_NOSNAP, 1010 0, 1011 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1012 ops, 1013 1, obj, 0, 0, NULL); 1014 1015 rbd_destroy_ops(ops); 1016 1017 dout("cls_exec returned %d\n", ret); 1018 return ret; 1019 } 1020 1021 /* 1022 * block device queue callback 1023 */ 1024 static void rbd_rq_fn(struct request_queue *q) 1025 { 1026 struct rbd_device *rbd_dev = q->queuedata; 1027 struct request *rq; 1028 struct bio_pair *bp = NULL; 1029 1030 rq = blk_fetch_request(q); 1031 1032 while (1) { 1033 struct bio *bio; 1034 struct bio *rq_bio, *next_bio = NULL; 1035 bool do_write; 1036 
int size, op_size = 0; 1037 u64 ofs; 1038 1039 /* peek at request from block layer */ 1040 if (!rq) 1041 break; 1042 1043 dout("fetched request\n"); 1044 1045 /* filter out block requests we don't understand */ 1046 if ((rq->cmd_type != REQ_TYPE_FS)) { 1047 __blk_end_request_all(rq, 0); 1048 goto next; 1049 } 1050 1051 /* deduce our operation (read, write) */ 1052 do_write = (rq_data_dir(rq) == WRITE); 1053 1054 size = blk_rq_bytes(rq); 1055 ofs = blk_rq_pos(rq) * 512ULL; 1056 rq_bio = rq->bio; 1057 if (do_write && rbd_dev->read_only) { 1058 __blk_end_request_all(rq, -EROFS); 1059 goto next; 1060 } 1061 1062 spin_unlock_irq(q->queue_lock); 1063 1064 dout("%s 0x%x bytes at 0x%llx\n", 1065 do_write ? "write" : "read", 1066 size, blk_rq_pos(rq) * 512ULL); 1067 1068 do { 1069 /* a bio clone to be passed down to OSD req */ 1070 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); 1071 op_size = rbd_get_segment(&rbd_dev->header, 1072 rbd_dev->header.block_name, 1073 ofs, size, 1074 NULL, NULL); 1075 bio = bio_chain_clone(&rq_bio, &next_bio, &bp, 1076 op_size, GFP_ATOMIC); 1077 if (!bio) { 1078 spin_lock_irq(q->queue_lock); 1079 __blk_end_request_all(rq, -ENOMEM); 1080 goto next; 1081 } 1082 1083 /* init OSD command: write or read */ 1084 if (do_write) 1085 rbd_req_write(rq, rbd_dev, 1086 rbd_dev->header.snapc, 1087 ofs, 1088 op_size, bio); 1089 else 1090 rbd_req_read(rq, rbd_dev, 1091 cur_snap_id(rbd_dev), 1092 ofs, 1093 op_size, bio); 1094 1095 size -= op_size; 1096 ofs += op_size; 1097 1098 rq_bio = next_bio; 1099 } while (size > 0); 1100 1101 if (bp) 1102 bio_pair_release(bp); 1103 1104 spin_lock_irq(q->queue_lock); 1105 next: 1106 rq = blk_fetch_request(q); 1107 } 1108 } 1109 1110 /* 1111 * a queue callback. Makes sure that we don't create a bio that spans across 1112 * multiple osd objects. 
One exception would be with a single page bios, 1113 * which we handle later at bio_chain_clone 1114 */ 1115 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1116 struct bio_vec *bvec) 1117 { 1118 struct rbd_device *rbd_dev = q->queuedata; 1119 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9); 1120 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1121 unsigned int bio_sectors = bmd->bi_size >> 9; 1122 int max; 1123 1124 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1125 + bio_sectors)) << 9; 1126 if (max < 0) 1127 max = 0; /* bio_add cannot handle a negative return */ 1128 if (max <= bvec->bv_len && bio_sectors == 0) 1129 return bvec->bv_len; 1130 return max; 1131 } 1132 1133 static void rbd_free_disk(struct rbd_device *rbd_dev) 1134 { 1135 struct gendisk *disk = rbd_dev->disk; 1136 1137 if (!disk) 1138 return; 1139 1140 rbd_header_free(&rbd_dev->header); 1141 1142 if (disk->flags & GENHD_FL_UP) 1143 del_gendisk(disk); 1144 if (disk->queue) 1145 blk_cleanup_queue(disk->queue); 1146 put_disk(disk); 1147 } 1148 1149 /* 1150 * reload the ondisk the header 1151 */ 1152 static int rbd_read_header(struct rbd_device *rbd_dev, 1153 struct rbd_image_header *header) 1154 { 1155 ssize_t rc; 1156 struct rbd_image_header_ondisk *dh; 1157 int snap_count = 0; 1158 u64 snap_names_len = 0; 1159 1160 while (1) { 1161 int len = sizeof(*dh) + 1162 snap_count * sizeof(struct rbd_image_snap_ondisk) + 1163 snap_names_len; 1164 1165 rc = -ENOMEM; 1166 dh = kmalloc(len, GFP_KERNEL); 1167 if (!dh) 1168 return -ENOMEM; 1169 1170 rc = rbd_req_sync_read(rbd_dev, 1171 NULL, CEPH_NOSNAP, 1172 rbd_dev->obj_md_name, 1173 0, len, 1174 (char *)dh); 1175 if (rc < 0) 1176 goto out_dh; 1177 1178 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); 1179 if (rc < 0) 1180 goto out_dh; 1181 1182 if (snap_count != header->total_snaps) { 1183 snap_count = header->total_snaps; 1184 snap_names_len = header->snap_names_len; 1185 
rbd_header_free(header); 1186 kfree(dh); 1187 continue; 1188 } 1189 break; 1190 } 1191 1192 out_dh: 1193 kfree(dh); 1194 return rc; 1195 } 1196 1197 /* 1198 * create a snapshot 1199 */ 1200 static int rbd_header_add_snap(struct rbd_device *dev, 1201 const char *snap_name, 1202 gfp_t gfp_flags) 1203 { 1204 int name_len = strlen(snap_name); 1205 u64 new_snapid; 1206 int ret; 1207 void *data, *data_start, *data_end; 1208 1209 /* we should create a snapshot only if we're pointing at the head */ 1210 if (dev->cur_snap) 1211 return -EINVAL; 1212 1213 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, 1214 &new_snapid); 1215 dout("created snapid=%lld\n", new_snapid); 1216 if (ret < 0) 1217 return ret; 1218 1219 data = kmalloc(name_len + 16, gfp_flags); 1220 if (!data) 1221 return -ENOMEM; 1222 1223 data_start = data; 1224 data_end = data + name_len + 16; 1225 1226 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad); 1227 ceph_encode_64_safe(&data, data_end, new_snapid, bad); 1228 1229 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", 1230 data_start, data - data_start); 1231 1232 kfree(data_start); 1233 1234 if (ret < 0) 1235 return ret; 1236 1237 dev->header.snapc->seq = new_snapid; 1238 1239 return 0; 1240 bad: 1241 return -ERANGE; 1242 } 1243 1244 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1245 { 1246 struct rbd_snap *snap; 1247 1248 while (!list_empty(&rbd_dev->snaps)) { 1249 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node); 1250 __rbd_remove_snap_dev(rbd_dev, snap); 1251 } 1252 } 1253 1254 /* 1255 * only read the first part of the ondisk header, without the snaps info 1256 */ 1257 static int __rbd_update_snaps(struct rbd_device *rbd_dev) 1258 { 1259 int ret; 1260 struct rbd_image_header h; 1261 u64 snap_seq; 1262 1263 ret = rbd_read_header(rbd_dev, &h); 1264 if (ret < 0) 1265 return ret; 1266 1267 down_write(&rbd_dev->header.snap_rwsem); 1268 1269 snap_seq = rbd_dev->header.snapc->seq; 1270 
1271 kfree(rbd_dev->header.snapc); 1272 kfree(rbd_dev->header.snap_names); 1273 kfree(rbd_dev->header.snap_sizes); 1274 1275 rbd_dev->header.total_snaps = h.total_snaps; 1276 rbd_dev->header.snapc = h.snapc; 1277 rbd_dev->header.snap_names = h.snap_names; 1278 rbd_dev->header.snap_names_len = h.snap_names_len; 1279 rbd_dev->header.snap_sizes = h.snap_sizes; 1280 rbd_dev->header.snapc->seq = snap_seq; 1281 1282 ret = __rbd_init_snaps_header(rbd_dev); 1283 1284 up_write(&rbd_dev->header.snap_rwsem); 1285 1286 return ret; 1287 } 1288 1289 static int rbd_init_disk(struct rbd_device *rbd_dev) 1290 { 1291 struct gendisk *disk; 1292 struct request_queue *q; 1293 int rc; 1294 u64 total_size = 0; 1295 1296 /* contact OSD, request size info about the object being mapped */ 1297 rc = rbd_read_header(rbd_dev, &rbd_dev->header); 1298 if (rc) 1299 return rc; 1300 1301 /* no need to lock here, as rbd_dev is not registered yet */ 1302 rc = __rbd_init_snaps_header(rbd_dev); 1303 if (rc) 1304 return rc; 1305 1306 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size); 1307 if (rc) 1308 return rc; 1309 1310 /* create gendisk info */ 1311 rc = -ENOMEM; 1312 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 1313 if (!disk) 1314 goto out; 1315 1316 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id); 1317 disk->major = rbd_dev->major; 1318 disk->first_minor = 0; 1319 disk->fops = &rbd_bd_ops; 1320 disk->private_data = rbd_dev; 1321 1322 /* init rq */ 1323 rc = -ENOMEM; 1324 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 1325 if (!q) 1326 goto out_disk; 1327 blk_queue_merge_bvec(q, rbd_merge_bvec); 1328 disk->queue = q; 1329 1330 q->queuedata = rbd_dev; 1331 1332 rbd_dev->disk = disk; 1333 rbd_dev->q = q; 1334 1335 /* finally, announce the disk to the world */ 1336 set_capacity(disk, total_size / 512ULL); 1337 add_disk(disk); 1338 1339 pr_info("%s: added with size 0x%llx\n", 1340 disk->disk_name, (unsigned long long)total_size); 1341 return 0; 1342 1343 out_disk: 1344 put_disk(disk); 
1345 out: 1346 return rc; 1347 } 1348 1349 /* 1350 sysfs 1351 */ 1352 1353 static ssize_t rbd_size_show(struct device *dev, 1354 struct device_attribute *attr, char *buf) 1355 { 1356 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1357 1358 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); 1359 } 1360 1361 static ssize_t rbd_major_show(struct device *dev, 1362 struct device_attribute *attr, char *buf) 1363 { 1364 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1365 1366 return sprintf(buf, "%d\n", rbd_dev->major); 1367 } 1368 1369 static ssize_t rbd_client_id_show(struct device *dev, 1370 struct device_attribute *attr, char *buf) 1371 { 1372 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1373 1374 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client)); 1375 } 1376 1377 static ssize_t rbd_pool_show(struct device *dev, 1378 struct device_attribute *attr, char *buf) 1379 { 1380 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1381 1382 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1383 } 1384 1385 static ssize_t rbd_name_show(struct device *dev, 1386 struct device_attribute *attr, char *buf) 1387 { 1388 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1389 1390 return sprintf(buf, "%s\n", rbd_dev->obj); 1391 } 1392 1393 static ssize_t rbd_snap_show(struct device *dev, 1394 struct device_attribute *attr, 1395 char *buf) 1396 { 1397 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1398 1399 return sprintf(buf, "%s\n", rbd_dev->snap_name); 1400 } 1401 1402 static ssize_t rbd_image_refresh(struct device *dev, 1403 struct device_attribute *attr, 1404 const char *buf, 1405 size_t size) 1406 { 1407 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1408 int rc; 1409 int ret = size; 1410 1411 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1412 1413 rc = __rbd_update_snaps(rbd_dev); 1414 if (rc < 0) 1415 ret = rc; 1416 1417 mutex_unlock(&ctl_mutex); 1418 return ret; 1419 } 1420 1421 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 1422 
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 1423 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 1424 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 1425 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 1426 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 1427 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 1428 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add); 1429 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback); 1430 1431 static struct attribute *rbd_attrs[] = { 1432 &dev_attr_size.attr, 1433 &dev_attr_major.attr, 1434 &dev_attr_client_id.attr, 1435 &dev_attr_pool.attr, 1436 &dev_attr_name.attr, 1437 &dev_attr_current_snap.attr, 1438 &dev_attr_refresh.attr, 1439 &dev_attr_create_snap.attr, 1440 &dev_attr_rollback_snap.attr, 1441 NULL 1442 }; 1443 1444 static struct attribute_group rbd_attr_group = { 1445 .attrs = rbd_attrs, 1446 }; 1447 1448 static const struct attribute_group *rbd_attr_groups[] = { 1449 &rbd_attr_group, 1450 NULL 1451 }; 1452 1453 static void rbd_sysfs_dev_release(struct device *dev) 1454 { 1455 } 1456 1457 static struct device_type rbd_device_type = { 1458 .name = "rbd", 1459 .groups = rbd_attr_groups, 1460 .release = rbd_sysfs_dev_release, 1461 }; 1462 1463 1464 /* 1465 sysfs - snapshots 1466 */ 1467 1468 static ssize_t rbd_snap_size_show(struct device *dev, 1469 struct device_attribute *attr, 1470 char *buf) 1471 { 1472 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1473 1474 return sprintf(buf, "%lld\n", (long long)snap->size); 1475 } 1476 1477 static ssize_t rbd_snap_id_show(struct device *dev, 1478 struct device_attribute *attr, 1479 char *buf) 1480 { 1481 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1482 1483 return sprintf(buf, "%lld\n", (long long)snap->id); 1484 } 1485 1486 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 1487 static DEVICE_ATTR(snap_id, S_IRUGO, 
rbd_snap_id_show, NULL); 1488 1489 static struct attribute *rbd_snap_attrs[] = { 1490 &dev_attr_snap_size.attr, 1491 &dev_attr_snap_id.attr, 1492 NULL, 1493 }; 1494 1495 static struct attribute_group rbd_snap_attr_group = { 1496 .attrs = rbd_snap_attrs, 1497 }; 1498 1499 static void rbd_snap_dev_release(struct device *dev) 1500 { 1501 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1502 kfree(snap->name); 1503 kfree(snap); 1504 } 1505 1506 static const struct attribute_group *rbd_snap_attr_groups[] = { 1507 &rbd_snap_attr_group, 1508 NULL 1509 }; 1510 1511 static struct device_type rbd_snap_device_type = { 1512 .groups = rbd_snap_attr_groups, 1513 .release = rbd_snap_dev_release, 1514 }; 1515 1516 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 1517 struct rbd_snap *snap) 1518 { 1519 list_del(&snap->node); 1520 device_unregister(&snap->dev); 1521 } 1522 1523 static int rbd_register_snap_dev(struct rbd_device *rbd_dev, 1524 struct rbd_snap *snap, 1525 struct device *parent) 1526 { 1527 struct device *dev = &snap->dev; 1528 int ret; 1529 1530 dev->type = &rbd_snap_device_type; 1531 dev->parent = parent; 1532 dev->release = rbd_snap_dev_release; 1533 dev_set_name(dev, "snap_%s", snap->name); 1534 ret = device_register(dev); 1535 1536 return ret; 1537 } 1538 1539 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev, 1540 int i, const char *name, 1541 struct rbd_snap **snapp) 1542 { 1543 int ret; 1544 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL); 1545 if (!snap) 1546 return -ENOMEM; 1547 snap->name = kstrdup(name, GFP_KERNEL); 1548 snap->size = rbd_dev->header.snap_sizes[i]; 1549 snap->id = rbd_dev->header.snapc->snaps[i]; 1550 if (device_is_registered(&rbd_dev->dev)) { 1551 ret = rbd_register_snap_dev(rbd_dev, snap, 1552 &rbd_dev->dev); 1553 if (ret < 0) 1554 goto err; 1555 } 1556 *snapp = snap; 1557 return 0; 1558 err: 1559 kfree(snap->name); 1560 kfree(snap); 1561 return ret; 1562 } 1563 1564 /* 1565 * search for the 
previous snap in a null delimited string list 1566 */ 1567 const char *rbd_prev_snap_name(const char *name, const char *start) 1568 { 1569 if (name < start + 2) 1570 return NULL; 1571 1572 name -= 2; 1573 while (*name) { 1574 if (name == start) 1575 return start; 1576 name--; 1577 } 1578 return name + 1; 1579 } 1580 1581 /* 1582 * compare the old list of snapshots that we have to what's in the header 1583 * and update it accordingly. Note that the header holds the snapshots 1584 * in a reverse order (from newest to oldest) and we need to go from 1585 * older to new so that we don't get a duplicate snap name when 1586 * doing the process (e.g., removed snapshot and recreated a new 1587 * one with the same name. 1588 */ 1589 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev) 1590 { 1591 const char *name, *first_name; 1592 int i = rbd_dev->header.total_snaps; 1593 struct rbd_snap *snap, *old_snap = NULL; 1594 int ret; 1595 struct list_head *p, *n; 1596 1597 first_name = rbd_dev->header.snap_names; 1598 name = first_name + rbd_dev->header.snap_names_len; 1599 1600 list_for_each_prev_safe(p, n, &rbd_dev->snaps) { 1601 u64 cur_id; 1602 1603 old_snap = list_entry(p, struct rbd_snap, node); 1604 1605 if (i) 1606 cur_id = rbd_dev->header.snapc->snaps[i - 1]; 1607 1608 if (!i || old_snap->id < cur_id) { 1609 /* old_snap->id was skipped, thus was removed */ 1610 __rbd_remove_snap_dev(rbd_dev, old_snap); 1611 continue; 1612 } 1613 if (old_snap->id == cur_id) { 1614 /* we have this snapshot already */ 1615 i--; 1616 name = rbd_prev_snap_name(name, first_name); 1617 continue; 1618 } 1619 for (; i > 0; 1620 i--, name = rbd_prev_snap_name(name, first_name)) { 1621 if (!name) { 1622 WARN_ON(1); 1623 return -EINVAL; 1624 } 1625 cur_id = rbd_dev->header.snapc->snaps[i]; 1626 /* snapshot removal? 
handle it above */ 1627 if (cur_id >= old_snap->id) 1628 break; 1629 /* a new snapshot */ 1630 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 1631 if (ret < 0) 1632 return ret; 1633 1634 /* note that we add it backward so using n and not p */ 1635 list_add(&snap->node, n); 1636 p = &snap->node; 1637 } 1638 } 1639 /* we're done going over the old snap list, just add what's left */ 1640 for (; i > 0; i--) { 1641 name = rbd_prev_snap_name(name, first_name); 1642 if (!name) { 1643 WARN_ON(1); 1644 return -EINVAL; 1645 } 1646 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 1647 if (ret < 0) 1648 return ret; 1649 list_add(&snap->node, &rbd_dev->snaps); 1650 } 1651 1652 return 0; 1653 } 1654 1655 1656 static void rbd_root_dev_release(struct device *dev) 1657 { 1658 } 1659 1660 static struct device rbd_root_dev = { 1661 .init_name = "rbd", 1662 .release = rbd_root_dev_release, 1663 }; 1664 1665 static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 1666 { 1667 int ret = -ENOMEM; 1668 struct device *dev; 1669 struct rbd_snap *snap; 1670 1671 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1672 dev = &rbd_dev->dev; 1673 1674 dev->bus = &rbd_bus_type; 1675 dev->type = &rbd_device_type; 1676 dev->parent = &rbd_root_dev; 1677 dev->release = rbd_dev_release; 1678 dev_set_name(dev, "%d", rbd_dev->id); 1679 ret = device_register(dev); 1680 if (ret < 0) 1681 goto done_free; 1682 1683 list_for_each_entry(snap, &rbd_dev->snaps, node) { 1684 ret = rbd_register_snap_dev(rbd_dev, snap, 1685 &rbd_dev->dev); 1686 if (ret < 0) 1687 break; 1688 } 1689 1690 mutex_unlock(&ctl_mutex); 1691 return 0; 1692 done_free: 1693 mutex_unlock(&ctl_mutex); 1694 return ret; 1695 } 1696 1697 static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 1698 { 1699 device_unregister(&rbd_dev->dev); 1700 } 1701 1702 static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) 1703 { 1704 struct ceph_osd_client *osdc; 1705 struct rbd_device *rbd_dev; 1706 ssize_t rc = -ENOMEM; 
1707 int irc, new_id = 0; 1708 struct list_head *tmp; 1709 char *mon_dev_name; 1710 char *options; 1711 1712 if (!try_module_get(THIS_MODULE)) 1713 return -ENODEV; 1714 1715 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 1716 if (!mon_dev_name) 1717 goto err_out_mod; 1718 1719 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL); 1720 if (!options) 1721 goto err_mon_dev; 1722 1723 /* new rbd_device object */ 1724 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 1725 if (!rbd_dev) 1726 goto err_out_opt; 1727 1728 /* static rbd_device initialization */ 1729 spin_lock_init(&rbd_dev->lock); 1730 INIT_LIST_HEAD(&rbd_dev->node); 1731 INIT_LIST_HEAD(&rbd_dev->snaps); 1732 1733 /* generate unique id: find highest unique id, add one */ 1734 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1735 1736 list_for_each(tmp, &rbd_dev_list) { 1737 struct rbd_device *rbd_dev; 1738 1739 rbd_dev = list_entry(tmp, struct rbd_device, node); 1740 if (rbd_dev->id >= new_id) 1741 new_id = rbd_dev->id + 1; 1742 } 1743 1744 rbd_dev->id = new_id; 1745 1746 /* add to global list */ 1747 list_add_tail(&rbd_dev->node, &rbd_dev_list); 1748 1749 /* parse add command */ 1750 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " 1751 "%" __stringify(RBD_MAX_OPT_LEN) "s " 1752 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s " 1753 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s" 1754 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s", 1755 mon_dev_name, options, rbd_dev->pool_name, 1756 rbd_dev->obj, rbd_dev->snap_name) < 4) { 1757 rc = -EINVAL; 1758 goto err_out_slot; 1759 } 1760 1761 if (rbd_dev->snap_name[0] == 0) 1762 rbd_dev->snap_name[0] = '-'; 1763 1764 rbd_dev->obj_len = strlen(rbd_dev->obj); 1765 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s", 1766 rbd_dev->obj, RBD_SUFFIX); 1767 1768 /* initialize rest of new object */ 1769 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id); 1770 rc = rbd_get_client(rbd_dev, mon_dev_name, options); 1771 if (rc < 0) 1772 goto err_out_slot; 
1773 1774 mutex_unlock(&ctl_mutex); 1775 1776 /* pick the pool */ 1777 osdc = &rbd_dev->client->osdc; 1778 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 1779 if (rc < 0) 1780 goto err_out_client; 1781 rbd_dev->poolid = rc; 1782 1783 /* register our block device */ 1784 irc = register_blkdev(0, rbd_dev->name); 1785 if (irc < 0) { 1786 rc = irc; 1787 goto err_out_client; 1788 } 1789 rbd_dev->major = irc; 1790 1791 rc = rbd_bus_add_dev(rbd_dev); 1792 if (rc) 1793 goto err_out_blkdev; 1794 1795 /* set up and announce blkdev mapping */ 1796 rc = rbd_init_disk(rbd_dev); 1797 if (rc) 1798 goto err_out_bus; 1799 1800 return count; 1801 1802 err_out_bus: 1803 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1804 list_del_init(&rbd_dev->node); 1805 mutex_unlock(&ctl_mutex); 1806 1807 /* this will also clean up rest of rbd_dev stuff */ 1808 1809 rbd_bus_del_dev(rbd_dev); 1810 kfree(options); 1811 kfree(mon_dev_name); 1812 return rc; 1813 1814 err_out_blkdev: 1815 unregister_blkdev(rbd_dev->major, rbd_dev->name); 1816 err_out_client: 1817 rbd_put_client(rbd_dev); 1818 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1819 err_out_slot: 1820 list_del_init(&rbd_dev->node); 1821 mutex_unlock(&ctl_mutex); 1822 1823 kfree(rbd_dev); 1824 err_out_opt: 1825 kfree(options); 1826 err_mon_dev: 1827 kfree(mon_dev_name); 1828 err_out_mod: 1829 dout("Error adding device %s\n", buf); 1830 module_put(THIS_MODULE); 1831 return rc; 1832 } 1833 1834 static struct rbd_device *__rbd_get_dev(unsigned long id) 1835 { 1836 struct list_head *tmp; 1837 struct rbd_device *rbd_dev; 1838 1839 list_for_each(tmp, &rbd_dev_list) { 1840 rbd_dev = list_entry(tmp, struct rbd_device, node); 1841 if (rbd_dev->id == id) 1842 return rbd_dev; 1843 } 1844 return NULL; 1845 } 1846 1847 static void rbd_dev_release(struct device *dev) 1848 { 1849 struct rbd_device *rbd_dev = 1850 container_of(dev, struct rbd_device, dev); 1851 1852 rbd_put_client(rbd_dev); 1853 1854 /* clean up and free blkdev */ 
1855 rbd_free_disk(rbd_dev); 1856 unregister_blkdev(rbd_dev->major, rbd_dev->name); 1857 kfree(rbd_dev); 1858 1859 /* release module ref */ 1860 module_put(THIS_MODULE); 1861 } 1862 1863 static ssize_t rbd_remove(struct bus_type *bus, 1864 const char *buf, 1865 size_t count) 1866 { 1867 struct rbd_device *rbd_dev = NULL; 1868 int target_id, rc; 1869 unsigned long ul; 1870 int ret = count; 1871 1872 rc = strict_strtoul(buf, 10, &ul); 1873 if (rc) 1874 return rc; 1875 1876 /* convert to int; abort if we lost anything in the conversion */ 1877 target_id = (int) ul; 1878 if (target_id != ul) 1879 return -EINVAL; 1880 1881 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1882 1883 rbd_dev = __rbd_get_dev(target_id); 1884 if (!rbd_dev) { 1885 ret = -ENOENT; 1886 goto done; 1887 } 1888 1889 list_del_init(&rbd_dev->node); 1890 1891 __rbd_remove_all_snaps(rbd_dev); 1892 rbd_bus_del_dev(rbd_dev); 1893 1894 done: 1895 mutex_unlock(&ctl_mutex); 1896 return ret; 1897 } 1898 1899 static ssize_t rbd_snap_add(struct device *dev, 1900 struct device_attribute *attr, 1901 const char *buf, 1902 size_t count) 1903 { 1904 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1905 int ret; 1906 char *name = kmalloc(count + 1, GFP_KERNEL); 1907 if (!name) 1908 return -ENOMEM; 1909 1910 snprintf(name, count, "%s", buf); 1911 1912 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1913 1914 ret = rbd_header_add_snap(rbd_dev, 1915 name, GFP_KERNEL); 1916 if (ret < 0) 1917 goto done_unlock; 1918 1919 ret = __rbd_update_snaps(rbd_dev); 1920 if (ret < 0) 1921 goto done_unlock; 1922 1923 ret = count; 1924 done_unlock: 1925 mutex_unlock(&ctl_mutex); 1926 kfree(name); 1927 return ret; 1928 } 1929 1930 static ssize_t rbd_snap_rollback(struct device *dev, 1931 struct device_attribute *attr, 1932 const char *buf, 1933 size_t count) 1934 { 1935 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1936 int ret; 1937 u64 snapid; 1938 u64 cur_ofs; 1939 char *seg_name = NULL; 1940 char *snap_name = kmalloc(count + 
1, GFP_KERNEL); 1941 ret = -ENOMEM; 1942 if (!snap_name) 1943 return ret; 1944 1945 /* parse snaps add command */ 1946 snprintf(snap_name, count, "%s", buf); 1947 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 1948 if (!seg_name) 1949 goto done; 1950 1951 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1952 1953 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL); 1954 if (ret < 0) 1955 goto done_unlock; 1956 1957 dout("snapid=%lld\n", snapid); 1958 1959 cur_ofs = 0; 1960 while (cur_ofs < rbd_dev->header.image_size) { 1961 cur_ofs += rbd_get_segment(&rbd_dev->header, 1962 rbd_dev->obj, 1963 cur_ofs, (u64)-1, 1964 seg_name, NULL); 1965 dout("seg_name=%s\n", seg_name); 1966 1967 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name); 1968 if (ret < 0) 1969 pr_warning("could not roll back obj %s err=%d\n", 1970 seg_name, ret); 1971 } 1972 1973 ret = __rbd_update_snaps(rbd_dev); 1974 if (ret < 0) 1975 goto done_unlock; 1976 1977 ret = count; 1978 1979 done_unlock: 1980 mutex_unlock(&ctl_mutex); 1981 done: 1982 kfree(seg_name); 1983 kfree(snap_name); 1984 1985 return ret; 1986 } 1987 1988 static struct bus_attribute rbd_bus_attrs[] = { 1989 __ATTR(add, S_IWUSR, NULL, rbd_add), 1990 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 1991 __ATTR_NULL 1992 }; 1993 1994 /* 1995 * create control files in sysfs 1996 * /sys/bus/rbd/... 
1997 */ 1998 static int rbd_sysfs_init(void) 1999 { 2000 int ret; 2001 2002 rbd_bus_type.bus_attrs = rbd_bus_attrs; 2003 2004 ret = bus_register(&rbd_bus_type); 2005 if (ret < 0) 2006 return ret; 2007 2008 ret = device_register(&rbd_root_dev); 2009 2010 return ret; 2011 } 2012 2013 static void rbd_sysfs_cleanup(void) 2014 { 2015 device_unregister(&rbd_root_dev); 2016 bus_unregister(&rbd_bus_type); 2017 } 2018 2019 int __init rbd_init(void) 2020 { 2021 int rc; 2022 2023 rc = rbd_sysfs_init(); 2024 if (rc) 2025 return rc; 2026 spin_lock_init(&node_lock); 2027 pr_info("loaded " DRV_NAME_LONG "\n"); 2028 return 0; 2029 } 2030 2031 void __exit rbd_exit(void) 2032 { 2033 rbd_sysfs_cleanup(); 2034 } 2035 2036 module_init(rbd_init); 2037 module_exit(rbd_exit); 2038 2039 MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 2040 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 2041 MODULE_DESCRIPTION("rados block device"); 2042 2043 /* following authorship retained from original osdblk.c */ 2044 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 2045 2046 MODULE_LICENSE("GPL"); 2047