1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu-error.h" 18 #include "block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 48 49 #define RBD_MAX_CONF_NAME_SIZE 128 50 #define RBD_MAX_CONF_VAL_SIZE 512 51 #define RBD_MAX_CONF_SIZE 1024 52 #define RBD_MAX_POOL_NAME_SIZE 128 53 #define RBD_MAX_SNAP_NAME_SIZE 128 54 #define RBD_MAX_SNAPS 100 55 56 typedef struct RBDAIOCB { 57 BlockDriverAIOCB common; 58 QEMUBH *bh; 59 int ret; 60 QEMUIOVector *qiov; 61 char *bounce; 62 int write; 63 int64_t sector_num; 64 int error; 65 struct BDRVRBDState *s; 66 int cancelled; 67 } RBDAIOCB; 68 69 typedef struct RADOSCB { 70 int rcbid; 71 RBDAIOCB *acb; 72 struct BDRVRBDState *s; 73 int done; 74 int64_t size; 75 char *buf; 76 int ret; 77 } RADOSCB; 78 79 #define RBD_FD_READ 0 80 #define RBD_FD_WRITE 1 81 82 typedef struct BDRVRBDState { 83 int fds[2]; 84 rados_t cluster; 85 rados_ioctx_t io_ctx; 86 rbd_image_t image; 87 char name[RBD_MAX_IMAGE_NAME_SIZE]; 88 int qemu_aio_count; 89 char *snap; 90 int event_reader_pos; 91 RADOSCB *event_rcb; 92 } BDRVRBDState; 93 94 static void rbd_aio_bh_cb(void *opaque); 95 96 static int qemu_rbd_next_tok(char *dst, int dst_len, 97 char *src, char delim, 98 const char *name, 99 char **p) 100 { 101 int l; 102 char *end; 103 104 *p = NULL; 105 106 if (delim != '\0') { 107 for (end = src; *end; ++end) { 108 if (*end == delim) { 109 break; 110 } 111 if (*end == '\\' && end[1] != '\0') { 112 end++; 113 } 114 } 115 if (*end == delim) { 116 *p = end + 1; 117 *end = '\0'; 118 } 119 } 120 l = strlen(src); 121 if (l >= dst_len) { 122 error_report("%s too long", name); 123 return -EINVAL; 124 } else if (l == 0) { 125 error_report("%s too short", name); 126 return -EINVAL; 127 } 128 129 pstrcpy(dst, dst_len, src); 130 131 return 0; 132 } 133 134 static void qemu_rbd_unescape(char *src) 135 { 136 char *p; 137 138 for (p = src; *src; ++src, ++p) { 139 if (*src == '\\' && src[1] != '\0') { 140 src++; 141 } 142 *p = *src; 143 } 144 *p = '\0'; 145 } 146 147 static int qemu_rbd_parsename(const char *filename, 148 char *pool, int pool_len, 149 char *snap, int snap_len, 150 char *name, int name_len, 151 char *conf, int conf_len) 152 { 153 const char *start; 154 char *p, *buf; 155 int ret; 156 157 if (!strstart(filename, "rbd:", &start)) { 158 return -EINVAL; 159 } 160 161 buf = g_strdup(start); 162 p = buf; 163 *snap = '\0'; 164 *conf = '\0'; 165 166 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 167 if (ret < 0 || !p) { 168 ret = -EINVAL; 169 goto done; 170 } 171 qemu_rbd_unescape(pool); 172 173 if (strchr(p, '@')) { 174 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 175 if (ret < 0) { 176 goto done; 177 } 178 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 179 qemu_rbd_unescape(snap); 180 } else { 181 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 182 } 183 qemu_rbd_unescape(name); 184 if (ret < 0 || !p) { 185 goto done; 186 } 187 188 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 189 190 done: 191 g_free(buf); 192 return ret; 193 } 194 195 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 196 { 197 const char *p = conf; 198 199 while (*p) { 200 int len; 201 const char *end = strchr(p, ':'); 202 203 if (end) { 204 len = end - p; 205 } else { 206 len = strlen(p); 207 } 208 209 if (strncmp(p, "id=", 3) == 0) { 210 len -= 3; 211 strncpy(clientname, p + 3, len); 212 clientname[len] = '\0'; 213 return clientname; 214 } 215 if (end == NULL) { 216 break; 217 } 218 p = end + 1; 219 } 220 return NULL; 221 } 222 223 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 224 { 225 char *p, *buf; 226 char name[RBD_MAX_CONF_NAME_SIZE]; 227 char value[RBD_MAX_CONF_VAL_SIZE]; 228 int ret = 0; 229 230 buf = g_strdup(conf); 231 p = buf; 232 233 while (p) { 234 ret = qemu_rbd_next_tok(name, sizeof(name), p, 235 '=', "conf option name", &p); 236 if (ret < 0) { 237 break; 238 } 239 qemu_rbd_unescape(name); 240 241 if (!p) { 242 error_report("conf option %s has no value", name); 243 ret = -EINVAL; 244 break; 245 } 246 247 ret = qemu_rbd_next_tok(value, sizeof(value), p, 248 ':', "conf option value", &p); 249 if (ret < 0) { 250 break; 251 } 252 qemu_rbd_unescape(value); 253 254 if (strcmp(name, "conf") == 0) { 255 ret = rados_conf_read_file(cluster, value); 256 if (ret < 0) { 257 error_report("error reading conf file %s", value); 258 break; 259 } 260 } else if (strcmp(name, "id") == 0) { 261 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 262 } else { 263 ret = rados_conf_set(cluster, name, value); 264 if (ret < 0) { 265 error_report("invalid conf option %s", name); 266 ret = -EINVAL; 267 break; 268 } 269 } 270 } 271 272 g_free(buf); 273 return ret; 274 } 275 276 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 277 { 278 int64_t bytes = 0; 279 int64_t objsize; 280 int obj_order = 0; 281 char pool[RBD_MAX_POOL_NAME_SIZE]; 282 char name[RBD_MAX_IMAGE_NAME_SIZE]; 283 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 284 char conf[RBD_MAX_CONF_SIZE]; 285 char clientname_buf[RBD_MAX_CONF_SIZE]; 286 char *clientname; 287 rados_t cluster; 288 rados_ioctx_t io_ctx; 289 int ret; 290 291 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 292 snap_buf, sizeof(snap_buf), 293 name, sizeof(name), 294 conf, sizeof(conf)) < 0) { 295 return -EINVAL; 296 } 297 298 /* Read out options */ 299 while (options && options->name) { 300 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 301 bytes = options->value.n; 302 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 303 if (options->value.n) { 304 objsize = options->value.n; 305 if ((objsize - 1) & objsize) { /* not a power of 2? */ 306 error_report("obj size needs to be power of 2"); 307 return -EINVAL; 308 } 309 if (objsize < 4096) { 310 error_report("obj size too small"); 311 return -EINVAL; 312 } 313 obj_order = ffs(objsize) - 1; 314 } 315 } 316 options++; 317 } 318 319 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 320 if (rados_create(&cluster, clientname) < 0) { 321 error_report("error initializing"); 322 return -EIO; 323 } 324 325 if (strstr(conf, "conf=") == NULL) { 326 /* try default location, but ignore failure */ 327 rados_conf_read_file(cluster, NULL); 328 } 329 330 if (conf[0] != '\0' && 331 qemu_rbd_set_conf(cluster, conf) < 0) { 332 error_report("error setting config options"); 333 rados_shutdown(cluster); 334 return -EIO; 335 } 336 337 if (rados_connect(cluster) < 0) { 338 error_report("error connecting"); 339 rados_shutdown(cluster); 340 return -EIO; 341 } 342 343 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 344 error_report("error opening pool %s", pool); 345 rados_shutdown(cluster); 346 return -EIO; 347 } 348 349 ret = rbd_create(io_ctx, name, bytes, &obj_order); 350 rados_ioctx_destroy(io_ctx); 351 rados_shutdown(cluster); 352 353 return ret; 354 } 355 356 /* 357 * This aio completion is being called from qemu_rbd_aio_event_reader() 358 * and runs in qemu context. It schedules a bh, but just in case the aio 359 * was not cancelled before. 360 */ 361 static void qemu_rbd_complete_aio(RADOSCB *rcb) 362 { 363 RBDAIOCB *acb = rcb->acb; 364 int64_t r; 365 366 if (acb->cancelled) { 367 qemu_vfree(acb->bounce); 368 qemu_aio_release(acb); 369 goto done; 370 } 371 372 r = rcb->ret; 373 374 if (acb->write) { 375 if (r < 0) { 376 acb->ret = r; 377 acb->error = 1; 378 } else if (!acb->error) { 379 acb->ret = rcb->size; 380 } 381 } else { 382 if (r < 0) { 383 memset(rcb->buf, 0, rcb->size); 384 acb->ret = r; 385 acb->error = 1; 386 } else if (r < rcb->size) { 387 memset(rcb->buf + r, 0, rcb->size - r); 388 if (!acb->error) { 389 acb->ret = rcb->size; 390 } 391 } else if (!acb->error) { 392 acb->ret = r; 393 } 394 } 395 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 396 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 397 qemu_bh_schedule(acb->bh); 398 done: 399 g_free(rcb); 400 } 401 402 /* 403 * aio fd read handler. It runs in the qemu context and calls the 404 * completion handling of completed rados aio operations. 405 */ 406 static void qemu_rbd_aio_event_reader(void *opaque) 407 { 408 BDRVRBDState *s = opaque; 409 410 ssize_t ret; 411 412 do { 413 char *p = (char *)&s->event_rcb; 414 415 /* now read the rcb pointer that was sent from a non qemu thread */ 416 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 417 sizeof(s->event_rcb) - s->event_reader_pos); 418 if (ret > 0) { 419 s->event_reader_pos += ret; 420 if (s->event_reader_pos == sizeof(s->event_rcb)) { 421 s->event_reader_pos = 0; 422 qemu_rbd_complete_aio(s->event_rcb); 423 s->qemu_aio_count--; 424 } 425 } 426 } while (ret < 0 && errno == EINTR); 427 } 428 429 static int qemu_rbd_aio_flush_cb(void *opaque) 430 { 431 BDRVRBDState *s = opaque; 432 433 return (s->qemu_aio_count > 0); 434 } 435 436 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) 437 { 438 BDRVRBDState *s = bs->opaque; 439 char pool[RBD_MAX_POOL_NAME_SIZE]; 440 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 441 char conf[RBD_MAX_CONF_SIZE]; 442 char clientname_buf[RBD_MAX_CONF_SIZE]; 443 char *clientname; 444 int r; 445 446 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 447 snap_buf, sizeof(snap_buf), 448 s->name, sizeof(s->name), 449 conf, sizeof(conf)) < 0) { 450 return -EINVAL; 451 } 452 453 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 454 r = rados_create(&s->cluster, clientname); 455 if (r < 0) { 456 error_report("error initializing"); 457 return r; 458 } 459 460 s->snap = NULL; 461 if (snap_buf[0] != '\0') { 462 s->snap = g_strdup(snap_buf); 463 } 464 465 if (strstr(conf, "conf=") == NULL) { 466 /* try default location, but ignore failure */ 467 rados_conf_read_file(s->cluster, NULL); 468 } 469 470 if (conf[0] != '\0') { 471 r = qemu_rbd_set_conf(s->cluster, conf); 472 if (r < 0) { 473 error_report("error setting config options"); 474 goto failed_shutdown; 475 } 476 } 477 478 r = rados_connect(s->cluster); 479 if (r < 0) { 480 error_report("error connecting"); 481 goto failed_shutdown; 482 } 483 484 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 485 if (r < 0) { 486 error_report("error opening pool %s", pool); 487 goto failed_shutdown; 488 } 489 490 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 491 if (r < 0) { 492 error_report("error reading header from %s", s->name); 493 goto failed_open; 494 } 495 496 bs->read_only = (s->snap != NULL); 497 498 s->event_reader_pos = 0; 499 r = qemu_pipe(s->fds); 500 if (r < 0) { 501 error_report("error opening eventfd"); 502 goto failed; 503 } 504 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 505 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 506 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 507 NULL, qemu_rbd_aio_flush_cb, NULL, s); 508 509 510 return 0; 511 512 failed: 513 rbd_close(s->image); 514 failed_open: 515 rados_ioctx_destroy(s->io_ctx); 516 failed_shutdown: 517 rados_shutdown(s->cluster); 518 g_free(s->snap); 519 return r; 520 } 521 522 static void qemu_rbd_close(BlockDriverState *bs) 523 { 524 BDRVRBDState *s = bs->opaque; 525 526 close(s->fds[0]); 527 close(s->fds[1]); 528 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, 529 NULL); 530 531 rbd_close(s->image); 532 rados_ioctx_destroy(s->io_ctx); 533 g_free(s->snap); 534 rados_shutdown(s->cluster); 535 } 536 537 /* 538 * Cancel aio. Since we don't reference acb in a non qemu threads, 539 * it is safe to access it here. 540 */ 541 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 542 { 543 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 544 acb->cancelled = 1; 545 } 546 547 static AIOPool rbd_aio_pool = { 548 .aiocb_size = sizeof(RBDAIOCB), 549 .cancel = qemu_rbd_aio_cancel, 550 }; 551 552 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 553 { 554 int ret = 0; 555 while (1) { 556 fd_set wfd; 557 int fd = s->fds[RBD_FD_WRITE]; 558 559 /* send the op pointer to the qemu thread that is responsible 560 for the aio/op completion. Must do it in a qemu thread context */ 561 ret = write(fd, (void *)&rcb, sizeof(rcb)); 562 if (ret >= 0) { 563 break; 564 } 565 if (errno == EINTR) { 566 continue; 567 } 568 if (errno != EAGAIN) { 569 break; 570 } 571 572 FD_ZERO(&wfd); 573 FD_SET(fd, &wfd); 574 do { 575 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 576 } while (ret < 0 && errno == EINTR); 577 } 578 579 return ret; 580 } 581 582 /* 583 * This is the callback function for rbd_aio_read and _write 584 * 585 * Note: this function is being called from a non qemu thread so 586 * we need to be careful about what we do here. Generally we only 587 * write to the block notification pipe, and do the rest of the 588 * io completion handling from qemu_rbd_aio_event_reader() which 589 * runs in a qemu context. 590 */ 591 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 592 { 593 int ret; 594 rcb->ret = rbd_aio_get_return_value(c); 595 rbd_aio_release(c); 596 ret = qemu_rbd_send_pipe(rcb->s, rcb); 597 if (ret < 0) { 598 error_report("failed writing to acb->s->fds"); 599 g_free(rcb); 600 } 601 } 602 603 /* Callback when all queued rbd_aio requests are complete */ 604 605 static void rbd_aio_bh_cb(void *opaque) 606 { 607 RBDAIOCB *acb = opaque; 608 609 if (!acb->write) { 610 qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size); 611 } 612 qemu_vfree(acb->bounce); 613 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 614 qemu_bh_delete(acb->bh); 615 acb->bh = NULL; 616 617 qemu_aio_release(acb); 618 } 619 620 static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, 621 int64_t sector_num, 622 QEMUIOVector *qiov, 623 int nb_sectors, 624 BlockDriverCompletionFunc *cb, 625 void *opaque, int write) 626 { 627 RBDAIOCB *acb; 628 RADOSCB *rcb; 629 rbd_completion_t c; 630 int64_t off, size; 631 char *buf; 632 int r; 633 634 BDRVRBDState *s = bs->opaque; 635 636 acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque); 637 acb->write = write; 638 acb->qiov = qiov; 639 acb->bounce = qemu_blockalign(bs, qiov->size); 640 acb->ret = 0; 641 acb->error = 0; 642 acb->s = s; 643 acb->cancelled = 0; 644 acb->bh = NULL; 645 646 if (write) { 647 qemu_iovec_to_buffer(acb->qiov, acb->bounce); 648 } 649 650 buf = acb->bounce; 651 652 off = sector_num * BDRV_SECTOR_SIZE; 653 size = nb_sectors * BDRV_SECTOR_SIZE; 654 655 s->qemu_aio_count++; /* All the RADOSCB */ 656 657 rcb = g_malloc(sizeof(RADOSCB)); 658 rcb->done = 0; 659 rcb->acb = acb; 660 rcb->buf = buf; 661 rcb->s = acb->s; 662 rcb->size = size; 663 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 664 if (r < 0) { 665 goto failed; 666 } 667 668 if (write) { 669 r = rbd_aio_write(s->image, off, size, buf, c); 670 } else { 671 r = rbd_aio_read(s->image, off, size, buf, c); 672 } 673 674 if (r < 0) { 675 goto failed; 676 } 677 678 return &acb->common; 679 680 failed: 681 g_free(rcb); 682 s->qemu_aio_count--; 683 qemu_aio_release(acb); 684 return NULL; 685 } 686 687 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 688 int64_t sector_num, 689 QEMUIOVector *qiov, 690 int nb_sectors, 691 BlockDriverCompletionFunc *cb, 692 void *opaque) 693 { 694 return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); 695 } 696 697 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 698 int64_t sector_num, 699 QEMUIOVector *qiov, 700 int nb_sectors, 701 BlockDriverCompletionFunc *cb, 702 void *opaque) 703 { 704 return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); 705 } 706 707 static int qemu_rbd_co_flush(BlockDriverState *bs) 708 { 709 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 710 /* rbd_flush added in 0.1.1 */ 711 BDRVRBDState *s = bs->opaque; 712 return rbd_flush(s->image); 713 #else 714 return 0; 715 #endif 716 } 717 718 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 719 { 720 BDRVRBDState *s = bs->opaque; 721 rbd_image_info_t info; 722 int r; 723 724 r = rbd_stat(s->image, &info, sizeof(info)); 725 if (r < 0) { 726 return r; 727 } 728 729 bdi->cluster_size = info.obj_size; 730 return 0; 731 } 732 733 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 734 { 735 BDRVRBDState *s = bs->opaque; 736 rbd_image_info_t info; 737 int r; 738 739 r = rbd_stat(s->image, &info, sizeof(info)); 740 if (r < 0) { 741 return r; 742 } 743 744 return info.size; 745 } 746 747 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 748 { 749 BDRVRBDState *s = bs->opaque; 750 int r; 751 752 r = rbd_resize(s->image, offset); 753 if (r < 0) { 754 return r; 755 } 756 757 return 0; 758 } 759 760 static int qemu_rbd_snap_create(BlockDriverState *bs, 761 QEMUSnapshotInfo *sn_info) 762 { 763 BDRVRBDState *s = bs->opaque; 764 int r; 765 766 if (sn_info->name[0] == '\0') { 767 return -EINVAL; /* we need a name for rbd snapshots */ 768 } 769 770 /* 771 * rbd snapshots are using the name as the user controlled unique identifier 772 * we can't use the rbd snapid for that purpose, as it can't be set 773 */ 774 if (sn_info->id_str[0] != '\0' && 775 strcmp(sn_info->id_str, sn_info->name) != 0) { 776 return -EINVAL; 777 } 778 779 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 780 return -ERANGE; 781 } 782 783 r = rbd_snap_create(s->image, sn_info->name); 784 if (r < 0) { 785 error_report("failed to create snap: %s", strerror(-r)); 786 return r; 787 } 788 789 return 0; 790 } 791 792 static int qemu_rbd_snap_list(BlockDriverState *bs, 793 QEMUSnapshotInfo **psn_tab) 794 { 795 BDRVRBDState *s = bs->opaque; 796 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 797 int i, snap_count; 798 rbd_snap_info_t *snaps; 799 int max_snaps = RBD_MAX_SNAPS; 800 801 do { 802 snaps = g_malloc(sizeof(*snaps) * max_snaps); 803 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 804 if (snap_count < 0) { 805 g_free(snaps); 806 } 807 } while (snap_count == -ERANGE); 808 809 if (snap_count <= 0) { 810 goto done; 811 } 812 813 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 814 815 for (i = 0; i < snap_count; i++) { 816 const char *snap_name = snaps[i].name; 817 818 sn_info = sn_tab + i; 819 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 820 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 821 822 sn_info->vm_state_size = snaps[i].size; 823 sn_info->date_sec = 0; 824 sn_info->date_nsec = 0; 825 sn_info->vm_clock_nsec = 0; 826 } 827 rbd_snap_list_end(snaps); 828 829 done: 830 *psn_tab = sn_tab; 831 return snap_count; 832 } 833 834 static QEMUOptionParameter qemu_rbd_create_options[] = { 835 { 836 .name = BLOCK_OPT_SIZE, 837 .type = OPT_SIZE, 838 .help = "Virtual disk size" 839 }, 840 { 841 .name = BLOCK_OPT_CLUSTER_SIZE, 842 .type = OPT_SIZE, 843 .help = "RBD object size" 844 }, 845 {NULL} 846 }; 847 848 static BlockDriver bdrv_rbd = { 849 .format_name = "rbd", 850 .instance_size = sizeof(BDRVRBDState), 851 .bdrv_file_open = qemu_rbd_open, 852 .bdrv_close = qemu_rbd_close, 853 .bdrv_create = qemu_rbd_create, 854 .bdrv_get_info = qemu_rbd_getinfo, 855 .create_options = qemu_rbd_create_options, 856 .bdrv_getlength = qemu_rbd_getlength, 857 .bdrv_truncate = qemu_rbd_truncate, 858 .protocol_name = "rbd", 859 860 .bdrv_aio_readv = qemu_rbd_aio_readv, 861 .bdrv_aio_writev = qemu_rbd_aio_writev, 862 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 863 864 .bdrv_snapshot_create = qemu_rbd_snap_create, 865 .bdrv_snapshot_list = qemu_rbd_snap_list, 866 }; 867 868 static void bdrv_rbd_init(void) 869 { 870 bdrv_register(&bdrv_rbd); 871 } 872 873 block_init(bdrv_rbd_init); 874