1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu/error-report.h" 18 #include "block/block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD, 67 RBD_AIO_FLUSH 68 } RBDAIOCmd; 69 70 typedef struct RBDAIOCB { 71 BlockDriverAIOCB common; 72 QEMUBH *bh; 73 int64_t ret; 74 QEMUIOVector *qiov; 75 char *bounce; 76 RBDAIOCmd cmd; 77 int64_t sector_num; 78 int error; 79 struct BDRVRBDState *s; 80 int cancelled; 81 int status; 82 } RBDAIOCB; 83 84 typedef struct RADOSCB { 85 int rcbid; 86 RBDAIOCB *acb; 87 struct BDRVRBDState *s; 88 int done; 89 int64_t size; 90 char *buf; 91 int64_t ret; 92 } RADOSCB; 93 94 #define RBD_FD_READ 0 95 #define RBD_FD_WRITE 1 96 97 typedef struct BDRVRBDState { 98 int fds[2]; 99 rados_t cluster; 100 rados_ioctx_t io_ctx; 101 rbd_image_t image; 102 char name[RBD_MAX_IMAGE_NAME_SIZE]; 103 char *snap; 104 int event_reader_pos; 105 RADOSCB *event_rcb; 106 } BDRVRBDState; 107 108 static void rbd_aio_bh_cb(void *opaque); 109 110 static int qemu_rbd_next_tok(char *dst, int dst_len, 111 char *src, char delim, 112 const char *name, 113 char **p) 114 { 115 int l; 116 char *end; 117 118 *p = NULL; 119 120 if (delim != '\0') { 121 for (end = src; *end; ++end) { 122 if (*end == delim) { 123 break; 124 } 125 if (*end == '\\' && end[1] != '\0') { 126 end++; 127 } 128 } 129 if (*end == delim) { 130 *p = end + 1; 131 *end = '\0'; 132 } 133 } 134 l = strlen(src); 135 if (l >= dst_len) { 136 error_report("%s too long", name); 137 return -EINVAL; 138 } else if (l == 0) { 139 error_report("%s too short", name); 140 return -EINVAL; 141 } 142 143 pstrcpy(dst, dst_len, src); 144 145 return 0; 146 } 147 148 static void qemu_rbd_unescape(char *src) 149 { 150 char *p; 151 152 for (p = src; *src; ++src, ++p) { 153 if (*src == '\\' && src[1] != '\0') { 154 src++; 155 } 156 *p = *src; 157 } 158 *p = '\0'; 159 } 160 161 static int qemu_rbd_parsename(const char *filename, 162 char *pool, int pool_len, 163 char *snap, int snap_len, 164 char *name, int name_len, 165 char *conf, int conf_len) 166 { 167 const char *start; 168 char *p, *buf; 169 int ret; 170 171 if (!strstart(filename, "rbd:", &start)) { 172 return -EINVAL; 173 } 174 175 buf = g_strdup(start); 176 p = buf; 177 *snap = '\0'; 178 *conf = '\0'; 179 180 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 181 if (ret < 0 || !p) { 182 ret = -EINVAL; 183 goto done; 184 } 185 qemu_rbd_unescape(pool); 186 187 if (strchr(p, '@')) { 188 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 189 if (ret < 0) { 190 goto done; 191 } 192 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 193 qemu_rbd_unescape(snap); 194 } else { 195 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 196 } 197 qemu_rbd_unescape(name); 198 if (ret < 0 || !p) { 199 goto done; 200 } 201 202 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 203 204 done: 205 g_free(buf); 206 return ret; 207 } 208 209 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 210 { 211 const char *p = conf; 212 213 while (*p) { 214 int len; 215 const char *end = strchr(p, ':'); 216 217 if (end) { 218 len = end - p; 219 } else { 220 len = strlen(p); 221 } 222 223 if (strncmp(p, "id=", 3) == 0) { 224 len -= 3; 225 strncpy(clientname, p + 3, len); 226 clientname[len] = '\0'; 227 return clientname; 228 } 229 if (end == NULL) { 230 break; 231 } 232 p = end + 1; 233 } 234 return NULL; 235 } 236 237 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 238 { 239 char *p, *buf; 240 char name[RBD_MAX_CONF_NAME_SIZE]; 241 char value[RBD_MAX_CONF_VAL_SIZE]; 242 int ret = 0; 243 244 buf = g_strdup(conf); 245 p = buf; 246 247 while (p) { 248 ret = qemu_rbd_next_tok(name, sizeof(name), p, 249 '=', "conf option name", &p); 250 if (ret < 0) { 251 break; 252 } 253 qemu_rbd_unescape(name); 254 255 if (!p) { 256 error_report("conf option %s has no value", name); 257 ret = -EINVAL; 258 break; 259 } 260 261 ret = qemu_rbd_next_tok(value, sizeof(value), p, 262 ':', "conf option value", &p); 263 if (ret < 0) { 264 break; 265 } 266 qemu_rbd_unescape(value); 267 268 if (strcmp(name, "conf") == 0) { 269 ret = rados_conf_read_file(cluster, value); 270 if (ret < 0) { 271 error_report("error reading conf file %s", value); 272 break; 273 } 274 } else if (strcmp(name, "id") == 0) { 275 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 276 } else { 277 ret = rados_conf_set(cluster, name, value); 278 if (ret < 0) { 279 error_report("invalid conf option %s", name); 280 ret = -EINVAL; 281 break; 282 } 283 } 284 } 285 286 g_free(buf); 287 return ret; 288 } 289 290 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 291 { 292 int64_t bytes = 0; 293 int64_t objsize; 294 int obj_order = 0; 295 char pool[RBD_MAX_POOL_NAME_SIZE]; 296 char name[RBD_MAX_IMAGE_NAME_SIZE]; 297 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 298 char conf[RBD_MAX_CONF_SIZE]; 299 char clientname_buf[RBD_MAX_CONF_SIZE]; 300 char *clientname; 301 rados_t cluster; 302 rados_ioctx_t io_ctx; 303 int ret; 304 305 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 306 snap_buf, sizeof(snap_buf), 307 name, sizeof(name), 308 conf, sizeof(conf)) < 0) { 309 return -EINVAL; 310 } 311 312 /* Read out options */ 313 while (options && options->name) { 314 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 315 bytes = options->value.n; 316 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 317 if (options->value.n) { 318 objsize = options->value.n; 319 if ((objsize - 1) & objsize) { /* not a power of 2? */ 320 error_report("obj size needs to be power of 2"); 321 return -EINVAL; 322 } 323 if (objsize < 4096) { 324 error_report("obj size too small"); 325 return -EINVAL; 326 } 327 obj_order = ffs(objsize) - 1; 328 } 329 } 330 options++; 331 } 332 333 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 334 if (rados_create(&cluster, clientname) < 0) { 335 error_report("error initializing"); 336 return -EIO; 337 } 338 339 if (strstr(conf, "conf=") == NULL) { 340 /* try default location, but ignore failure */ 341 rados_conf_read_file(cluster, NULL); 342 } 343 344 if (conf[0] != '\0' && 345 qemu_rbd_set_conf(cluster, conf) < 0) { 346 error_report("error setting config options"); 347 rados_shutdown(cluster); 348 return -EIO; 349 } 350 351 if (rados_connect(cluster) < 0) { 352 error_report("error connecting"); 353 rados_shutdown(cluster); 354 return -EIO; 355 } 356 357 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 358 error_report("error opening pool %s", pool); 359 rados_shutdown(cluster); 360 return -EIO; 361 } 362 363 ret = rbd_create(io_ctx, name, bytes, &obj_order); 364 rados_ioctx_destroy(io_ctx); 365 rados_shutdown(cluster); 366 367 return ret; 368 } 369 370 /* 371 * This aio completion is being called from qemu_rbd_aio_event_reader() 372 * and runs in qemu context. It schedules a bh, but just in case the aio 373 * was not cancelled before. 374 */ 375 static void qemu_rbd_complete_aio(RADOSCB *rcb) 376 { 377 RBDAIOCB *acb = rcb->acb; 378 int64_t r; 379 380 r = rcb->ret; 381 382 if (acb->cmd != RBD_AIO_READ) { 383 if (r < 0) { 384 acb->ret = r; 385 acb->error = 1; 386 } else if (!acb->error) { 387 acb->ret = rcb->size; 388 } 389 } else { 390 if (r < 0) { 391 memset(rcb->buf, 0, rcb->size); 392 acb->ret = r; 393 acb->error = 1; 394 } else if (r < rcb->size) { 395 memset(rcb->buf + r, 0, rcb->size - r); 396 if (!acb->error) { 397 acb->ret = rcb->size; 398 } 399 } else if (!acb->error) { 400 acb->ret = r; 401 } 402 } 403 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 404 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 405 qemu_bh_schedule(acb->bh); 406 g_free(rcb); 407 } 408 409 /* 410 * aio fd read handler. It runs in the qemu context and calls the 411 * completion handling of completed rados aio operations. 412 */ 413 static void qemu_rbd_aio_event_reader(void *opaque) 414 { 415 BDRVRBDState *s = opaque; 416 417 ssize_t ret; 418 419 do { 420 char *p = (char *)&s->event_rcb; 421 422 /* now read the rcb pointer that was sent from a non qemu thread */ 423 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 424 sizeof(s->event_rcb) - s->event_reader_pos); 425 if (ret > 0) { 426 s->event_reader_pos += ret; 427 if (s->event_reader_pos == sizeof(s->event_rcb)) { 428 s->event_reader_pos = 0; 429 qemu_rbd_complete_aio(s->event_rcb); 430 } 431 } 432 } while (ret < 0 && errno == EINTR); 433 } 434 435 /* TODO Convert to fine grained options */ 436 static QemuOptsList runtime_opts = { 437 .name = "rbd", 438 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 439 .desc = { 440 { 441 .name = "filename", 442 .type = QEMU_OPT_STRING, 443 .help = "Specification of the rbd image", 444 }, 445 { /* end of list */ } 446 }, 447 }; 448 449 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags) 450 { 451 BDRVRBDState *s = bs->opaque; 452 char pool[RBD_MAX_POOL_NAME_SIZE]; 453 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 454 char conf[RBD_MAX_CONF_SIZE]; 455 char clientname_buf[RBD_MAX_CONF_SIZE]; 456 char *clientname; 457 QemuOpts *opts; 458 Error *local_err = NULL; 459 const char *filename; 460 int r; 461 462 opts = qemu_opts_create_nofail(&runtime_opts); 463 qemu_opts_absorb_qdict(opts, options, &local_err); 464 if (error_is_set(&local_err)) { 465 qerror_report_err(local_err); 466 error_free(local_err); 467 qemu_opts_del(opts); 468 return -EINVAL; 469 } 470 471 filename = qemu_opt_get(opts, "filename"); 472 473 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 474 snap_buf, sizeof(snap_buf), 475 s->name, sizeof(s->name), 476 conf, sizeof(conf)) < 0) { 477 r = -EINVAL; 478 goto failed_opts; 479 } 480 481 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 482 r = rados_create(&s->cluster, clientname); 483 if (r < 0) { 484 error_report("error initializing"); 485 goto failed_opts; 486 } 487 488 s->snap = NULL; 489 if (snap_buf[0] != '\0') { 490 s->snap = g_strdup(snap_buf); 491 } 492 493 /* 494 * Fallback to more conservative semantics if setting cache 495 * options fails. Ignore errors from setting rbd_cache because the 496 * only possible error is that the option does not exist, and 497 * librbd defaults to no caching. If write through caching cannot 498 * be set up, fall back to no caching. 499 */ 500 if (flags & BDRV_O_NOCACHE) { 501 rados_conf_set(s->cluster, "rbd_cache", "false"); 502 } else { 503 rados_conf_set(s->cluster, "rbd_cache", "true"); 504 } 505 506 if (strstr(conf, "conf=") == NULL) { 507 /* try default location, but ignore failure */ 508 rados_conf_read_file(s->cluster, NULL); 509 } 510 511 if (conf[0] != '\0') { 512 r = qemu_rbd_set_conf(s->cluster, conf); 513 if (r < 0) { 514 error_report("error setting config options"); 515 goto failed_shutdown; 516 } 517 } 518 519 r = rados_connect(s->cluster); 520 if (r < 0) { 521 error_report("error connecting"); 522 goto failed_shutdown; 523 } 524 525 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 526 if (r < 0) { 527 error_report("error opening pool %s", pool); 528 goto failed_shutdown; 529 } 530 531 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 532 if (r < 0) { 533 error_report("error reading header from %s", s->name); 534 goto failed_open; 535 } 536 537 bs->read_only = (s->snap != NULL); 538 539 s->event_reader_pos = 0; 540 r = qemu_pipe(s->fds); 541 if (r < 0) { 542 error_report("error opening eventfd"); 543 goto failed; 544 } 545 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 546 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 547 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 548 NULL, s); 549 550 551 qemu_opts_del(opts); 552 return 0; 553 554 failed: 555 rbd_close(s->image); 556 failed_open: 557 rados_ioctx_destroy(s->io_ctx); 558 failed_shutdown: 559 rados_shutdown(s->cluster); 560 g_free(s->snap); 561 failed_opts: 562 qemu_opts_del(opts); 563 return r; 564 } 565 566 static void qemu_rbd_close(BlockDriverState *bs) 567 { 568 BDRVRBDState *s = bs->opaque; 569 570 close(s->fds[0]); 571 close(s->fds[1]); 572 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL); 573 574 rbd_close(s->image); 575 rados_ioctx_destroy(s->io_ctx); 576 g_free(s->snap); 577 rados_shutdown(s->cluster); 578 } 579 580 /* 581 * Cancel aio. Since we don't reference acb in a non qemu threads, 582 * it is safe to access it here. 583 */ 584 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 585 { 586 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 587 acb->cancelled = 1; 588 589 while (acb->status == -EINPROGRESS) { 590 qemu_aio_wait(); 591 } 592 593 qemu_aio_release(acb); 594 } 595 596 static const AIOCBInfo rbd_aiocb_info = { 597 .aiocb_size = sizeof(RBDAIOCB), 598 .cancel = qemu_rbd_aio_cancel, 599 }; 600 601 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 602 { 603 int ret = 0; 604 while (1) { 605 fd_set wfd; 606 int fd = s->fds[RBD_FD_WRITE]; 607 608 /* send the op pointer to the qemu thread that is responsible 609 for the aio/op completion. Must do it in a qemu thread context */ 610 ret = write(fd, (void *)&rcb, sizeof(rcb)); 611 if (ret >= 0) { 612 break; 613 } 614 if (errno == EINTR) { 615 continue; 616 } 617 if (errno != EAGAIN) { 618 break; 619 } 620 621 FD_ZERO(&wfd); 622 FD_SET(fd, &wfd); 623 do { 624 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 625 } while (ret < 0 && errno == EINTR); 626 } 627 628 return ret; 629 } 630 631 /* 632 * This is the callback function for rbd_aio_read and _write 633 * 634 * Note: this function is being called from a non qemu thread so 635 * we need to be careful about what we do here. Generally we only 636 * write to the block notification pipe, and do the rest of the 637 * io completion handling from qemu_rbd_aio_event_reader() which 638 * runs in a qemu context. 639 */ 640 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 641 { 642 int ret; 643 rcb->ret = rbd_aio_get_return_value(c); 644 rbd_aio_release(c); 645 ret = qemu_rbd_send_pipe(rcb->s, rcb); 646 if (ret < 0) { 647 error_report("failed writing to acb->s->fds"); 648 g_free(rcb); 649 } 650 } 651 652 /* Callback when all queued rbd_aio requests are complete */ 653 654 static void rbd_aio_bh_cb(void *opaque) 655 { 656 RBDAIOCB *acb = opaque; 657 658 if (acb->cmd == RBD_AIO_READ) { 659 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 660 } 661 qemu_vfree(acb->bounce); 662 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 663 qemu_bh_delete(acb->bh); 664 acb->bh = NULL; 665 acb->status = 0; 666 667 if (!acb->cancelled) { 668 qemu_aio_release(acb); 669 } 670 } 671 672 static int rbd_aio_discard_wrapper(rbd_image_t image, 673 uint64_t off, 674 uint64_t len, 675 rbd_completion_t comp) 676 { 677 #ifdef LIBRBD_SUPPORTS_DISCARD 678 return rbd_aio_discard(image, off, len, comp); 679 #else 680 return -ENOTSUP; 681 #endif 682 } 683 684 static int rbd_aio_flush_wrapper(rbd_image_t image, 685 rbd_completion_t comp) 686 { 687 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 688 return rbd_aio_flush(image, comp); 689 #else 690 return -ENOTSUP; 691 #endif 692 } 693 694 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 695 int64_t sector_num, 696 QEMUIOVector *qiov, 697 int nb_sectors, 698 BlockDriverCompletionFunc *cb, 699 void *opaque, 700 RBDAIOCmd cmd) 701 { 702 RBDAIOCB *acb; 703 RADOSCB *rcb; 704 rbd_completion_t c; 705 int64_t off, size; 706 char *buf; 707 int r; 708 709 BDRVRBDState *s = bs->opaque; 710 711 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 712 acb->cmd = cmd; 713 acb->qiov = qiov; 714 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 715 acb->bounce = NULL; 716 } else { 717 acb->bounce = qemu_blockalign(bs, qiov->size); 718 } 719 acb->ret = 0; 720 acb->error = 0; 721 acb->s = s; 722 acb->cancelled = 0; 723 acb->bh = NULL; 724 acb->status = -EINPROGRESS; 725 726 if (cmd == RBD_AIO_WRITE) { 727 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 728 } 729 730 buf = acb->bounce; 731 732 off = sector_num * BDRV_SECTOR_SIZE; 733 size = nb_sectors * BDRV_SECTOR_SIZE; 734 735 rcb = g_malloc(sizeof(RADOSCB)); 736 rcb->done = 0; 737 rcb->acb = acb; 738 rcb->buf = buf; 739 rcb->s = acb->s; 740 rcb->size = size; 741 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 742 if (r < 0) { 743 goto failed; 744 } 745 746 switch (cmd) { 747 case RBD_AIO_WRITE: 748 r = rbd_aio_write(s->image, off, size, buf, c); 749 break; 750 case RBD_AIO_READ: 751 r = rbd_aio_read(s->image, off, size, buf, c); 752 break; 753 case RBD_AIO_DISCARD: 754 r = rbd_aio_discard_wrapper(s->image, off, size, c); 755 break; 756 case RBD_AIO_FLUSH: 757 r = rbd_aio_flush_wrapper(s->image, c); 758 break; 759 default: 760 r = -EINVAL; 761 } 762 763 if (r < 0) { 764 goto failed; 765 } 766 767 return &acb->common; 768 769 failed: 770 g_free(rcb); 771 qemu_aio_release(acb); 772 return NULL; 773 } 774 775 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 776 int64_t sector_num, 777 QEMUIOVector *qiov, 778 int nb_sectors, 779 BlockDriverCompletionFunc *cb, 780 void *opaque) 781 { 782 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 783 RBD_AIO_READ); 784 } 785 786 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 787 int64_t sector_num, 788 QEMUIOVector *qiov, 789 int nb_sectors, 790 BlockDriverCompletionFunc *cb, 791 void *opaque) 792 { 793 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 794 RBD_AIO_WRITE); 795 } 796 797 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 798 static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 799 BlockDriverCompletionFunc *cb, 800 void *opaque) 801 { 802 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 803 } 804 805 #else 806 807 static int qemu_rbd_co_flush(BlockDriverState *bs) 808 { 809 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 810 /* rbd_flush added in 0.1.1 */ 811 BDRVRBDState *s = bs->opaque; 812 return rbd_flush(s->image); 813 #else 814 return 0; 815 #endif 816 } 817 #endif 818 819 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 820 { 821 BDRVRBDState *s = bs->opaque; 822 rbd_image_info_t info; 823 int r; 824 825 r = rbd_stat(s->image, &info, sizeof(info)); 826 if (r < 0) { 827 return r; 828 } 829 830 bdi->cluster_size = info.obj_size; 831 return 0; 832 } 833 834 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 835 { 836 BDRVRBDState *s = bs->opaque; 837 rbd_image_info_t info; 838 int r; 839 840 r = rbd_stat(s->image, &info, sizeof(info)); 841 if (r < 0) { 842 return r; 843 } 844 845 return info.size; 846 } 847 848 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 849 { 850 BDRVRBDState *s = bs->opaque; 851 int r; 852 853 r = rbd_resize(s->image, offset); 854 if (r < 0) { 855 return r; 856 } 857 858 return 0; 859 } 860 861 static int qemu_rbd_snap_create(BlockDriverState *bs, 862 QEMUSnapshotInfo *sn_info) 863 { 864 BDRVRBDState *s = bs->opaque; 865 int r; 866 867 if (sn_info->name[0] == '\0') { 868 return -EINVAL; /* we need a name for rbd snapshots */ 869 } 870 871 /* 872 * rbd snapshots are using the name as the user controlled unique identifier 873 * we can't use the rbd snapid for that purpose, as it can't be set 874 */ 875 if (sn_info->id_str[0] != '\0' && 876 strcmp(sn_info->id_str, sn_info->name) != 0) { 877 return -EINVAL; 878 } 879 880 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 881 return -ERANGE; 882 } 883 884 r = rbd_snap_create(s->image, sn_info->name); 885 if (r < 0) { 886 error_report("failed to create snap: %s", strerror(-r)); 887 return r; 888 } 889 890 return 0; 891 } 892 893 static int qemu_rbd_snap_remove(BlockDriverState *bs, 894 const char *snapshot_name) 895 { 896 BDRVRBDState *s = bs->opaque; 897 int r; 898 899 r = rbd_snap_remove(s->image, snapshot_name); 900 return r; 901 } 902 903 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 904 const char *snapshot_name) 905 { 906 BDRVRBDState *s = bs->opaque; 907 int r; 908 909 r = rbd_snap_rollback(s->image, snapshot_name); 910 return r; 911 } 912 913 static int qemu_rbd_snap_list(BlockDriverState *bs, 914 QEMUSnapshotInfo **psn_tab) 915 { 916 BDRVRBDState *s = bs->opaque; 917 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 918 int i, snap_count; 919 rbd_snap_info_t *snaps; 920 int max_snaps = RBD_MAX_SNAPS; 921 922 do { 923 snaps = g_malloc(sizeof(*snaps) * max_snaps); 924 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 925 if (snap_count < 0) { 926 g_free(snaps); 927 } 928 } while (snap_count == -ERANGE); 929 930 if (snap_count <= 0) { 931 goto done; 932 } 933 934 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 935 936 for (i = 0; i < snap_count; i++) { 937 const char *snap_name = snaps[i].name; 938 939 sn_info = sn_tab + i; 940 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 941 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 942 943 sn_info->vm_state_size = snaps[i].size; 944 sn_info->date_sec = 0; 945 sn_info->date_nsec = 0; 946 sn_info->vm_clock_nsec = 0; 947 } 948 rbd_snap_list_end(snaps); 949 950 done: 951 *psn_tab = sn_tab; 952 return snap_count; 953 } 954 955 #ifdef LIBRBD_SUPPORTS_DISCARD 956 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 957 int64_t sector_num, 958 int nb_sectors, 959 BlockDriverCompletionFunc *cb, 960 void *opaque) 961 { 962 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 963 RBD_AIO_DISCARD); 964 } 965 #endif 966 967 static QEMUOptionParameter qemu_rbd_create_options[] = { 968 { 969 .name = BLOCK_OPT_SIZE, 970 .type = OPT_SIZE, 971 .help = "Virtual disk size" 972 }, 973 { 974 .name = BLOCK_OPT_CLUSTER_SIZE, 975 .type = OPT_SIZE, 976 .help = "RBD object size" 977 }, 978 {NULL} 979 }; 980 981 static BlockDriver bdrv_rbd = { 982 .format_name = "rbd", 983 .instance_size = sizeof(BDRVRBDState), 984 .bdrv_file_open = qemu_rbd_open, 985 .bdrv_close = qemu_rbd_close, 986 .bdrv_create = qemu_rbd_create, 987 .bdrv_has_zero_init = bdrv_has_zero_init_1, 988 .bdrv_get_info = qemu_rbd_getinfo, 989 .create_options = qemu_rbd_create_options, 990 .bdrv_getlength = qemu_rbd_getlength, 991 .bdrv_truncate = qemu_rbd_truncate, 992 .protocol_name = "rbd", 993 994 .bdrv_aio_readv = qemu_rbd_aio_readv, 995 .bdrv_aio_writev = qemu_rbd_aio_writev, 996 997 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 998 .bdrv_aio_flush = qemu_rbd_aio_flush, 999 #else 1000 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1001 #endif 1002 1003 #ifdef LIBRBD_SUPPORTS_DISCARD 1004 .bdrv_aio_discard = qemu_rbd_aio_discard, 1005 #endif 1006 1007 .bdrv_snapshot_create = qemu_rbd_snap_create, 1008 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1009 .bdrv_snapshot_list = qemu_rbd_snap_list, 1010 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1011 }; 1012 1013 static void bdrv_rbd_init(void) 1014 { 1015 bdrv_register(&bdrv_rbd); 1016 } 1017 1018 block_init(bdrv_rbd_init); 1019