1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 */ 11 12 #include <inttypes.h> 13 14 #include "qemu-common.h" 15 #include "qemu-error.h" 16 17 #include "block_int.h" 18 19 #include <rbd/librbd.h> 20 21 22 23 /* 24 * When specifying the image filename use: 25 * 26 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 27 * 28 * poolname must be the name of an existing rados pool 29 * 30 * devicename is the basename for all objects used to 31 * emulate the raw device. 32 * 33 * Each option given is used to configure rados, and may be 34 * any Ceph option, or "conf". The "conf" option specifies 35 * a Ceph configuration file to read. 36 * 37 * Metadata information (image size, ...) is stored in an 38 * object with the name "devicename.rbd". 39 * 40 * The raw device is split into 4MB sized objects by default. 41 * The sequencenumber is encoded in a 12 byte long hex-string, 42 * and is attached to the devicename, separated by a dot. 43 * e.g. "devicename.1234567890ab" 44 * 45 */ 46 47 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 48 49 #define RBD_MAX_CONF_NAME_SIZE 128 50 #define RBD_MAX_CONF_VAL_SIZE 512 51 #define RBD_MAX_CONF_SIZE 1024 52 #define RBD_MAX_POOL_NAME_SIZE 128 53 #define RBD_MAX_SNAP_NAME_SIZE 128 54 #define RBD_MAX_SNAPS 100 55 56 typedef struct RBDAIOCB { 57 BlockDriverAIOCB common; 58 QEMUBH *bh; 59 int ret; 60 QEMUIOVector *qiov; 61 char *bounce; 62 int write; 63 int64_t sector_num; 64 int error; 65 struct BDRVRBDState *s; 66 int cancelled; 67 } RBDAIOCB; 68 69 typedef struct RADOSCB { 70 int rcbid; 71 RBDAIOCB *acb; 72 struct BDRVRBDState *s; 73 int done; 74 int64_t size; 75 char *buf; 76 int ret; 77 } RADOSCB; 78 79 #define RBD_FD_READ 0 80 #define RBD_FD_WRITE 1 81 82 typedef struct BDRVRBDState { 83 int fds[2]; 84 rados_t cluster; 85 rados_ioctx_t io_ctx; 86 rbd_image_t image; 87 char name[RBD_MAX_IMAGE_NAME_SIZE]; 88 int qemu_aio_count; 89 char *snap; 90 int event_reader_pos; 91 RADOSCB *event_rcb; 92 } BDRVRBDState; 93 94 static void rbd_aio_bh_cb(void *opaque); 95 96 static int qemu_rbd_next_tok(char *dst, int dst_len, 97 char *src, char delim, 98 const char *name, 99 char **p) 100 { 101 int l; 102 char *end; 103 104 *p = NULL; 105 106 if (delim != '\0') { 107 end = strchr(src, delim); 108 if (end) { 109 *p = end + 1; 110 *end = '\0'; 111 } 112 } 113 l = strlen(src); 114 if (l >= dst_len) { 115 error_report("%s too long", name); 116 return -EINVAL; 117 } else if (l == 0) { 118 error_report("%s too short", name); 119 return -EINVAL; 120 } 121 122 pstrcpy(dst, dst_len, src); 123 124 return 0; 125 } 126 127 static int qemu_rbd_parsename(const char *filename, 128 char *pool, int pool_len, 129 char *snap, int snap_len, 130 char *name, int name_len, 131 char *conf, int conf_len) 132 { 133 const char *start; 134 char *p, *buf; 135 int ret; 136 137 if (!strstart(filename, "rbd:", &start)) { 138 return -EINVAL; 139 } 140 141 buf = qemu_strdup(start); 142 p = buf; 143 *snap = '\0'; 144 *conf = '\0'; 145 146 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 147 if (ret < 0 || !p) { 148 ret = -EINVAL; 149 goto done; 150 } 151 152 if (strchr(p, '@')) { 153 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 154 if (ret < 0) { 155 goto done; 156 } 157 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 158 } else { 159 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 160 } 161 if (ret < 0 || !p) { 162 goto done; 163 } 164 165 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 166 167 done: 168 qemu_free(buf); 169 return ret; 170 } 171 172 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 173 { 174 char *p, *buf; 175 char name[RBD_MAX_CONF_NAME_SIZE]; 176 char value[RBD_MAX_CONF_VAL_SIZE]; 177 int ret = 0; 178 179 buf = qemu_strdup(conf); 180 p = buf; 181 182 while (p) { 183 ret = qemu_rbd_next_tok(name, sizeof(name), p, 184 '=', "conf option name", &p); 185 if (ret < 0) { 186 break; 187 } 188 189 if (!p) { 190 error_report("conf option %s has no value", name); 191 ret = -EINVAL; 192 break; 193 } 194 195 ret = qemu_rbd_next_tok(value, sizeof(value), p, 196 ':', "conf option value", &p); 197 if (ret < 0) { 198 break; 199 } 200 201 if (strcmp(name, "conf")) { 202 ret = rados_conf_set(cluster, name, value); 203 if (ret < 0) { 204 error_report("invalid conf option %s", name); 205 ret = -EINVAL; 206 break; 207 } 208 } else { 209 ret = rados_conf_read_file(cluster, value); 210 if (ret < 0) { 211 error_report("error reading conf file %s", value); 212 break; 213 } 214 } 215 } 216 217 qemu_free(buf); 218 return ret; 219 } 220 221 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 222 { 223 int64_t bytes = 0; 224 int64_t objsize; 225 int obj_order = 0; 226 char pool[RBD_MAX_POOL_NAME_SIZE]; 227 char name[RBD_MAX_IMAGE_NAME_SIZE]; 228 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 229 char conf[RBD_MAX_CONF_SIZE]; 230 rados_t cluster; 231 rados_ioctx_t io_ctx; 232 int ret; 233 234 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 235 snap_buf, sizeof(snap_buf), 236 name, sizeof(name), 237 conf, sizeof(conf)) < 0) { 238 return -EINVAL; 239 } 240 241 /* Read out options */ 242 while (options && options->name) { 243 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 244 bytes = options->value.n; 245 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 246 if (options->value.n) { 247 objsize = options->value.n; 248 if ((objsize - 1) & objsize) { /* not a power of 2? */ 249 error_report("obj size needs to be power of 2"); 250 return -EINVAL; 251 } 252 if (objsize < 4096) { 253 error_report("obj size too small"); 254 return -EINVAL; 255 } 256 obj_order = ffs(objsize) - 1; 257 } 258 } 259 options++; 260 } 261 262 if (rados_create(&cluster, NULL) < 0) { 263 error_report("error initializing"); 264 return -EIO; 265 } 266 267 if (strstr(conf, "conf=") == NULL) { 268 if (rados_conf_read_file(cluster, NULL) < 0) { 269 error_report("error reading config file"); 270 rados_shutdown(cluster); 271 return -EIO; 272 } 273 } 274 275 if (conf[0] != '\0' && 276 qemu_rbd_set_conf(cluster, conf) < 0) { 277 error_report("error setting config options"); 278 rados_shutdown(cluster); 279 return -EIO; 280 } 281 282 if (rados_connect(cluster) < 0) { 283 error_report("error connecting"); 284 rados_shutdown(cluster); 285 return -EIO; 286 } 287 288 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 289 error_report("error opening pool %s", pool); 290 rados_shutdown(cluster); 291 return -EIO; 292 } 293 294 ret = rbd_create(io_ctx, name, bytes, &obj_order); 295 rados_ioctx_destroy(io_ctx); 296 rados_shutdown(cluster); 297 298 return ret; 299 } 300 301 /* 302 * This aio completion is being called from qemu_rbd_aio_event_reader() 303 * and runs in qemu context. It schedules a bh, but just in case the aio 304 * was not cancelled before. 305 */ 306 static void qemu_rbd_complete_aio(RADOSCB *rcb) 307 { 308 RBDAIOCB *acb = rcb->acb; 309 int64_t r; 310 311 if (acb->cancelled) { 312 qemu_vfree(acb->bounce); 313 qemu_aio_release(acb); 314 goto done; 315 } 316 317 r = rcb->ret; 318 319 if (acb->write) { 320 if (r < 0) { 321 acb->ret = r; 322 acb->error = 1; 323 } else if (!acb->error) { 324 acb->ret = rcb->size; 325 } 326 } else { 327 if (r < 0) { 328 memset(rcb->buf, 0, rcb->size); 329 acb->ret = r; 330 acb->error = 1; 331 } else if (r < rcb->size) { 332 memset(rcb->buf + r, 0, rcb->size - r); 333 if (!acb->error) { 334 acb->ret = rcb->size; 335 } 336 } else if (!acb->error) { 337 acb->ret = r; 338 } 339 } 340 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 341 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 342 qemu_bh_schedule(acb->bh); 343 done: 344 qemu_free(rcb); 345 } 346 347 /* 348 * aio fd read handler. It runs in the qemu context and calls the 349 * completion handling of completed rados aio operations. 350 */ 351 static void qemu_rbd_aio_event_reader(void *opaque) 352 { 353 BDRVRBDState *s = opaque; 354 355 ssize_t ret; 356 357 do { 358 char *p = (char *)&s->event_rcb; 359 360 /* now read the rcb pointer that was sent from a non qemu thread */ 361 if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 362 sizeof(s->event_rcb) - s->event_reader_pos)) > 0) { 363 if (ret > 0) { 364 s->event_reader_pos += ret; 365 if (s->event_reader_pos == sizeof(s->event_rcb)) { 366 s->event_reader_pos = 0; 367 qemu_rbd_complete_aio(s->event_rcb); 368 s->qemu_aio_count--; 369 } 370 } 371 } 372 } while (ret < 0 && errno == EINTR); 373 } 374 375 static int qemu_rbd_aio_flush_cb(void *opaque) 376 { 377 BDRVRBDState *s = opaque; 378 379 return (s->qemu_aio_count > 0); 380 } 381 382 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) 383 { 384 BDRVRBDState *s = bs->opaque; 385 char pool[RBD_MAX_POOL_NAME_SIZE]; 386 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 387 char conf[RBD_MAX_CONF_SIZE]; 388 int r; 389 390 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 391 snap_buf, sizeof(snap_buf), 392 s->name, sizeof(s->name), 393 conf, sizeof(conf)) < 0) { 394 return -EINVAL; 395 } 396 s->snap = NULL; 397 if (snap_buf[0] != '\0') { 398 s->snap = qemu_strdup(snap_buf); 399 } 400 401 r = rados_create(&s->cluster, NULL); 402 if (r < 0) { 403 error_report("error initializing"); 404 return r; 405 } 406 407 if (strstr(conf, "conf=") == NULL) { 408 r = rados_conf_read_file(s->cluster, NULL); 409 if (r < 0) { 410 error_report("error reading config file"); 411 rados_shutdown(s->cluster); 412 return r; 413 } 414 } 415 416 if (conf[0] != '\0') { 417 r = qemu_rbd_set_conf(s->cluster, conf); 418 if (r < 0) { 419 error_report("error setting config options"); 420 rados_shutdown(s->cluster); 421 return r; 422 } 423 } 424 425 r = rados_connect(s->cluster); 426 if (r < 0) { 427 error_report("error connecting"); 428 rados_shutdown(s->cluster); 429 return r; 430 } 431 432 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 433 if (r < 0) { 434 error_report("error opening pool %s", pool); 435 rados_shutdown(s->cluster); 436 return r; 437 } 438 439 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 440 if (r < 0) { 441 error_report("error reading header from %s", s->name); 442 rados_ioctx_destroy(s->io_ctx); 443 rados_shutdown(s->cluster); 444 return r; 445 } 446 447 bs->read_only = (s->snap != NULL); 448 449 s->event_reader_pos = 0; 450 r = qemu_pipe(s->fds); 451 if (r < 0) { 452 error_report("error opening eventfd"); 453 goto failed; 454 } 455 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 456 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 457 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 458 NULL, qemu_rbd_aio_flush_cb, NULL, s); 459 460 461 return 0; 462 463 failed: 464 rbd_close(s->image); 465 rados_ioctx_destroy(s->io_ctx); 466 rados_shutdown(s->cluster); 467 return r; 468 } 469 470 static void qemu_rbd_close(BlockDriverState *bs) 471 { 472 BDRVRBDState *s = bs->opaque; 473 474 close(s->fds[0]); 475 close(s->fds[1]); 476 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, 477 NULL); 478 479 rbd_close(s->image); 480 rados_ioctx_destroy(s->io_ctx); 481 qemu_free(s->snap); 482 rados_shutdown(s->cluster); 483 } 484 485 /* 486 * Cancel aio. Since we don't reference acb in a non qemu threads, 487 * it is safe to access it here. 488 */ 489 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 490 { 491 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 492 acb->cancelled = 1; 493 } 494 495 static AIOPool rbd_aio_pool = { 496 .aiocb_size = sizeof(RBDAIOCB), 497 .cancel = qemu_rbd_aio_cancel, 498 }; 499 500 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 501 { 502 int ret = 0; 503 while (1) { 504 fd_set wfd; 505 int fd = s->fds[RBD_FD_WRITE]; 506 507 /* send the op pointer to the qemu thread that is responsible 508 for the aio/op completion. Must do it in a qemu thread context */ 509 ret = write(fd, (void *)&rcb, sizeof(rcb)); 510 if (ret >= 0) { 511 break; 512 } 513 if (errno == EINTR) { 514 continue; 515 } 516 if (errno != EAGAIN) { 517 break; 518 } 519 520 FD_ZERO(&wfd); 521 FD_SET(fd, &wfd); 522 do { 523 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 524 } while (ret < 0 && errno == EINTR); 525 } 526 527 return ret; 528 } 529 530 /* 531 * This is the callback function for rbd_aio_read and _write 532 * 533 * Note: this function is being called from a non qemu thread so 534 * we need to be careful about what we do here. Generally we only 535 * write to the block notification pipe, and do the rest of the 536 * io completion handling from qemu_rbd_aio_event_reader() which 537 * runs in a qemu context. 538 */ 539 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 540 { 541 int ret; 542 rcb->ret = rbd_aio_get_return_value(c); 543 rbd_aio_release(c); 544 ret = qemu_rbd_send_pipe(rcb->s, rcb); 545 if (ret < 0) { 546 error_report("failed writing to acb->s->fds"); 547 qemu_free(rcb); 548 } 549 } 550 551 /* Callback when all queued rbd_aio requests are complete */ 552 553 static void rbd_aio_bh_cb(void *opaque) 554 { 555 RBDAIOCB *acb = opaque; 556 557 if (!acb->write) { 558 qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size); 559 } 560 qemu_vfree(acb->bounce); 561 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 562 qemu_bh_delete(acb->bh); 563 acb->bh = NULL; 564 565 qemu_aio_release(acb); 566 } 567 568 static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, 569 int64_t sector_num, 570 QEMUIOVector *qiov, 571 int nb_sectors, 572 BlockDriverCompletionFunc *cb, 573 void *opaque, int write) 574 { 575 RBDAIOCB *acb; 576 RADOSCB *rcb; 577 rbd_completion_t c; 578 int64_t off, size; 579 char *buf; 580 int r; 581 582 BDRVRBDState *s = bs->opaque; 583 584 acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque); 585 if (!acb) { 586 return NULL; 587 } 588 acb->write = write; 589 acb->qiov = qiov; 590 acb->bounce = qemu_blockalign(bs, qiov->size); 591 acb->ret = 0; 592 acb->error = 0; 593 acb->s = s; 594 acb->cancelled = 0; 595 acb->bh = NULL; 596 597 if (write) { 598 qemu_iovec_to_buffer(acb->qiov, acb->bounce); 599 } 600 601 buf = acb->bounce; 602 603 off = sector_num * BDRV_SECTOR_SIZE; 604 size = nb_sectors * BDRV_SECTOR_SIZE; 605 606 s->qemu_aio_count++; /* All the RADOSCB */ 607 608 rcb = qemu_malloc(sizeof(RADOSCB)); 609 rcb->done = 0; 610 rcb->acb = acb; 611 rcb->buf = buf; 612 rcb->s = acb->s; 613 rcb->size = size; 614 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 615 if (r < 0) { 616 goto failed; 617 } 618 619 if (write) { 620 r = rbd_aio_write(s->image, off, size, buf, c); 621 } else { 622 r = rbd_aio_read(s->image, off, size, buf, c); 623 } 624 625 if (r < 0) { 626 goto failed; 627 } 628 629 return &acb->common; 630 631 failed: 632 qemu_free(rcb); 633 s->qemu_aio_count--; 634 qemu_aio_release(acb); 635 return NULL; 636 } 637 638 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 639 int64_t sector_num, 640 QEMUIOVector *qiov, 641 int nb_sectors, 642 BlockDriverCompletionFunc *cb, 643 void *opaque) 644 { 645 return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); 646 } 647 648 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 649 int64_t sector_num, 650 QEMUIOVector *qiov, 651 int nb_sectors, 652 BlockDriverCompletionFunc *cb, 653 void *opaque) 654 { 655 return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); 656 } 657 658 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 659 { 660 BDRVRBDState *s = bs->opaque; 661 rbd_image_info_t info; 662 int r; 663 664 r = rbd_stat(s->image, &info, sizeof(info)); 665 if (r < 0) { 666 return r; 667 } 668 669 bdi->cluster_size = info.obj_size; 670 return 0; 671 } 672 673 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 674 { 675 BDRVRBDState *s = bs->opaque; 676 rbd_image_info_t info; 677 int r; 678 679 r = rbd_stat(s->image, &info, sizeof(info)); 680 if (r < 0) { 681 return r; 682 } 683 684 return info.size; 685 } 686 687 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 688 { 689 BDRVRBDState *s = bs->opaque; 690 int r; 691 692 r = rbd_resize(s->image, offset); 693 if (r < 0) { 694 return r; 695 } 696 697 return 0; 698 } 699 700 static int qemu_rbd_snap_create(BlockDriverState *bs, 701 QEMUSnapshotInfo *sn_info) 702 { 703 BDRVRBDState *s = bs->opaque; 704 int r; 705 706 if (sn_info->name[0] == '\0') { 707 return -EINVAL; /* we need a name for rbd snapshots */ 708 } 709 710 /* 711 * rbd snapshots are using the name as the user controlled unique identifier 712 * we can't use the rbd snapid for that purpose, as it can't be set 713 */ 714 if (sn_info->id_str[0] != '\0' && 715 strcmp(sn_info->id_str, sn_info->name) != 0) { 716 return -EINVAL; 717 } 718 719 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 720 return -ERANGE; 721 } 722 723 r = rbd_snap_create(s->image, sn_info->name); 724 if (r < 0) { 725 error_report("failed to create snap: %s", strerror(-r)); 726 return r; 727 } 728 729 return 0; 730 } 731 732 static int qemu_rbd_snap_list(BlockDriverState *bs, 733 QEMUSnapshotInfo **psn_tab) 734 { 735 BDRVRBDState *s = bs->opaque; 736 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 737 int i, snap_count; 738 rbd_snap_info_t *snaps; 739 int max_snaps = RBD_MAX_SNAPS; 740 741 do { 742 snaps = qemu_malloc(sizeof(*snaps) * max_snaps); 743 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 744 if (snap_count < 0) { 745 qemu_free(snaps); 746 } 747 } while (snap_count == -ERANGE); 748 749 if (snap_count <= 0) { 750 return snap_count; 751 } 752 753 sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo)); 754 755 for (i = 0; i < snap_count; i++) { 756 const char *snap_name = snaps[i].name; 757 758 sn_info = sn_tab + i; 759 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 760 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 761 762 sn_info->vm_state_size = snaps[i].size; 763 sn_info->date_sec = 0; 764 sn_info->date_nsec = 0; 765 sn_info->vm_clock_nsec = 0; 766 } 767 rbd_snap_list_end(snaps); 768 769 *psn_tab = sn_tab; 770 return snap_count; 771 } 772 773 static QEMUOptionParameter qemu_rbd_create_options[] = { 774 { 775 .name = BLOCK_OPT_SIZE, 776 .type = OPT_SIZE, 777 .help = "Virtual disk size" 778 }, 779 { 780 .name = BLOCK_OPT_CLUSTER_SIZE, 781 .type = OPT_SIZE, 782 .help = "RBD object size" 783 }, 784 {NULL} 785 }; 786 787 static BlockDriver bdrv_rbd = { 788 .format_name = "rbd", 789 .instance_size = sizeof(BDRVRBDState), 790 .bdrv_file_open = qemu_rbd_open, 791 .bdrv_close = qemu_rbd_close, 792 .bdrv_create = qemu_rbd_create, 793 .bdrv_get_info = qemu_rbd_getinfo, 794 .create_options = qemu_rbd_create_options, 795 .bdrv_getlength = qemu_rbd_getlength, 796 .bdrv_truncate = qemu_rbd_truncate, 797 .protocol_name = "rbd", 798 799 .bdrv_aio_readv = qemu_rbd_aio_readv, 800 .bdrv_aio_writev = qemu_rbd_aio_writev, 801 802 .bdrv_snapshot_create = qemu_rbd_snap_create, 803 .bdrv_snapshot_list = qemu_rbd_snap_list, 804 }; 805 806 static void bdrv_rbd_init(void) 807 { 808 bdrv_register(&bdrv_rbd); 809 } 810 811 block_init(bdrv_rbd_init); 812