1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu-error.h" 18 #include "block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD 67 } RBDAIOCmd; 68 69 typedef struct RBDAIOCB { 70 BlockDriverAIOCB common; 71 QEMUBH *bh; 72 int ret; 73 QEMUIOVector *qiov; 74 char *bounce; 75 RBDAIOCmd cmd; 76 int64_t sector_num; 77 int error; 78 struct BDRVRBDState *s; 79 int cancelled; 80 } RBDAIOCB; 81 82 typedef struct RADOSCB { 83 int rcbid; 84 RBDAIOCB *acb; 85 struct BDRVRBDState *s; 86 int done; 87 int64_t size; 88 char *buf; 89 int ret; 90 } RADOSCB; 91 92 #define RBD_FD_READ 0 93 #define RBD_FD_WRITE 1 94 95 typedef struct BDRVRBDState { 96 int fds[2]; 97 rados_t cluster; 98 rados_ioctx_t io_ctx; 99 rbd_image_t image; 100 char name[RBD_MAX_IMAGE_NAME_SIZE]; 101 int qemu_aio_count; 102 char *snap; 103 int event_reader_pos; 104 RADOSCB *event_rcb; 105 } BDRVRBDState; 106 107 static void rbd_aio_bh_cb(void *opaque); 108 109 static int qemu_rbd_next_tok(char *dst, int dst_len, 110 char *src, char delim, 111 const char *name, 112 char **p) 113 { 114 int l; 115 char *end; 116 117 *p = NULL; 118 119 if (delim != '\0') { 120 for (end = src; *end; ++end) { 121 if (*end == delim) { 122 break; 123 } 124 if (*end == '\\' && end[1] != '\0') { 125 end++; 126 } 127 } 128 if (*end == delim) { 129 *p = end + 1; 130 *end = '\0'; 131 } 132 } 133 l = strlen(src); 134 if (l >= dst_len) { 135 error_report("%s too long", name); 136 return -EINVAL; 137 } else if (l == 0) { 138 error_report("%s too short", name); 139 return -EINVAL; 140 } 141 142 pstrcpy(dst, dst_len, src); 143 144 return 0; 145 } 146 147 static void qemu_rbd_unescape(char *src) 148 { 149 char *p; 150 151 for (p = src; *src; ++src, ++p) { 152 if (*src == '\\' && src[1] != '\0') { 153 src++; 154 } 155 *p = *src; 156 } 157 *p = '\0'; 158 } 159 160 static int qemu_rbd_parsename(const char *filename, 161 char *pool, int pool_len, 162 char *snap, int snap_len, 163 char *name, int name_len, 164 char *conf, int conf_len) 165 { 166 const char *start; 167 char *p, *buf; 168 int ret; 169 170 if (!strstart(filename, "rbd:", &start)) { 171 return -EINVAL; 172 } 173 174 buf = g_strdup(start); 175 p = buf; 176 *snap = '\0'; 177 *conf = '\0'; 178 179 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 180 if (ret < 0 || !p) { 181 ret = -EINVAL; 182 goto done; 183 } 184 qemu_rbd_unescape(pool); 185 186 if (strchr(p, '@')) { 187 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 188 if (ret < 0) { 189 goto done; 190 } 191 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 192 qemu_rbd_unescape(snap); 193 } else { 194 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 195 } 196 qemu_rbd_unescape(name); 197 if (ret < 0 || !p) { 198 goto done; 199 } 200 201 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 202 203 done: 204 g_free(buf); 205 return ret; 206 } 207 208 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 209 { 210 const char *p = conf; 211 212 while (*p) { 213 int len; 214 const char *end = strchr(p, ':'); 215 216 if (end) { 217 len = end - p; 218 } else { 219 len = strlen(p); 220 } 221 222 if (strncmp(p, "id=", 3) == 0) { 223 len -= 3; 224 strncpy(clientname, p + 3, len); 225 clientname[len] = '\0'; 226 return clientname; 227 } 228 if (end == NULL) { 229 break; 230 } 231 p = end + 1; 232 } 233 return NULL; 234 } 235 236 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 237 { 238 char *p, *buf; 239 char name[RBD_MAX_CONF_NAME_SIZE]; 240 char value[RBD_MAX_CONF_VAL_SIZE]; 241 int ret = 0; 242 243 buf = g_strdup(conf); 244 p = buf; 245 246 while (p) { 247 ret = qemu_rbd_next_tok(name, sizeof(name), p, 248 '=', "conf option name", &p); 249 if (ret < 0) { 250 break; 251 } 252 qemu_rbd_unescape(name); 253 254 if (!p) { 255 error_report("conf option %s has no value", name); 256 ret = -EINVAL; 257 break; 258 } 259 260 ret = qemu_rbd_next_tok(value, sizeof(value), p, 261 ':', "conf option value", &p); 262 if (ret < 0) { 263 break; 264 } 265 qemu_rbd_unescape(value); 266 267 if (strcmp(name, "conf") == 0) { 268 ret = rados_conf_read_file(cluster, value); 269 if (ret < 0) { 270 error_report("error reading conf file %s", value); 271 break; 272 } 273 } else if (strcmp(name, "id") == 0) { 274 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 275 } else { 276 ret = rados_conf_set(cluster, name, value); 277 if (ret < 0) { 278 error_report("invalid conf option %s", name); 279 ret = -EINVAL; 280 break; 281 } 282 } 283 } 284 285 g_free(buf); 286 return ret; 287 } 288 289 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 290 { 291 int64_t bytes = 0; 292 int64_t objsize; 293 int obj_order = 0; 294 char pool[RBD_MAX_POOL_NAME_SIZE]; 295 char name[RBD_MAX_IMAGE_NAME_SIZE]; 296 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 297 char conf[RBD_MAX_CONF_SIZE]; 298 char clientname_buf[RBD_MAX_CONF_SIZE]; 299 char *clientname; 300 rados_t cluster; 301 rados_ioctx_t io_ctx; 302 int ret; 303 304 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 305 snap_buf, sizeof(snap_buf), 306 name, sizeof(name), 307 conf, sizeof(conf)) < 0) { 308 return -EINVAL; 309 } 310 311 /* Read out options */ 312 while (options && options->name) { 313 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 314 bytes = options->value.n; 315 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 316 if (options->value.n) { 317 objsize = options->value.n; 318 if ((objsize - 1) & objsize) { /* not a power of 2? */ 319 error_report("obj size needs to be power of 2"); 320 return -EINVAL; 321 } 322 if (objsize < 4096) { 323 error_report("obj size too small"); 324 return -EINVAL; 325 } 326 obj_order = ffs(objsize) - 1; 327 } 328 } 329 options++; 330 } 331 332 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 333 if (rados_create(&cluster, clientname) < 0) { 334 error_report("error initializing"); 335 return -EIO; 336 } 337 338 if (strstr(conf, "conf=") == NULL) { 339 /* try default location, but ignore failure */ 340 rados_conf_read_file(cluster, NULL); 341 } 342 343 if (conf[0] != '\0' && 344 qemu_rbd_set_conf(cluster, conf) < 0) { 345 error_report("error setting config options"); 346 rados_shutdown(cluster); 347 return -EIO; 348 } 349 350 if (rados_connect(cluster) < 0) { 351 error_report("error connecting"); 352 rados_shutdown(cluster); 353 return -EIO; 354 } 355 356 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 357 error_report("error opening pool %s", pool); 358 rados_shutdown(cluster); 359 return -EIO; 360 } 361 362 ret = rbd_create(io_ctx, name, bytes, &obj_order); 363 rados_ioctx_destroy(io_ctx); 364 rados_shutdown(cluster); 365 366 return ret; 367 } 368 369 /* 370 * This aio completion is being called from qemu_rbd_aio_event_reader() 371 * and runs in qemu context. It schedules a bh, but just in case the aio 372 * was not cancelled before. 373 */ 374 static void qemu_rbd_complete_aio(RADOSCB *rcb) 375 { 376 RBDAIOCB *acb = rcb->acb; 377 int64_t r; 378 379 if (acb->cancelled) { 380 qemu_vfree(acb->bounce); 381 qemu_aio_release(acb); 382 goto done; 383 } 384 385 r = rcb->ret; 386 387 if (acb->cmd == RBD_AIO_WRITE || 388 acb->cmd == RBD_AIO_DISCARD) { 389 if (r < 0) { 390 acb->ret = r; 391 acb->error = 1; 392 } else if (!acb->error) { 393 acb->ret = rcb->size; 394 } 395 } else { 396 if (r < 0) { 397 memset(rcb->buf, 0, rcb->size); 398 acb->ret = r; 399 acb->error = 1; 400 } else if (r < rcb->size) { 401 memset(rcb->buf + r, 0, rcb->size - r); 402 if (!acb->error) { 403 acb->ret = rcb->size; 404 } 405 } else if (!acb->error) { 406 acb->ret = r; 407 } 408 } 409 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 410 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 411 qemu_bh_schedule(acb->bh); 412 done: 413 g_free(rcb); 414 } 415 416 /* 417 * aio fd read handler. It runs in the qemu context and calls the 418 * completion handling of completed rados aio operations. 419 */ 420 static void qemu_rbd_aio_event_reader(void *opaque) 421 { 422 BDRVRBDState *s = opaque; 423 424 ssize_t ret; 425 426 do { 427 char *p = (char *)&s->event_rcb; 428 429 /* now read the rcb pointer that was sent from a non qemu thread */ 430 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 431 sizeof(s->event_rcb) - s->event_reader_pos); 432 if (ret > 0) { 433 s->event_reader_pos += ret; 434 if (s->event_reader_pos == sizeof(s->event_rcb)) { 435 s->event_reader_pos = 0; 436 qemu_rbd_complete_aio(s->event_rcb); 437 s->qemu_aio_count--; 438 } 439 } 440 } while (ret < 0 && errno == EINTR); 441 } 442 443 static int qemu_rbd_aio_flush_cb(void *opaque) 444 { 445 BDRVRBDState *s = opaque; 446 447 return (s->qemu_aio_count > 0); 448 } 449 450 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) 451 { 452 BDRVRBDState *s = bs->opaque; 453 char pool[RBD_MAX_POOL_NAME_SIZE]; 454 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 455 char conf[RBD_MAX_CONF_SIZE]; 456 char clientname_buf[RBD_MAX_CONF_SIZE]; 457 char *clientname; 458 int r; 459 460 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 461 snap_buf, sizeof(snap_buf), 462 s->name, sizeof(s->name), 463 conf, sizeof(conf)) < 0) { 464 return -EINVAL; 465 } 466 467 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 468 r = rados_create(&s->cluster, clientname); 469 if (r < 0) { 470 error_report("error initializing"); 471 return r; 472 } 473 474 s->snap = NULL; 475 if (snap_buf[0] != '\0') { 476 s->snap = g_strdup(snap_buf); 477 } 478 479 /* 480 * Fallback to more conservative semantics if setting cache 481 * options fails. Ignore errors from setting rbd_cache because the 482 * only possible error is that the option does not exist, and 483 * librbd defaults to no caching. If write through caching cannot 484 * be set up, fall back to no caching. 485 */ 486 if (flags & BDRV_O_NOCACHE) { 487 rados_conf_set(s->cluster, "rbd_cache", "false"); 488 } else { 489 rados_conf_set(s->cluster, "rbd_cache", "true"); 490 if (!(flags & BDRV_O_CACHE_WB)) { 491 r = rados_conf_set(s->cluster, "rbd_cache_max_dirty", "0"); 492 if (r < 0) { 493 rados_conf_set(s->cluster, "rbd_cache", "false"); 494 } 495 } 496 } 497 498 if (strstr(conf, "conf=") == NULL) { 499 /* try default location, but ignore failure */ 500 rados_conf_read_file(s->cluster, NULL); 501 } 502 503 if (conf[0] != '\0') { 504 r = qemu_rbd_set_conf(s->cluster, conf); 505 if (r < 0) { 506 error_report("error setting config options"); 507 goto failed_shutdown; 508 } 509 } 510 511 r = rados_connect(s->cluster); 512 if (r < 0) { 513 error_report("error connecting"); 514 goto failed_shutdown; 515 } 516 517 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 518 if (r < 0) { 519 error_report("error opening pool %s", pool); 520 goto failed_shutdown; 521 } 522 523 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 524 if (r < 0) { 525 error_report("error reading header from %s", s->name); 526 goto failed_open; 527 } 528 529 bs->read_only = (s->snap != NULL); 530 531 s->event_reader_pos = 0; 532 r = qemu_pipe(s->fds); 533 if (r < 0) { 534 error_report("error opening eventfd"); 535 goto failed; 536 } 537 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 538 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 539 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 540 NULL, qemu_rbd_aio_flush_cb, s); 541 542 543 return 0; 544 545 failed: 546 rbd_close(s->image); 547 failed_open: 548 rados_ioctx_destroy(s->io_ctx); 549 failed_shutdown: 550 rados_shutdown(s->cluster); 551 g_free(s->snap); 552 return r; 553 } 554 555 static void qemu_rbd_close(BlockDriverState *bs) 556 { 557 BDRVRBDState *s = bs->opaque; 558 559 close(s->fds[0]); 560 close(s->fds[1]); 561 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL); 562 563 rbd_close(s->image); 564 rados_ioctx_destroy(s->io_ctx); 565 g_free(s->snap); 566 rados_shutdown(s->cluster); 567 } 568 569 /* 570 * Cancel aio. Since we don't reference acb in a non qemu threads, 571 * it is safe to access it here. 572 */ 573 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 574 { 575 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 576 acb->cancelled = 1; 577 } 578 579 static AIOPool rbd_aio_pool = { 580 .aiocb_size = sizeof(RBDAIOCB), 581 .cancel = qemu_rbd_aio_cancel, 582 }; 583 584 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 585 { 586 int ret = 0; 587 while (1) { 588 fd_set wfd; 589 int fd = s->fds[RBD_FD_WRITE]; 590 591 /* send the op pointer to the qemu thread that is responsible 592 for the aio/op completion. Must do it in a qemu thread context */ 593 ret = write(fd, (void *)&rcb, sizeof(rcb)); 594 if (ret >= 0) { 595 break; 596 } 597 if (errno == EINTR) { 598 continue; 599 } 600 if (errno != EAGAIN) { 601 break; 602 } 603 604 FD_ZERO(&wfd); 605 FD_SET(fd, &wfd); 606 do { 607 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 608 } while (ret < 0 && errno == EINTR); 609 } 610 611 return ret; 612 } 613 614 /* 615 * This is the callback function for rbd_aio_read and _write 616 * 617 * Note: this function is being called from a non qemu thread so 618 * we need to be careful about what we do here. Generally we only 619 * write to the block notification pipe, and do the rest of the 620 * io completion handling from qemu_rbd_aio_event_reader() which 621 * runs in a qemu context. 622 */ 623 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 624 { 625 int ret; 626 rcb->ret = rbd_aio_get_return_value(c); 627 rbd_aio_release(c); 628 ret = qemu_rbd_send_pipe(rcb->s, rcb); 629 if (ret < 0) { 630 error_report("failed writing to acb->s->fds"); 631 g_free(rcb); 632 } 633 } 634 635 /* Callback when all queued rbd_aio requests are complete */ 636 637 static void rbd_aio_bh_cb(void *opaque) 638 { 639 RBDAIOCB *acb = opaque; 640 641 if (acb->cmd == RBD_AIO_READ) { 642 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 643 } 644 qemu_vfree(acb->bounce); 645 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 646 qemu_bh_delete(acb->bh); 647 acb->bh = NULL; 648 649 qemu_aio_release(acb); 650 } 651 652 static int rbd_aio_discard_wrapper(rbd_image_t image, 653 uint64_t off, 654 uint64_t len, 655 rbd_completion_t comp) 656 { 657 #ifdef LIBRBD_SUPPORTS_DISCARD 658 return rbd_aio_discard(image, off, len, comp); 659 #else 660 return -ENOTSUP; 661 #endif 662 } 663 664 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 665 int64_t sector_num, 666 QEMUIOVector *qiov, 667 int nb_sectors, 668 BlockDriverCompletionFunc *cb, 669 void *opaque, 670 RBDAIOCmd cmd) 671 { 672 RBDAIOCB *acb; 673 RADOSCB *rcb; 674 rbd_completion_t c; 675 int64_t off, size; 676 char *buf; 677 int r; 678 679 BDRVRBDState *s = bs->opaque; 680 681 acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque); 682 acb->cmd = cmd; 683 acb->qiov = qiov; 684 if (cmd == RBD_AIO_DISCARD) { 685 acb->bounce = NULL; 686 } else { 687 acb->bounce = qemu_blockalign(bs, qiov->size); 688 } 689 acb->ret = 0; 690 acb->error = 0; 691 acb->s = s; 692 acb->cancelled = 0; 693 acb->bh = NULL; 694 695 if (cmd == RBD_AIO_WRITE) { 696 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 697 } 698 699 buf = acb->bounce; 700 701 off = sector_num * BDRV_SECTOR_SIZE; 702 size = nb_sectors * BDRV_SECTOR_SIZE; 703 704 s->qemu_aio_count++; /* All the RADOSCB */ 705 706 rcb = g_malloc(sizeof(RADOSCB)); 707 rcb->done = 0; 708 rcb->acb = acb; 709 rcb->buf = buf; 710 rcb->s = acb->s; 711 rcb->size = size; 712 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 713 if (r < 0) { 714 goto failed; 715 } 716 717 switch (cmd) { 718 case RBD_AIO_WRITE: 719 r = rbd_aio_write(s->image, off, size, buf, c); 720 break; 721 case RBD_AIO_READ: 722 r = rbd_aio_read(s->image, off, size, buf, c); 723 break; 724 case RBD_AIO_DISCARD: 725 r = rbd_aio_discard_wrapper(s->image, off, size, c); 726 break; 727 default: 728 r = -EINVAL; 729 } 730 731 if (r < 0) { 732 goto failed; 733 } 734 735 return &acb->common; 736 737 failed: 738 g_free(rcb); 739 s->qemu_aio_count--; 740 qemu_aio_release(acb); 741 return NULL; 742 } 743 744 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 745 int64_t sector_num, 746 QEMUIOVector *qiov, 747 int nb_sectors, 748 BlockDriverCompletionFunc *cb, 749 void *opaque) 750 { 751 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 752 RBD_AIO_READ); 753 } 754 755 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 756 int64_t sector_num, 757 QEMUIOVector *qiov, 758 int nb_sectors, 759 BlockDriverCompletionFunc *cb, 760 void *opaque) 761 { 762 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 763 RBD_AIO_WRITE); 764 } 765 766 static int qemu_rbd_co_flush(BlockDriverState *bs) 767 { 768 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 769 /* rbd_flush added in 0.1.1 */ 770 BDRVRBDState *s = bs->opaque; 771 return rbd_flush(s->image); 772 #else 773 return 0; 774 #endif 775 } 776 777 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 778 { 779 BDRVRBDState *s = bs->opaque; 780 rbd_image_info_t info; 781 int r; 782 783 r = rbd_stat(s->image, &info, sizeof(info)); 784 if (r < 0) { 785 return r; 786 } 787 788 bdi->cluster_size = info.obj_size; 789 return 0; 790 } 791 792 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 793 { 794 BDRVRBDState *s = bs->opaque; 795 rbd_image_info_t info; 796 int r; 797 798 r = rbd_stat(s->image, &info, sizeof(info)); 799 if (r < 0) { 800 return r; 801 } 802 803 return info.size; 804 } 805 806 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 807 { 808 BDRVRBDState *s = bs->opaque; 809 int r; 810 811 r = rbd_resize(s->image, offset); 812 if (r < 0) { 813 return r; 814 } 815 816 return 0; 817 } 818 819 static int qemu_rbd_snap_create(BlockDriverState *bs, 820 QEMUSnapshotInfo *sn_info) 821 { 822 BDRVRBDState *s = bs->opaque; 823 int r; 824 825 if (sn_info->name[0] == '\0') { 826 return -EINVAL; /* we need a name for rbd snapshots */ 827 } 828 829 /* 830 * rbd snapshots are using the name as the user controlled unique identifier 831 * we can't use the rbd snapid for that purpose, as it can't be set 832 */ 833 if (sn_info->id_str[0] != '\0' && 834 strcmp(sn_info->id_str, sn_info->name) != 0) { 835 return -EINVAL; 836 } 837 838 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 839 return -ERANGE; 840 } 841 842 r = rbd_snap_create(s->image, sn_info->name); 843 if (r < 0) { 844 error_report("failed to create snap: %s", strerror(-r)); 845 return r; 846 } 847 848 return 0; 849 } 850 851 static int qemu_rbd_snap_remove(BlockDriverState *bs, 852 const char *snapshot_name) 853 { 854 BDRVRBDState *s = bs->opaque; 855 int r; 856 857 r = rbd_snap_remove(s->image, snapshot_name); 858 return r; 859 } 860 861 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 862 const char *snapshot_name) 863 { 864 BDRVRBDState *s = bs->opaque; 865 int r; 866 867 r = rbd_snap_rollback(s->image, snapshot_name); 868 return r; 869 } 870 871 static int qemu_rbd_snap_list(BlockDriverState *bs, 872 QEMUSnapshotInfo **psn_tab) 873 { 874 BDRVRBDState *s = bs->opaque; 875 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 876 int i, snap_count; 877 rbd_snap_info_t *snaps; 878 int max_snaps = RBD_MAX_SNAPS; 879 880 do { 881 snaps = g_malloc(sizeof(*snaps) * max_snaps); 882 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 883 if (snap_count < 0) { 884 g_free(snaps); 885 } 886 } while (snap_count == -ERANGE); 887 888 if (snap_count <= 0) { 889 goto done; 890 } 891 892 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 893 894 for (i = 0; i < snap_count; i++) { 895 const char *snap_name = snaps[i].name; 896 897 sn_info = sn_tab + i; 898 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 899 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 900 901 sn_info->vm_state_size = snaps[i].size; 902 sn_info->date_sec = 0; 903 sn_info->date_nsec = 0; 904 sn_info->vm_clock_nsec = 0; 905 } 906 rbd_snap_list_end(snaps); 907 908 done: 909 *psn_tab = sn_tab; 910 return snap_count; 911 } 912 913 #ifdef LIBRBD_SUPPORTS_DISCARD 914 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 915 int64_t sector_num, 916 int nb_sectors, 917 BlockDriverCompletionFunc *cb, 918 void *opaque) 919 { 920 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 921 RBD_AIO_DISCARD); 922 } 923 #endif 924 925 static QEMUOptionParameter qemu_rbd_create_options[] = { 926 { 927 .name = BLOCK_OPT_SIZE, 928 .type = OPT_SIZE, 929 .help = "Virtual disk size" 930 }, 931 { 932 .name = BLOCK_OPT_CLUSTER_SIZE, 933 .type = OPT_SIZE, 934 .help = "RBD object size" 935 }, 936 {NULL} 937 }; 938 939 static BlockDriver bdrv_rbd = { 940 .format_name = "rbd", 941 .instance_size = sizeof(BDRVRBDState), 942 .bdrv_file_open = qemu_rbd_open, 943 .bdrv_close = qemu_rbd_close, 944 .bdrv_create = qemu_rbd_create, 945 .bdrv_get_info = qemu_rbd_getinfo, 946 .create_options = qemu_rbd_create_options, 947 .bdrv_getlength = qemu_rbd_getlength, 948 .bdrv_truncate = qemu_rbd_truncate, 949 .protocol_name = "rbd", 950 951 .bdrv_aio_readv = qemu_rbd_aio_readv, 952 .bdrv_aio_writev = qemu_rbd_aio_writev, 953 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 954 955 #ifdef LIBRBD_SUPPORTS_DISCARD 956 .bdrv_aio_discard = qemu_rbd_aio_discard, 957 #endif 958 959 .bdrv_snapshot_create = qemu_rbd_snap_create, 960 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 961 .bdrv_snapshot_list = qemu_rbd_snap_list, 962 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 963 }; 964 965 static void bdrv_rbd_init(void) 966 { 967 bdrv_register(&bdrv_rbd); 968 } 969 970 block_init(bdrv_rbd_init); 971