/*
 * QEMU Block driver for RADOS (Ceph)
 *
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
 *                         Josh Durgin <josh.durgin@dreamhost.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include <inttypes.h>

#include "qemu-common.h"
#include "qemu-error.h"
#include "block_int.h"

#include <rbd/librbd.h>

/*
 * When specifying the image filename use:
 *
 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates what user we should authenticate as to
 * the Ceph cluster.  If it is excluded we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD 67 } RBDAIOCmd; 68 69 typedef struct RBDAIOCB { 70 BlockDriverAIOCB common; 71 QEMUBH *bh; 72 int64_t ret; 73 QEMUIOVector *qiov; 74 char *bounce; 75 RBDAIOCmd cmd; 76 int64_t sector_num; 77 int error; 78 struct BDRVRBDState *s; 79 int cancelled; 80 } RBDAIOCB; 81 82 typedef struct RADOSCB { 83 int rcbid; 84 RBDAIOCB *acb; 85 struct BDRVRBDState *s; 86 int done; 87 int64_t size; 88 char *buf; 89 int64_t ret; 90 } RADOSCB; 91 92 #define RBD_FD_READ 0 93 #define RBD_FD_WRITE 1 94 95 typedef struct BDRVRBDState { 96 int fds[2]; 97 rados_t cluster; 98 rados_ioctx_t io_ctx; 99 rbd_image_t image; 100 char name[RBD_MAX_IMAGE_NAME_SIZE]; 101 int qemu_aio_count; 102 char *snap; 103 int event_reader_pos; 104 RADOSCB *event_rcb; 105 } BDRVRBDState; 106 107 static void rbd_aio_bh_cb(void *opaque); 108 109 static int qemu_rbd_next_tok(char *dst, int dst_len, 110 char *src, char delim, 111 const char *name, 112 char **p) 113 { 114 int l; 115 char *end; 116 117 *p = NULL; 118 119 if (delim != '\0') { 120 for (end = src; *end; ++end) { 121 if (*end == delim) { 122 break; 123 } 124 if (*end == '\\' && end[1] != '\0') { 125 end++; 126 } 127 } 128 if (*end == delim) { 129 *p = end + 1; 130 *end = '\0'; 131 } 132 } 133 l = strlen(src); 134 if (l >= dst_len) { 135 error_report("%s too long", name); 136 return -EINVAL; 137 } else if (l == 0) { 138 error_report("%s too short", name); 139 return -EINVAL; 140 } 141 142 pstrcpy(dst, dst_len, src); 143 144 
return 0; 145 } 146 147 static void qemu_rbd_unescape(char *src) 148 { 149 char *p; 150 151 for (p = src; *src; ++src, ++p) { 152 if (*src == '\\' && src[1] != '\0') { 153 src++; 154 } 155 *p = *src; 156 } 157 *p = '\0'; 158 } 159 160 static int qemu_rbd_parsename(const char *filename, 161 char *pool, int pool_len, 162 char *snap, int snap_len, 163 char *name, int name_len, 164 char *conf, int conf_len) 165 { 166 const char *start; 167 char *p, *buf; 168 int ret; 169 170 if (!strstart(filename, "rbd:", &start)) { 171 return -EINVAL; 172 } 173 174 buf = g_strdup(start); 175 p = buf; 176 *snap = '\0'; 177 *conf = '\0'; 178 179 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 180 if (ret < 0 || !p) { 181 ret = -EINVAL; 182 goto done; 183 } 184 qemu_rbd_unescape(pool); 185 186 if (strchr(p, '@')) { 187 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 188 if (ret < 0) { 189 goto done; 190 } 191 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 192 qemu_rbd_unescape(snap); 193 } else { 194 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 195 } 196 qemu_rbd_unescape(name); 197 if (ret < 0 || !p) { 198 goto done; 199 } 200 201 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 202 203 done: 204 g_free(buf); 205 return ret; 206 } 207 208 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 209 { 210 const char *p = conf; 211 212 while (*p) { 213 int len; 214 const char *end = strchr(p, ':'); 215 216 if (end) { 217 len = end - p; 218 } else { 219 len = strlen(p); 220 } 221 222 if (strncmp(p, "id=", 3) == 0) { 223 len -= 3; 224 strncpy(clientname, p + 3, len); 225 clientname[len] = '\0'; 226 return clientname; 227 } 228 if (end == NULL) { 229 break; 230 } 231 p = end + 1; 232 } 233 return NULL; 234 } 235 236 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 237 { 238 char *p, *buf; 239 char name[RBD_MAX_CONF_NAME_SIZE]; 240 char 
value[RBD_MAX_CONF_VAL_SIZE]; 241 int ret = 0; 242 243 buf = g_strdup(conf); 244 p = buf; 245 246 while (p) { 247 ret = qemu_rbd_next_tok(name, sizeof(name), p, 248 '=', "conf option name", &p); 249 if (ret < 0) { 250 break; 251 } 252 qemu_rbd_unescape(name); 253 254 if (!p) { 255 error_report("conf option %s has no value", name); 256 ret = -EINVAL; 257 break; 258 } 259 260 ret = qemu_rbd_next_tok(value, sizeof(value), p, 261 ':', "conf option value", &p); 262 if (ret < 0) { 263 break; 264 } 265 qemu_rbd_unescape(value); 266 267 if (strcmp(name, "conf") == 0) { 268 ret = rados_conf_read_file(cluster, value); 269 if (ret < 0) { 270 error_report("error reading conf file %s", value); 271 break; 272 } 273 } else if (strcmp(name, "id") == 0) { 274 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 275 } else { 276 ret = rados_conf_set(cluster, name, value); 277 if (ret < 0) { 278 error_report("invalid conf option %s", name); 279 ret = -EINVAL; 280 break; 281 } 282 } 283 } 284 285 g_free(buf); 286 return ret; 287 } 288 289 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 290 { 291 int64_t bytes = 0; 292 int64_t objsize; 293 int obj_order = 0; 294 char pool[RBD_MAX_POOL_NAME_SIZE]; 295 char name[RBD_MAX_IMAGE_NAME_SIZE]; 296 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 297 char conf[RBD_MAX_CONF_SIZE]; 298 char clientname_buf[RBD_MAX_CONF_SIZE]; 299 char *clientname; 300 rados_t cluster; 301 rados_ioctx_t io_ctx; 302 int ret; 303 304 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 305 snap_buf, sizeof(snap_buf), 306 name, sizeof(name), 307 conf, sizeof(conf)) < 0) { 308 return -EINVAL; 309 } 310 311 /* Read out options */ 312 while (options && options->name) { 313 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 314 bytes = options->value.n; 315 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 316 if (options->value.n) { 317 objsize = options->value.n; 318 if ((objsize - 1) & objsize) { /* not a power of 2? 
*/ 319 error_report("obj size needs to be power of 2"); 320 return -EINVAL; 321 } 322 if (objsize < 4096) { 323 error_report("obj size too small"); 324 return -EINVAL; 325 } 326 obj_order = ffs(objsize) - 1; 327 } 328 } 329 options++; 330 } 331 332 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 333 if (rados_create(&cluster, clientname) < 0) { 334 error_report("error initializing"); 335 return -EIO; 336 } 337 338 if (strstr(conf, "conf=") == NULL) { 339 /* try default location, but ignore failure */ 340 rados_conf_read_file(cluster, NULL); 341 } 342 343 if (conf[0] != '\0' && 344 qemu_rbd_set_conf(cluster, conf) < 0) { 345 error_report("error setting config options"); 346 rados_shutdown(cluster); 347 return -EIO; 348 } 349 350 if (rados_connect(cluster) < 0) { 351 error_report("error connecting"); 352 rados_shutdown(cluster); 353 return -EIO; 354 } 355 356 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 357 error_report("error opening pool %s", pool); 358 rados_shutdown(cluster); 359 return -EIO; 360 } 361 362 ret = rbd_create(io_ctx, name, bytes, &obj_order); 363 rados_ioctx_destroy(io_ctx); 364 rados_shutdown(cluster); 365 366 return ret; 367 } 368 369 /* 370 * This aio completion is being called from qemu_rbd_aio_event_reader() 371 * and runs in qemu context. It schedules a bh, but just in case the aio 372 * was not cancelled before. 
373 */ 374 static void qemu_rbd_complete_aio(RADOSCB *rcb) 375 { 376 RBDAIOCB *acb = rcb->acb; 377 int64_t r; 378 379 if (acb->cancelled) { 380 qemu_vfree(acb->bounce); 381 qemu_aio_release(acb); 382 goto done; 383 } 384 385 r = rcb->ret; 386 387 if (acb->cmd == RBD_AIO_WRITE || 388 acb->cmd == RBD_AIO_DISCARD) { 389 if (r < 0) { 390 acb->ret = r; 391 acb->error = 1; 392 } else if (!acb->error) { 393 acb->ret = rcb->size; 394 } 395 } else { 396 if (r < 0) { 397 memset(rcb->buf, 0, rcb->size); 398 acb->ret = r; 399 acb->error = 1; 400 } else if (r < rcb->size) { 401 memset(rcb->buf + r, 0, rcb->size - r); 402 if (!acb->error) { 403 acb->ret = rcb->size; 404 } 405 } else if (!acb->error) { 406 acb->ret = r; 407 } 408 } 409 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 410 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 411 qemu_bh_schedule(acb->bh); 412 done: 413 g_free(rcb); 414 } 415 416 /* 417 * aio fd read handler. It runs in the qemu context and calls the 418 * completion handling of completed rados aio operations. 
419 */ 420 static void qemu_rbd_aio_event_reader(void *opaque) 421 { 422 BDRVRBDState *s = opaque; 423 424 ssize_t ret; 425 426 do { 427 char *p = (char *)&s->event_rcb; 428 429 /* now read the rcb pointer that was sent from a non qemu thread */ 430 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 431 sizeof(s->event_rcb) - s->event_reader_pos); 432 if (ret > 0) { 433 s->event_reader_pos += ret; 434 if (s->event_reader_pos == sizeof(s->event_rcb)) { 435 s->event_reader_pos = 0; 436 qemu_rbd_complete_aio(s->event_rcb); 437 s->qemu_aio_count--; 438 } 439 } 440 } while (ret < 0 && errno == EINTR); 441 } 442 443 static int qemu_rbd_aio_flush_cb(void *opaque) 444 { 445 BDRVRBDState *s = opaque; 446 447 return (s->qemu_aio_count > 0); 448 } 449 450 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) 451 { 452 BDRVRBDState *s = bs->opaque; 453 char pool[RBD_MAX_POOL_NAME_SIZE]; 454 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 455 char conf[RBD_MAX_CONF_SIZE]; 456 char clientname_buf[RBD_MAX_CONF_SIZE]; 457 char *clientname; 458 int r; 459 460 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 461 snap_buf, sizeof(snap_buf), 462 s->name, sizeof(s->name), 463 conf, sizeof(conf)) < 0) { 464 return -EINVAL; 465 } 466 467 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 468 r = rados_create(&s->cluster, clientname); 469 if (r < 0) { 470 error_report("error initializing"); 471 return r; 472 } 473 474 s->snap = NULL; 475 if (snap_buf[0] != '\0') { 476 s->snap = g_strdup(snap_buf); 477 } 478 479 /* 480 * Fallback to more conservative semantics if setting cache 481 * options fails. Ignore errors from setting rbd_cache because the 482 * only possible error is that the option does not exist, and 483 * librbd defaults to no caching. If write through caching cannot 484 * be set up, fall back to no caching. 
485 */ 486 if (flags & BDRV_O_NOCACHE) { 487 rados_conf_set(s->cluster, "rbd_cache", "false"); 488 } else { 489 rados_conf_set(s->cluster, "rbd_cache", "true"); 490 } 491 492 if (strstr(conf, "conf=") == NULL) { 493 /* try default location, but ignore failure */ 494 rados_conf_read_file(s->cluster, NULL); 495 } 496 497 if (conf[0] != '\0') { 498 r = qemu_rbd_set_conf(s->cluster, conf); 499 if (r < 0) { 500 error_report("error setting config options"); 501 goto failed_shutdown; 502 } 503 } 504 505 r = rados_connect(s->cluster); 506 if (r < 0) { 507 error_report("error connecting"); 508 goto failed_shutdown; 509 } 510 511 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 512 if (r < 0) { 513 error_report("error opening pool %s", pool); 514 goto failed_shutdown; 515 } 516 517 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 518 if (r < 0) { 519 error_report("error reading header from %s", s->name); 520 goto failed_open; 521 } 522 523 bs->read_only = (s->snap != NULL); 524 525 s->event_reader_pos = 0; 526 r = qemu_pipe(s->fds); 527 if (r < 0) { 528 error_report("error opening eventfd"); 529 goto failed; 530 } 531 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 532 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 533 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 534 NULL, qemu_rbd_aio_flush_cb, s); 535 536 537 return 0; 538 539 failed: 540 rbd_close(s->image); 541 failed_open: 542 rados_ioctx_destroy(s->io_ctx); 543 failed_shutdown: 544 rados_shutdown(s->cluster); 545 g_free(s->snap); 546 return r; 547 } 548 549 static void qemu_rbd_close(BlockDriverState *bs) 550 { 551 BDRVRBDState *s = bs->opaque; 552 553 close(s->fds[0]); 554 close(s->fds[1]); 555 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL); 556 557 rbd_close(s->image); 558 rados_ioctx_destroy(s->io_ctx); 559 g_free(s->snap); 560 rados_shutdown(s->cluster); 561 } 562 563 /* 564 * Cancel aio. 
Since we don't reference acb in a non qemu threads, 565 * it is safe to access it here. 566 */ 567 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 568 { 569 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 570 acb->cancelled = 1; 571 } 572 573 static const AIOCBInfo rbd_aiocb_info = { 574 .aiocb_size = sizeof(RBDAIOCB), 575 .cancel = qemu_rbd_aio_cancel, 576 }; 577 578 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 579 { 580 int ret = 0; 581 while (1) { 582 fd_set wfd; 583 int fd = s->fds[RBD_FD_WRITE]; 584 585 /* send the op pointer to the qemu thread that is responsible 586 for the aio/op completion. Must do it in a qemu thread context */ 587 ret = write(fd, (void *)&rcb, sizeof(rcb)); 588 if (ret >= 0) { 589 break; 590 } 591 if (errno == EINTR) { 592 continue; 593 } 594 if (errno != EAGAIN) { 595 break; 596 } 597 598 FD_ZERO(&wfd); 599 FD_SET(fd, &wfd); 600 do { 601 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 602 } while (ret < 0 && errno == EINTR); 603 } 604 605 return ret; 606 } 607 608 /* 609 * This is the callback function for rbd_aio_read and _write 610 * 611 * Note: this function is being called from a non qemu thread so 612 * we need to be careful about what we do here. Generally we only 613 * write to the block notification pipe, and do the rest of the 614 * io completion handling from qemu_rbd_aio_event_reader() which 615 * runs in a qemu context. 
616 */ 617 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 618 { 619 int ret; 620 rcb->ret = rbd_aio_get_return_value(c); 621 rbd_aio_release(c); 622 ret = qemu_rbd_send_pipe(rcb->s, rcb); 623 if (ret < 0) { 624 error_report("failed writing to acb->s->fds"); 625 g_free(rcb); 626 } 627 } 628 629 /* Callback when all queued rbd_aio requests are complete */ 630 631 static void rbd_aio_bh_cb(void *opaque) 632 { 633 RBDAIOCB *acb = opaque; 634 635 if (acb->cmd == RBD_AIO_READ) { 636 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 637 } 638 qemu_vfree(acb->bounce); 639 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 640 qemu_bh_delete(acb->bh); 641 acb->bh = NULL; 642 643 qemu_aio_release(acb); 644 } 645 646 static int rbd_aio_discard_wrapper(rbd_image_t image, 647 uint64_t off, 648 uint64_t len, 649 rbd_completion_t comp) 650 { 651 #ifdef LIBRBD_SUPPORTS_DISCARD 652 return rbd_aio_discard(image, off, len, comp); 653 #else 654 return -ENOTSUP; 655 #endif 656 } 657 658 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 659 int64_t sector_num, 660 QEMUIOVector *qiov, 661 int nb_sectors, 662 BlockDriverCompletionFunc *cb, 663 void *opaque, 664 RBDAIOCmd cmd) 665 { 666 RBDAIOCB *acb; 667 RADOSCB *rcb; 668 rbd_completion_t c; 669 int64_t off, size; 670 char *buf; 671 int r; 672 673 BDRVRBDState *s = bs->opaque; 674 675 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 676 acb->cmd = cmd; 677 acb->qiov = qiov; 678 if (cmd == RBD_AIO_DISCARD) { 679 acb->bounce = NULL; 680 } else { 681 acb->bounce = qemu_blockalign(bs, qiov->size); 682 } 683 acb->ret = 0; 684 acb->error = 0; 685 acb->s = s; 686 acb->cancelled = 0; 687 acb->bh = NULL; 688 689 if (cmd == RBD_AIO_WRITE) { 690 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 691 } 692 693 buf = acb->bounce; 694 695 off = sector_num * BDRV_SECTOR_SIZE; 696 size = nb_sectors * BDRV_SECTOR_SIZE; 697 698 s->qemu_aio_count++; /* All the RADOSCB */ 699 700 rcb = 
g_malloc(sizeof(RADOSCB)); 701 rcb->done = 0; 702 rcb->acb = acb; 703 rcb->buf = buf; 704 rcb->s = acb->s; 705 rcb->size = size; 706 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 707 if (r < 0) { 708 goto failed; 709 } 710 711 switch (cmd) { 712 case RBD_AIO_WRITE: 713 r = rbd_aio_write(s->image, off, size, buf, c); 714 break; 715 case RBD_AIO_READ: 716 r = rbd_aio_read(s->image, off, size, buf, c); 717 break; 718 case RBD_AIO_DISCARD: 719 r = rbd_aio_discard_wrapper(s->image, off, size, c); 720 break; 721 default: 722 r = -EINVAL; 723 } 724 725 if (r < 0) { 726 goto failed; 727 } 728 729 return &acb->common; 730 731 failed: 732 g_free(rcb); 733 s->qemu_aio_count--; 734 qemu_aio_release(acb); 735 return NULL; 736 } 737 738 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 739 int64_t sector_num, 740 QEMUIOVector *qiov, 741 int nb_sectors, 742 BlockDriverCompletionFunc *cb, 743 void *opaque) 744 { 745 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 746 RBD_AIO_READ); 747 } 748 749 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 750 int64_t sector_num, 751 QEMUIOVector *qiov, 752 int nb_sectors, 753 BlockDriverCompletionFunc *cb, 754 void *opaque) 755 { 756 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 757 RBD_AIO_WRITE); 758 } 759 760 static int qemu_rbd_co_flush(BlockDriverState *bs) 761 { 762 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 763 /* rbd_flush added in 0.1.1 */ 764 BDRVRBDState *s = bs->opaque; 765 return rbd_flush(s->image); 766 #else 767 return 0; 768 #endif 769 } 770 771 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 772 { 773 BDRVRBDState *s = bs->opaque; 774 rbd_image_info_t info; 775 int r; 776 777 r = rbd_stat(s->image, &info, sizeof(info)); 778 if (r < 0) { 779 return r; 780 } 781 782 bdi->cluster_size = info.obj_size; 783 return 0; 784 } 785 786 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 787 { 788 
BDRVRBDState *s = bs->opaque; 789 rbd_image_info_t info; 790 int r; 791 792 r = rbd_stat(s->image, &info, sizeof(info)); 793 if (r < 0) { 794 return r; 795 } 796 797 return info.size; 798 } 799 800 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 801 { 802 BDRVRBDState *s = bs->opaque; 803 int r; 804 805 r = rbd_resize(s->image, offset); 806 if (r < 0) { 807 return r; 808 } 809 810 return 0; 811 } 812 813 static int qemu_rbd_snap_create(BlockDriverState *bs, 814 QEMUSnapshotInfo *sn_info) 815 { 816 BDRVRBDState *s = bs->opaque; 817 int r; 818 819 if (sn_info->name[0] == '\0') { 820 return -EINVAL; /* we need a name for rbd snapshots */ 821 } 822 823 /* 824 * rbd snapshots are using the name as the user controlled unique identifier 825 * we can't use the rbd snapid for that purpose, as it can't be set 826 */ 827 if (sn_info->id_str[0] != '\0' && 828 strcmp(sn_info->id_str, sn_info->name) != 0) { 829 return -EINVAL; 830 } 831 832 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 833 return -ERANGE; 834 } 835 836 r = rbd_snap_create(s->image, sn_info->name); 837 if (r < 0) { 838 error_report("failed to create snap: %s", strerror(-r)); 839 return r; 840 } 841 842 return 0; 843 } 844 845 static int qemu_rbd_snap_remove(BlockDriverState *bs, 846 const char *snapshot_name) 847 { 848 BDRVRBDState *s = bs->opaque; 849 int r; 850 851 r = rbd_snap_remove(s->image, snapshot_name); 852 return r; 853 } 854 855 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 856 const char *snapshot_name) 857 { 858 BDRVRBDState *s = bs->opaque; 859 int r; 860 861 r = rbd_snap_rollback(s->image, snapshot_name); 862 return r; 863 } 864 865 static int qemu_rbd_snap_list(BlockDriverState *bs, 866 QEMUSnapshotInfo **psn_tab) 867 { 868 BDRVRBDState *s = bs->opaque; 869 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 870 int i, snap_count; 871 rbd_snap_info_t *snaps; 872 int max_snaps = RBD_MAX_SNAPS; 873 874 do { 875 snaps = g_malloc(sizeof(*snaps) * max_snaps); 876 
snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 877 if (snap_count < 0) { 878 g_free(snaps); 879 } 880 } while (snap_count == -ERANGE); 881 882 if (snap_count <= 0) { 883 goto done; 884 } 885 886 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 887 888 for (i = 0; i < snap_count; i++) { 889 const char *snap_name = snaps[i].name; 890 891 sn_info = sn_tab + i; 892 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 893 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 894 895 sn_info->vm_state_size = snaps[i].size; 896 sn_info->date_sec = 0; 897 sn_info->date_nsec = 0; 898 sn_info->vm_clock_nsec = 0; 899 } 900 rbd_snap_list_end(snaps); 901 902 done: 903 *psn_tab = sn_tab; 904 return snap_count; 905 } 906 907 #ifdef LIBRBD_SUPPORTS_DISCARD 908 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 909 int64_t sector_num, 910 int nb_sectors, 911 BlockDriverCompletionFunc *cb, 912 void *opaque) 913 { 914 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 915 RBD_AIO_DISCARD); 916 } 917 #endif 918 919 static QEMUOptionParameter qemu_rbd_create_options[] = { 920 { 921 .name = BLOCK_OPT_SIZE, 922 .type = OPT_SIZE, 923 .help = "Virtual disk size" 924 }, 925 { 926 .name = BLOCK_OPT_CLUSTER_SIZE, 927 .type = OPT_SIZE, 928 .help = "RBD object size" 929 }, 930 {NULL} 931 }; 932 933 static BlockDriver bdrv_rbd = { 934 .format_name = "rbd", 935 .instance_size = sizeof(BDRVRBDState), 936 .bdrv_file_open = qemu_rbd_open, 937 .bdrv_close = qemu_rbd_close, 938 .bdrv_create = qemu_rbd_create, 939 .bdrv_get_info = qemu_rbd_getinfo, 940 .create_options = qemu_rbd_create_options, 941 .bdrv_getlength = qemu_rbd_getlength, 942 .bdrv_truncate = qemu_rbd_truncate, 943 .protocol_name = "rbd", 944 945 .bdrv_aio_readv = qemu_rbd_aio_readv, 946 .bdrv_aio_writev = qemu_rbd_aio_writev, 947 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 948 949 #ifdef LIBRBD_SUPPORTS_DISCARD 950 .bdrv_aio_discard = qemu_rbd_aio_discard, 951 #endif 952 
953 .bdrv_snapshot_create = qemu_rbd_snap_create, 954 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 955 .bdrv_snapshot_list = qemu_rbd_snap_list, 956 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 957 }; 958 959 static void bdrv_rbd_init(void) 960 { 961 bdrv_register(&bdrv_rbd); 962 } 963 964 block_init(bdrv_rbd_init); 965