1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu/error-report.h" 18 #include "block/block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD, 67 RBD_AIO_FLUSH 68 } RBDAIOCmd; 69 70 typedef struct RBDAIOCB { 71 BlockDriverAIOCB common; 72 QEMUBH *bh; 73 int64_t ret; 74 QEMUIOVector *qiov; 75 char *bounce; 76 RBDAIOCmd cmd; 77 int64_t sector_num; 78 int error; 79 struct BDRVRBDState *s; 80 int cancelled; 81 int status; 82 } RBDAIOCB; 83 84 typedef struct RADOSCB { 85 int rcbid; 86 RBDAIOCB *acb; 87 struct BDRVRBDState *s; 88 int done; 89 int64_t size; 90 char *buf; 91 int64_t ret; 92 } RADOSCB; 93 94 #define RBD_FD_READ 0 95 #define RBD_FD_WRITE 1 96 97 typedef struct BDRVRBDState { 98 int fds[2]; 99 rados_t cluster; 100 rados_ioctx_t io_ctx; 101 rbd_image_t image; 102 char name[RBD_MAX_IMAGE_NAME_SIZE]; 103 int qemu_aio_count; 104 char *snap; 105 int event_reader_pos; 106 RADOSCB *event_rcb; 107 } BDRVRBDState; 108 109 static void rbd_aio_bh_cb(void *opaque); 110 111 static int qemu_rbd_next_tok(char *dst, int dst_len, 112 char *src, char delim, 113 const char *name, 114 char **p) 115 { 116 int l; 117 char *end; 118 119 *p = NULL; 120 121 if (delim != '\0') { 122 for (end = src; *end; ++end) { 123 if (*end == delim) { 124 break; 125 } 126 if (*end == '\\' && end[1] != '\0') { 127 end++; 128 } 129 } 130 if (*end == delim) { 131 *p = end + 1; 132 *end = '\0'; 133 } 134 } 135 l = strlen(src); 136 if (l >= dst_len) { 137 error_report("%s too long", name); 138 return -EINVAL; 139 } else if (l == 0) { 140 error_report("%s too short", name); 141 return -EINVAL; 142 } 143 144 pstrcpy(dst, dst_len, src); 145 146 return 0; 147 } 148 149 static void qemu_rbd_unescape(char *src) 150 { 151 char *p; 152 153 for (p = src; *src; ++src, ++p) { 154 if (*src == '\\' && src[1] != '\0') { 155 src++; 156 } 157 *p = *src; 158 } 159 *p = '\0'; 160 } 161 162 static int qemu_rbd_parsename(const char *filename, 163 char *pool, int pool_len, 164 char *snap, int snap_len, 165 char *name, int name_len, 166 char *conf, int conf_len) 167 { 168 const char *start; 169 char *p, *buf; 170 int ret; 171 172 if (!strstart(filename, "rbd:", &start)) { 173 return -EINVAL; 174 } 175 176 buf = g_strdup(start); 177 p = buf; 178 *snap = '\0'; 179 *conf = '\0'; 180 181 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 182 if (ret < 0 || !p) { 183 ret = -EINVAL; 184 goto done; 185 } 186 qemu_rbd_unescape(pool); 187 188 if (strchr(p, '@')) { 189 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 190 if (ret < 0) { 191 goto done; 192 } 193 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 194 qemu_rbd_unescape(snap); 195 } else { 196 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 197 } 198 qemu_rbd_unescape(name); 199 if (ret < 0 || !p) { 200 goto done; 201 } 202 203 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 204 205 done: 206 g_free(buf); 207 return ret; 208 } 209 210 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 211 { 212 const char *p = conf; 213 214 while (*p) { 215 int len; 216 const char *end = strchr(p, ':'); 217 218 if (end) { 219 len = end - p; 220 } else { 221 len = strlen(p); 222 } 223 224 if (strncmp(p, "id=", 3) == 0) { 225 len -= 3; 226 strncpy(clientname, p + 3, len); 227 clientname[len] = '\0'; 228 return clientname; 229 } 230 if (end == NULL) { 231 break; 232 } 233 p = end + 1; 234 } 235 return NULL; 236 } 237 238 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 239 { 240 char *p, *buf; 241 char name[RBD_MAX_CONF_NAME_SIZE]; 242 char value[RBD_MAX_CONF_VAL_SIZE]; 243 int ret = 0; 244 245 buf = g_strdup(conf); 246 p = buf; 247 248 while (p) { 249 ret = qemu_rbd_next_tok(name, sizeof(name), p, 250 '=', "conf option name", &p); 251 if (ret < 0) { 252 break; 253 } 254 qemu_rbd_unescape(name); 255 256 if (!p) { 257 error_report("conf option %s has no value", name); 258 ret = -EINVAL; 259 break; 260 } 261 262 ret = qemu_rbd_next_tok(value, sizeof(value), p, 263 ':', "conf option value", &p); 264 if (ret < 0) { 265 break; 266 } 267 qemu_rbd_unescape(value); 268 269 if (strcmp(name, "conf") == 0) { 270 ret = rados_conf_read_file(cluster, value); 271 if (ret < 0) { 272 error_report("error reading conf file %s", value); 273 break; 274 } 275 } else if (strcmp(name, "id") == 0) { 276 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 277 } else { 278 ret = rados_conf_set(cluster, name, value); 279 if (ret < 0) { 280 error_report("invalid conf option %s", name); 281 ret = -EINVAL; 282 break; 283 } 284 } 285 } 286 287 g_free(buf); 288 return ret; 289 } 290 291 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 292 { 293 int64_t bytes = 0; 294 int64_t objsize; 295 int obj_order = 0; 296 char pool[RBD_MAX_POOL_NAME_SIZE]; 297 char name[RBD_MAX_IMAGE_NAME_SIZE]; 298 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 299 char conf[RBD_MAX_CONF_SIZE]; 300 char clientname_buf[RBD_MAX_CONF_SIZE]; 301 char *clientname; 302 rados_t cluster; 303 rados_ioctx_t io_ctx; 304 int ret; 305 306 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 307 snap_buf, sizeof(snap_buf), 308 name, sizeof(name), 309 conf, sizeof(conf)) < 0) { 310 return -EINVAL; 311 } 312 313 /* Read out options */ 314 while (options && options->name) { 315 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 316 bytes = options->value.n; 317 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 318 if (options->value.n) { 319 objsize = options->value.n; 320 if ((objsize - 1) & objsize) { /* not a power of 2? */ 321 error_report("obj size needs to be power of 2"); 322 return -EINVAL; 323 } 324 if (objsize < 4096) { 325 error_report("obj size too small"); 326 return -EINVAL; 327 } 328 obj_order = ffs(objsize) - 1; 329 } 330 } 331 options++; 332 } 333 334 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 335 if (rados_create(&cluster, clientname) < 0) { 336 error_report("error initializing"); 337 return -EIO; 338 } 339 340 if (strstr(conf, "conf=") == NULL) { 341 /* try default location, but ignore failure */ 342 rados_conf_read_file(cluster, NULL); 343 } 344 345 if (conf[0] != '\0' && 346 qemu_rbd_set_conf(cluster, conf) < 0) { 347 error_report("error setting config options"); 348 rados_shutdown(cluster); 349 return -EIO; 350 } 351 352 if (rados_connect(cluster) < 0) { 353 error_report("error connecting"); 354 rados_shutdown(cluster); 355 return -EIO; 356 } 357 358 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 359 error_report("error opening pool %s", pool); 360 rados_shutdown(cluster); 361 return -EIO; 362 } 363 364 ret = rbd_create(io_ctx, name, bytes, &obj_order); 365 rados_ioctx_destroy(io_ctx); 366 rados_shutdown(cluster); 367 368 return ret; 369 } 370 371 /* 372 * This aio completion is being called from qemu_rbd_aio_event_reader() 373 * and runs in qemu context. It schedules a bh, but just in case the aio 374 * was not cancelled before. 375 */ 376 static void qemu_rbd_complete_aio(RADOSCB *rcb) 377 { 378 RBDAIOCB *acb = rcb->acb; 379 int64_t r; 380 381 r = rcb->ret; 382 383 if (acb->cmd != RBD_AIO_READ) { 384 if (r < 0) { 385 acb->ret = r; 386 acb->error = 1; 387 } else if (!acb->error) { 388 acb->ret = rcb->size; 389 } 390 } else { 391 if (r < 0) { 392 memset(rcb->buf, 0, rcb->size); 393 acb->ret = r; 394 acb->error = 1; 395 } else if (r < rcb->size) { 396 memset(rcb->buf + r, 0, rcb->size - r); 397 if (!acb->error) { 398 acb->ret = rcb->size; 399 } 400 } else if (!acb->error) { 401 acb->ret = r; 402 } 403 } 404 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 405 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 406 qemu_bh_schedule(acb->bh); 407 g_free(rcb); 408 } 409 410 /* 411 * aio fd read handler. It runs in the qemu context and calls the 412 * completion handling of completed rados aio operations. 413 */ 414 static void qemu_rbd_aio_event_reader(void *opaque) 415 { 416 BDRVRBDState *s = opaque; 417 418 ssize_t ret; 419 420 do { 421 char *p = (char *)&s->event_rcb; 422 423 /* now read the rcb pointer that was sent from a non qemu thread */ 424 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 425 sizeof(s->event_rcb) - s->event_reader_pos); 426 if (ret > 0) { 427 s->event_reader_pos += ret; 428 if (s->event_reader_pos == sizeof(s->event_rcb)) { 429 s->event_reader_pos = 0; 430 qemu_rbd_complete_aio(s->event_rcb); 431 s->qemu_aio_count--; 432 } 433 } 434 } while (ret < 0 && errno == EINTR); 435 } 436 437 static int qemu_rbd_aio_flush_cb(void *opaque) 438 { 439 BDRVRBDState *s = opaque; 440 441 return (s->qemu_aio_count > 0); 442 } 443 444 /* TODO Convert to fine grained options */ 445 static QemuOptsList runtime_opts = { 446 .name = "rbd", 447 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 448 .desc = { 449 { 450 .name = "filename", 451 .type = QEMU_OPT_STRING, 452 .help = "Specification of the rbd image", 453 }, 454 { /* end of list */ } 455 }, 456 }; 457 458 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags) 459 { 460 BDRVRBDState *s = bs->opaque; 461 char pool[RBD_MAX_POOL_NAME_SIZE]; 462 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 463 char conf[RBD_MAX_CONF_SIZE]; 464 char clientname_buf[RBD_MAX_CONF_SIZE]; 465 char *clientname; 466 QemuOpts *opts; 467 Error *local_err = NULL; 468 const char *filename; 469 int r; 470 471 opts = qemu_opts_create_nofail(&runtime_opts); 472 qemu_opts_absorb_qdict(opts, options, &local_err); 473 if (error_is_set(&local_err)) { 474 qerror_report_err(local_err); 475 error_free(local_err); 476 qemu_opts_del(opts); 477 return -EINVAL; 478 } 479 480 filename = qemu_opt_get(opts, "filename"); 481 482 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 483 snap_buf, sizeof(snap_buf), 484 s->name, sizeof(s->name), 485 conf, sizeof(conf)) < 0) { 486 r = -EINVAL; 487 goto failed_opts; 488 } 489 490 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 491 r = rados_create(&s->cluster, clientname); 492 if (r < 0) { 493 error_report("error initializing"); 494 goto failed_opts; 495 } 496 497 s->snap = NULL; 498 if (snap_buf[0] != '\0') { 499 s->snap = g_strdup(snap_buf); 500 } 501 502 /* 503 * Fallback to more conservative semantics if setting cache 504 * options fails. Ignore errors from setting rbd_cache because the 505 * only possible error is that the option does not exist, and 506 * librbd defaults to no caching. If write through caching cannot 507 * be set up, fall back to no caching. 508 */ 509 if (flags & BDRV_O_NOCACHE) { 510 rados_conf_set(s->cluster, "rbd_cache", "false"); 511 } else { 512 rados_conf_set(s->cluster, "rbd_cache", "true"); 513 } 514 515 if (strstr(conf, "conf=") == NULL) { 516 /* try default location, but ignore failure */ 517 rados_conf_read_file(s->cluster, NULL); 518 } 519 520 if (conf[0] != '\0') { 521 r = qemu_rbd_set_conf(s->cluster, conf); 522 if (r < 0) { 523 error_report("error setting config options"); 524 goto failed_shutdown; 525 } 526 } 527 528 r = rados_connect(s->cluster); 529 if (r < 0) { 530 error_report("error connecting"); 531 goto failed_shutdown; 532 } 533 534 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 535 if (r < 0) { 536 error_report("error opening pool %s", pool); 537 goto failed_shutdown; 538 } 539 540 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 541 if (r < 0) { 542 error_report("error reading header from %s", s->name); 543 goto failed_open; 544 } 545 546 bs->read_only = (s->snap != NULL); 547 548 s->event_reader_pos = 0; 549 r = qemu_pipe(s->fds); 550 if (r < 0) { 551 error_report("error opening eventfd"); 552 goto failed; 553 } 554 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 555 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 556 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 557 NULL, qemu_rbd_aio_flush_cb, s); 558 559 560 qemu_opts_del(opts); 561 return 0; 562 563 failed: 564 rbd_close(s->image); 565 failed_open: 566 rados_ioctx_destroy(s->io_ctx); 567 failed_shutdown: 568 rados_shutdown(s->cluster); 569 g_free(s->snap); 570 failed_opts: 571 qemu_opts_del(opts); 572 return r; 573 } 574 575 static void qemu_rbd_close(BlockDriverState *bs) 576 { 577 BDRVRBDState *s = bs->opaque; 578 579 close(s->fds[0]); 580 close(s->fds[1]); 581 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL); 582 583 rbd_close(s->image); 584 rados_ioctx_destroy(s->io_ctx); 585 g_free(s->snap); 586 rados_shutdown(s->cluster); 587 } 588 589 /* 590 * Cancel aio. Since we don't reference acb in a non qemu threads, 591 * it is safe to access it here. 592 */ 593 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 594 { 595 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 596 acb->cancelled = 1; 597 598 while (acb->status == -EINPROGRESS) { 599 qemu_aio_wait(); 600 } 601 602 qemu_aio_release(acb); 603 } 604 605 static const AIOCBInfo rbd_aiocb_info = { 606 .aiocb_size = sizeof(RBDAIOCB), 607 .cancel = qemu_rbd_aio_cancel, 608 }; 609 610 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 611 { 612 int ret = 0; 613 while (1) { 614 fd_set wfd; 615 int fd = s->fds[RBD_FD_WRITE]; 616 617 /* send the op pointer to the qemu thread that is responsible 618 for the aio/op completion. Must do it in a qemu thread context */ 619 ret = write(fd, (void *)&rcb, sizeof(rcb)); 620 if (ret >= 0) { 621 break; 622 } 623 if (errno == EINTR) { 624 continue; 625 } 626 if (errno != EAGAIN) { 627 break; 628 } 629 630 FD_ZERO(&wfd); 631 FD_SET(fd, &wfd); 632 do { 633 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 634 } while (ret < 0 && errno == EINTR); 635 } 636 637 return ret; 638 } 639 640 /* 641 * This is the callback function for rbd_aio_read and _write 642 * 643 * Note: this function is being called from a non qemu thread so 644 * we need to be careful about what we do here. Generally we only 645 * write to the block notification pipe, and do the rest of the 646 * io completion handling from qemu_rbd_aio_event_reader() which 647 * runs in a qemu context. 648 */ 649 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 650 { 651 int ret; 652 rcb->ret = rbd_aio_get_return_value(c); 653 rbd_aio_release(c); 654 ret = qemu_rbd_send_pipe(rcb->s, rcb); 655 if (ret < 0) { 656 error_report("failed writing to acb->s->fds"); 657 g_free(rcb); 658 } 659 } 660 661 /* Callback when all queued rbd_aio requests are complete */ 662 663 static void rbd_aio_bh_cb(void *opaque) 664 { 665 RBDAIOCB *acb = opaque; 666 667 if (acb->cmd == RBD_AIO_READ) { 668 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 669 } 670 qemu_vfree(acb->bounce); 671 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 672 qemu_bh_delete(acb->bh); 673 acb->bh = NULL; 674 acb->status = 0; 675 676 if (!acb->cancelled) { 677 qemu_aio_release(acb); 678 } 679 } 680 681 static int rbd_aio_discard_wrapper(rbd_image_t image, 682 uint64_t off, 683 uint64_t len, 684 rbd_completion_t comp) 685 { 686 #ifdef LIBRBD_SUPPORTS_DISCARD 687 return rbd_aio_discard(image, off, len, comp); 688 #else 689 return -ENOTSUP; 690 #endif 691 } 692 693 static int rbd_aio_flush_wrapper(rbd_image_t image, 694 rbd_completion_t comp) 695 { 696 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 697 return rbd_aio_flush(image, comp); 698 #else 699 return -ENOTSUP; 700 #endif 701 } 702 703 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 704 int64_t sector_num, 705 QEMUIOVector *qiov, 706 int nb_sectors, 707 BlockDriverCompletionFunc *cb, 708 void *opaque, 709 RBDAIOCmd cmd) 710 { 711 RBDAIOCB *acb; 712 RADOSCB *rcb; 713 rbd_completion_t c; 714 int64_t off, size; 715 char *buf; 716 int r; 717 718 BDRVRBDState *s = bs->opaque; 719 720 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 721 acb->cmd = cmd; 722 acb->qiov = qiov; 723 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 724 acb->bounce = NULL; 725 } else { 726 acb->bounce = qemu_blockalign(bs, qiov->size); 727 } 728 acb->ret = 0; 729 acb->error = 0; 730 acb->s = s; 731 acb->cancelled = 0; 732 acb->bh = NULL; 733 acb->status = -EINPROGRESS; 734 735 if (cmd == RBD_AIO_WRITE) { 736 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 737 } 738 739 buf = acb->bounce; 740 741 off = sector_num * BDRV_SECTOR_SIZE; 742 size = nb_sectors * BDRV_SECTOR_SIZE; 743 744 s->qemu_aio_count++; /* All the RADOSCB */ 745 746 rcb = g_malloc(sizeof(RADOSCB)); 747 rcb->done = 0; 748 rcb->acb = acb; 749 rcb->buf = buf; 750 rcb->s = acb->s; 751 rcb->size = size; 752 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 753 if (r < 0) { 754 goto failed; 755 } 756 757 switch (cmd) { 758 case RBD_AIO_WRITE: 759 r = rbd_aio_write(s->image, off, size, buf, c); 760 break; 761 case RBD_AIO_READ: 762 r = rbd_aio_read(s->image, off, size, buf, c); 763 break; 764 case RBD_AIO_DISCARD: 765 r = rbd_aio_discard_wrapper(s->image, off, size, c); 766 break; 767 case RBD_AIO_FLUSH: 768 r = rbd_aio_flush_wrapper(s->image, c); 769 break; 770 default: 771 r = -EINVAL; 772 } 773 774 if (r < 0) { 775 goto failed; 776 } 777 778 return &acb->common; 779 780 failed: 781 g_free(rcb); 782 s->qemu_aio_count--; 783 qemu_aio_release(acb); 784 return NULL; 785 } 786 787 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 788 int64_t sector_num, 789 QEMUIOVector *qiov, 790 int nb_sectors, 791 BlockDriverCompletionFunc *cb, 792 void *opaque) 793 { 794 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 795 RBD_AIO_READ); 796 } 797 798 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 799 int64_t sector_num, 800 QEMUIOVector *qiov, 801 int nb_sectors, 802 BlockDriverCompletionFunc *cb, 803 void *opaque) 804 { 805 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 806 RBD_AIO_WRITE); 807 } 808 809 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 810 static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 811 BlockDriverCompletionFunc *cb, 812 void *opaque) 813 { 814 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 815 } 816 817 #else 818 819 static int qemu_rbd_co_flush(BlockDriverState *bs) 820 { 821 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 822 /* rbd_flush added in 0.1.1 */ 823 BDRVRBDState *s = bs->opaque; 824 return rbd_flush(s->image); 825 #else 826 return 0; 827 #endif 828 } 829 #endif 830 831 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 832 { 833 BDRVRBDState *s = bs->opaque; 834 rbd_image_info_t info; 835 int r; 836 837 r = rbd_stat(s->image, &info, sizeof(info)); 838 if (r < 0) { 839 return r; 840 } 841 842 bdi->cluster_size = info.obj_size; 843 return 0; 844 } 845 846 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 847 { 848 BDRVRBDState *s = bs->opaque; 849 rbd_image_info_t info; 850 int r; 851 852 r = rbd_stat(s->image, &info, sizeof(info)); 853 if (r < 0) { 854 return r; 855 } 856 857 return info.size; 858 } 859 860 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 861 { 862 BDRVRBDState *s = bs->opaque; 863 int r; 864 865 r = rbd_resize(s->image, offset); 866 if (r < 0) { 867 return r; 868 } 869 870 return 0; 871 } 872 873 static int qemu_rbd_snap_create(BlockDriverState *bs, 874 QEMUSnapshotInfo *sn_info) 875 { 876 BDRVRBDState *s = bs->opaque; 877 int r; 878 879 if (sn_info->name[0] == '\0') { 880 return -EINVAL; /* we need a name for rbd snapshots */ 881 } 882 883 /* 884 * rbd snapshots are using the name as the user controlled unique identifier 885 * we can't use the rbd snapid for that purpose, as it can't be set 886 */ 887 if (sn_info->id_str[0] != '\0' && 888 strcmp(sn_info->id_str, sn_info->name) != 0) { 889 return -EINVAL; 890 } 891 892 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 893 return -ERANGE; 894 } 895 896 r = rbd_snap_create(s->image, sn_info->name); 897 if (r < 0) { 898 error_report("failed to create snap: %s", strerror(-r)); 899 return r; 900 } 901 902 return 0; 903 } 904 905 static int qemu_rbd_snap_remove(BlockDriverState *bs, 906 const char *snapshot_name) 907 { 908 BDRVRBDState *s = bs->opaque; 909 int r; 910 911 r = rbd_snap_remove(s->image, snapshot_name); 912 return r; 913 } 914 915 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 916 const char *snapshot_name) 917 { 918 BDRVRBDState *s = bs->opaque; 919 int r; 920 921 r = rbd_snap_rollback(s->image, snapshot_name); 922 return r; 923 } 924 925 static int qemu_rbd_snap_list(BlockDriverState *bs, 926 QEMUSnapshotInfo **psn_tab) 927 { 928 BDRVRBDState *s = bs->opaque; 929 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 930 int i, snap_count; 931 rbd_snap_info_t *snaps; 932 int max_snaps = RBD_MAX_SNAPS; 933 934 do { 935 snaps = g_malloc(sizeof(*snaps) * max_snaps); 936 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 937 if (snap_count < 0) { 938 g_free(snaps); 939 } 940 } while (snap_count == -ERANGE); 941 942 if (snap_count <= 0) { 943 goto done; 944 } 945 946 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 947 948 for (i = 0; i < snap_count; i++) { 949 const char *snap_name = snaps[i].name; 950 951 sn_info = sn_tab + i; 952 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 953 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 954 955 sn_info->vm_state_size = snaps[i].size; 956 sn_info->date_sec = 0; 957 sn_info->date_nsec = 0; 958 sn_info->vm_clock_nsec = 0; 959 } 960 rbd_snap_list_end(snaps); 961 962 done: 963 *psn_tab = sn_tab; 964 return snap_count; 965 } 966 967 #ifdef LIBRBD_SUPPORTS_DISCARD 968 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 969 int64_t sector_num, 970 int nb_sectors, 971 BlockDriverCompletionFunc *cb, 972 void *opaque) 973 { 974 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 975 RBD_AIO_DISCARD); 976 } 977 #endif 978 979 static QEMUOptionParameter qemu_rbd_create_options[] = { 980 { 981 .name = BLOCK_OPT_SIZE, 982 .type = OPT_SIZE, 983 .help = "Virtual disk size" 984 }, 985 { 986 .name = BLOCK_OPT_CLUSTER_SIZE, 987 .type = OPT_SIZE, 988 .help = "RBD object size" 989 }, 990 {NULL} 991 }; 992 993 static BlockDriver bdrv_rbd = { 994 .format_name = "rbd", 995 .instance_size = sizeof(BDRVRBDState), 996 .bdrv_file_open = qemu_rbd_open, 997 .bdrv_close = qemu_rbd_close, 998 .bdrv_create = qemu_rbd_create, 999 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1000 .bdrv_get_info = qemu_rbd_getinfo, 1001 .create_options = qemu_rbd_create_options, 1002 .bdrv_getlength = qemu_rbd_getlength, 1003 .bdrv_truncate = qemu_rbd_truncate, 1004 .protocol_name = "rbd", 1005 1006 .bdrv_aio_readv = qemu_rbd_aio_readv, 1007 .bdrv_aio_writev = qemu_rbd_aio_writev, 1008 1009 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1010 .bdrv_aio_flush = qemu_rbd_aio_flush, 1011 #else 1012 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1013 #endif 1014 1015 #ifdef LIBRBD_SUPPORTS_DISCARD 1016 .bdrv_aio_discard = qemu_rbd_aio_discard, 1017 #endif 1018 1019 .bdrv_snapshot_create = qemu_rbd_snap_create, 1020 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1021 .bdrv_snapshot_list = qemu_rbd_snap_list, 1022 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1023 }; 1024 1025 static void bdrv_rbd_init(void) 1026 { 1027 bdrv_register(&bdrv_rbd); 1028 } 1029 1030 block_init(bdrv_rbd_init); 1031