1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu/error-report.h" 18 #include "block/block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD, 67 RBD_AIO_FLUSH 68 } RBDAIOCmd; 69 70 typedef struct RBDAIOCB { 71 BlockDriverAIOCB common; 72 QEMUBH *bh; 73 int64_t ret; 74 QEMUIOVector *qiov; 75 char *bounce; 76 RBDAIOCmd cmd; 77 int64_t sector_num; 78 int error; 79 struct BDRVRBDState *s; 80 int cancelled; 81 int status; 82 } RBDAIOCB; 83 84 typedef struct RADOSCB { 85 int rcbid; 86 RBDAIOCB *acb; 87 struct BDRVRBDState *s; 88 int done; 89 int64_t size; 90 char *buf; 91 int64_t ret; 92 } RADOSCB; 93 94 #define RBD_FD_READ 0 95 #define RBD_FD_WRITE 1 96 97 typedef struct BDRVRBDState { 98 int fds[2]; 99 rados_t cluster; 100 rados_ioctx_t io_ctx; 101 rbd_image_t image; 102 char name[RBD_MAX_IMAGE_NAME_SIZE]; 103 int qemu_aio_count; 104 char *snap; 105 int event_reader_pos; 106 RADOSCB *event_rcb; 107 } BDRVRBDState; 108 109 static void rbd_aio_bh_cb(void *opaque); 110 111 static int qemu_rbd_next_tok(char *dst, int dst_len, 112 char *src, char delim, 113 const char *name, 114 char **p) 115 { 116 int l; 117 char *end; 118 119 *p = NULL; 120 121 if (delim != '\0') { 122 for (end = src; *end; ++end) { 123 if (*end == delim) { 124 break; 125 } 126 if (*end == '\\' && end[1] != '\0') { 127 end++; 128 } 129 } 130 if (*end == delim) { 131 *p = end + 1; 132 *end = '\0'; 133 } 134 } 135 l = strlen(src); 136 if (l >= dst_len) { 137 error_report("%s too long", name); 138 return -EINVAL; 139 } else if (l == 0) { 140 error_report("%s too short", name); 141 return -EINVAL; 142 } 143 144 pstrcpy(dst, dst_len, src); 145 146 return 0; 147 } 148 149 static void qemu_rbd_unescape(char *src) 150 { 151 char *p; 152 153 for (p = src; *src; ++src, ++p) { 154 if (*src == '\\' && src[1] != '\0') { 155 src++; 156 } 157 *p = *src; 158 } 159 *p = '\0'; 160 } 161 162 static int qemu_rbd_parsename(const char *filename, 163 char *pool, int pool_len, 164 char *snap, int snap_len, 165 char *name, int name_len, 166 char *conf, int conf_len) 167 { 168 const char *start; 169 char *p, *buf; 170 int ret; 171 172 if (!strstart(filename, "rbd:", &start)) { 173 return -EINVAL; 174 } 175 176 buf = g_strdup(start); 177 p = buf; 178 *snap = '\0'; 179 *conf = '\0'; 180 181 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 182 if (ret < 0 || !p) { 183 ret = -EINVAL; 184 goto done; 185 } 186 qemu_rbd_unescape(pool); 187 188 if (strchr(p, '@')) { 189 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 190 if (ret < 0) { 191 goto done; 192 } 193 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 194 qemu_rbd_unescape(snap); 195 } else { 196 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 197 } 198 qemu_rbd_unescape(name); 199 if (ret < 0 || !p) { 200 goto done; 201 } 202 203 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 204 205 done: 206 g_free(buf); 207 return ret; 208 } 209 210 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 211 { 212 const char *p = conf; 213 214 while (*p) { 215 int len; 216 const char *end = strchr(p, ':'); 217 218 if (end) { 219 len = end - p; 220 } else { 221 len = strlen(p); 222 } 223 224 if (strncmp(p, "id=", 3) == 0) { 225 len -= 3; 226 strncpy(clientname, p + 3, len); 227 clientname[len] = '\0'; 228 return clientname; 229 } 230 if (end == NULL) { 231 break; 232 } 233 p = end + 1; 234 } 235 return NULL; 236 } 237 238 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 239 { 240 char *p, *buf; 241 char name[RBD_MAX_CONF_NAME_SIZE]; 242 char value[RBD_MAX_CONF_VAL_SIZE]; 243 int ret = 0; 244 245 buf = g_strdup(conf); 246 p = buf; 247 248 while (p) { 249 ret = qemu_rbd_next_tok(name, sizeof(name), p, 250 '=', "conf option name", &p); 251 if (ret < 0) { 252 break; 253 } 254 qemu_rbd_unescape(name); 255 256 if (!p) { 257 error_report("conf option %s has no value", name); 258 ret = -EINVAL; 259 break; 260 } 261 262 ret = qemu_rbd_next_tok(value, sizeof(value), p, 263 ':', "conf option value", &p); 264 if (ret < 0) { 265 break; 266 } 267 qemu_rbd_unescape(value); 268 269 if (strcmp(name, "conf") == 0) { 270 ret = rados_conf_read_file(cluster, value); 271 if (ret < 0) { 272 error_report("error reading conf file %s", value); 273 break; 274 } 275 } else if (strcmp(name, "id") == 0) { 276 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 277 } else { 278 ret = rados_conf_set(cluster, name, value); 279 if (ret < 0) { 280 error_report("invalid conf option %s", name); 281 ret = -EINVAL; 282 break; 283 } 284 } 285 } 286 287 g_free(buf); 288 return ret; 289 } 290 291 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 292 { 293 int64_t bytes = 0; 294 int64_t objsize; 295 int obj_order = 0; 296 char pool[RBD_MAX_POOL_NAME_SIZE]; 297 char name[RBD_MAX_IMAGE_NAME_SIZE]; 298 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 299 char conf[RBD_MAX_CONF_SIZE]; 300 char clientname_buf[RBD_MAX_CONF_SIZE]; 301 char *clientname; 302 rados_t cluster; 303 rados_ioctx_t io_ctx; 304 int ret; 305 306 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 307 snap_buf, sizeof(snap_buf), 308 name, sizeof(name), 309 conf, sizeof(conf)) < 0) { 310 return -EINVAL; 311 } 312 313 /* Read out options */ 314 while (options && options->name) { 315 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 316 bytes = options->value.n; 317 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 318 if (options->value.n) { 319 objsize = options->value.n; 320 if ((objsize - 1) & objsize) { /* not a power of 2? */ 321 error_report("obj size needs to be power of 2"); 322 return -EINVAL; 323 } 324 if (objsize < 4096) { 325 error_report("obj size too small"); 326 return -EINVAL; 327 } 328 obj_order = ffs(objsize) - 1; 329 } 330 } 331 options++; 332 } 333 334 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 335 if (rados_create(&cluster, clientname) < 0) { 336 error_report("error initializing"); 337 return -EIO; 338 } 339 340 if (strstr(conf, "conf=") == NULL) { 341 /* try default location, but ignore failure */ 342 rados_conf_read_file(cluster, NULL); 343 } 344 345 if (conf[0] != '\0' && 346 qemu_rbd_set_conf(cluster, conf) < 0) { 347 error_report("error setting config options"); 348 rados_shutdown(cluster); 349 return -EIO; 350 } 351 352 if (rados_connect(cluster) < 0) { 353 error_report("error connecting"); 354 rados_shutdown(cluster); 355 return -EIO; 356 } 357 358 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 359 error_report("error opening pool %s", pool); 360 rados_shutdown(cluster); 361 return -EIO; 362 } 363 364 ret = rbd_create(io_ctx, name, bytes, &obj_order); 365 rados_ioctx_destroy(io_ctx); 366 rados_shutdown(cluster); 367 368 return ret; 369 } 370 371 /* 372 * This aio completion is being called from qemu_rbd_aio_event_reader() 373 * and runs in qemu context. It schedules a bh, but just in case the aio 374 * was not cancelled before. 375 */ 376 static void qemu_rbd_complete_aio(RADOSCB *rcb) 377 { 378 RBDAIOCB *acb = rcb->acb; 379 int64_t r; 380 381 r = rcb->ret; 382 383 if (acb->cmd != RBD_AIO_READ) { 384 if (r < 0) { 385 acb->ret = r; 386 acb->error = 1; 387 } else if (!acb->error) { 388 acb->ret = rcb->size; 389 } 390 } else { 391 if (r < 0) { 392 memset(rcb->buf, 0, rcb->size); 393 acb->ret = r; 394 acb->error = 1; 395 } else if (r < rcb->size) { 396 memset(rcb->buf + r, 0, rcb->size - r); 397 if (!acb->error) { 398 acb->ret = rcb->size; 399 } 400 } else if (!acb->error) { 401 acb->ret = r; 402 } 403 } 404 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 405 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 406 qemu_bh_schedule(acb->bh); 407 g_free(rcb); 408 } 409 410 /* 411 * aio fd read handler. It runs in the qemu context and calls the 412 * completion handling of completed rados aio operations. 413 */ 414 static void qemu_rbd_aio_event_reader(void *opaque) 415 { 416 BDRVRBDState *s = opaque; 417 418 ssize_t ret; 419 420 do { 421 char *p = (char *)&s->event_rcb; 422 423 /* now read the rcb pointer that was sent from a non qemu thread */ 424 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 425 sizeof(s->event_rcb) - s->event_reader_pos); 426 if (ret > 0) { 427 s->event_reader_pos += ret; 428 if (s->event_reader_pos == sizeof(s->event_rcb)) { 429 s->event_reader_pos = 0; 430 qemu_rbd_complete_aio(s->event_rcb); 431 s->qemu_aio_count--; 432 } 433 } 434 } while (ret < 0 && errno == EINTR); 435 } 436 437 static int qemu_rbd_aio_flush_cb(void *opaque) 438 { 439 BDRVRBDState *s = opaque; 440 441 return (s->qemu_aio_count > 0); 442 } 443 444 /* TODO Convert to fine grained options */ 445 static QemuOptsList runtime_opts = { 446 .name = "rbd", 447 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 448 .desc = { 449 { 450 .name = "filename", 451 .type = QEMU_OPT_STRING, 452 .help = "Specification of the rbd image", 453 }, 454 { /* end of list */ } 455 }, 456 }; 457 458 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags) 459 { 460 BDRVRBDState *s = bs->opaque; 461 char pool[RBD_MAX_POOL_NAME_SIZE]; 462 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 463 char conf[RBD_MAX_CONF_SIZE]; 464 char clientname_buf[RBD_MAX_CONF_SIZE]; 465 char *clientname; 466 QemuOpts *opts; 467 Error *local_err = NULL; 468 const char *filename; 469 int r; 470 471 opts = qemu_opts_create_nofail(&runtime_opts); 472 qemu_opts_absorb_qdict(opts, options, &local_err); 473 if (error_is_set(&local_err)) { 474 qerror_report_err(local_err); 475 error_free(local_err); 476 qemu_opts_del(opts); 477 return -EINVAL; 478 } 479 480 filename = qemu_opt_get(opts, "filename"); 481 qemu_opts_del(opts); 482 483 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 484 snap_buf, sizeof(snap_buf), 485 s->name, sizeof(s->name), 486 conf, sizeof(conf)) < 0) { 487 return -EINVAL; 488 } 489 490 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 491 r = rados_create(&s->cluster, clientname); 492 if (r < 0) { 493 error_report("error initializing"); 494 return r; 495 } 496 497 s->snap = NULL; 498 if (snap_buf[0] != '\0') { 499 s->snap = g_strdup(snap_buf); 500 } 501 502 /* 503 * Fallback to more conservative semantics if setting cache 504 * options fails. Ignore errors from setting rbd_cache because the 505 * only possible error is that the option does not exist, and 506 * librbd defaults to no caching. If write through caching cannot 507 * be set up, fall back to no caching. 508 */ 509 if (flags & BDRV_O_NOCACHE) { 510 rados_conf_set(s->cluster, "rbd_cache", "false"); 511 } else { 512 rados_conf_set(s->cluster, "rbd_cache", "true"); 513 } 514 515 if (strstr(conf, "conf=") == NULL) { 516 /* try default location, but ignore failure */ 517 rados_conf_read_file(s->cluster, NULL); 518 } 519 520 if (conf[0] != '\0') { 521 r = qemu_rbd_set_conf(s->cluster, conf); 522 if (r < 0) { 523 error_report("error setting config options"); 524 goto failed_shutdown; 525 } 526 } 527 528 r = rados_connect(s->cluster); 529 if (r < 0) { 530 error_report("error connecting"); 531 goto failed_shutdown; 532 } 533 534 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 535 if (r < 0) { 536 error_report("error opening pool %s", pool); 537 goto failed_shutdown; 538 } 539 540 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 541 if (r < 0) { 542 error_report("error reading header from %s", s->name); 543 goto failed_open; 544 } 545 546 bs->read_only = (s->snap != NULL); 547 548 s->event_reader_pos = 0; 549 r = qemu_pipe(s->fds); 550 if (r < 0) { 551 error_report("error opening eventfd"); 552 goto failed; 553 } 554 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 555 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 556 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 557 NULL, qemu_rbd_aio_flush_cb, s); 558 559 560 return 0; 561 562 failed: 563 rbd_close(s->image); 564 failed_open: 565 rados_ioctx_destroy(s->io_ctx); 566 failed_shutdown: 567 rados_shutdown(s->cluster); 568 g_free(s->snap); 569 return r; 570 } 571 572 static void qemu_rbd_close(BlockDriverState *bs) 573 { 574 BDRVRBDState *s = bs->opaque; 575 576 close(s->fds[0]); 577 close(s->fds[1]); 578 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL); 579 580 rbd_close(s->image); 581 rados_ioctx_destroy(s->io_ctx); 582 g_free(s->snap); 583 rados_shutdown(s->cluster); 584 } 585 586 /* 587 * Cancel aio. Since we don't reference acb in a non qemu threads, 588 * it is safe to access it here. 589 */ 590 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 591 { 592 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 593 acb->cancelled = 1; 594 595 while (acb->status == -EINPROGRESS) { 596 qemu_aio_wait(); 597 } 598 599 qemu_aio_release(acb); 600 } 601 602 static const AIOCBInfo rbd_aiocb_info = { 603 .aiocb_size = sizeof(RBDAIOCB), 604 .cancel = qemu_rbd_aio_cancel, 605 }; 606 607 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 608 { 609 int ret = 0; 610 while (1) { 611 fd_set wfd; 612 int fd = s->fds[RBD_FD_WRITE]; 613 614 /* send the op pointer to the qemu thread that is responsible 615 for the aio/op completion. Must do it in a qemu thread context */ 616 ret = write(fd, (void *)&rcb, sizeof(rcb)); 617 if (ret >= 0) { 618 break; 619 } 620 if (errno == EINTR) { 621 continue; 622 } 623 if (errno != EAGAIN) { 624 break; 625 } 626 627 FD_ZERO(&wfd); 628 FD_SET(fd, &wfd); 629 do { 630 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 631 } while (ret < 0 && errno == EINTR); 632 } 633 634 return ret; 635 } 636 637 /* 638 * This is the callback function for rbd_aio_read and _write 639 * 640 * Note: this function is being called from a non qemu thread so 641 * we need to be careful about what we do here. Generally we only 642 * write to the block notification pipe, and do the rest of the 643 * io completion handling from qemu_rbd_aio_event_reader() which 644 * runs in a qemu context. 645 */ 646 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 647 { 648 int ret; 649 rcb->ret = rbd_aio_get_return_value(c); 650 rbd_aio_release(c); 651 ret = qemu_rbd_send_pipe(rcb->s, rcb); 652 if (ret < 0) { 653 error_report("failed writing to acb->s->fds"); 654 g_free(rcb); 655 } 656 } 657 658 /* Callback when all queued rbd_aio requests are complete */ 659 660 static void rbd_aio_bh_cb(void *opaque) 661 { 662 RBDAIOCB *acb = opaque; 663 664 if (acb->cmd == RBD_AIO_READ) { 665 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 666 } 667 qemu_vfree(acb->bounce); 668 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 669 qemu_bh_delete(acb->bh); 670 acb->bh = NULL; 671 acb->status = 0; 672 673 if (!acb->cancelled) { 674 qemu_aio_release(acb); 675 } 676 } 677 678 static int rbd_aio_discard_wrapper(rbd_image_t image, 679 uint64_t off, 680 uint64_t len, 681 rbd_completion_t comp) 682 { 683 #ifdef LIBRBD_SUPPORTS_DISCARD 684 return rbd_aio_discard(image, off, len, comp); 685 #else 686 return -ENOTSUP; 687 #endif 688 } 689 690 static int rbd_aio_flush_wrapper(rbd_image_t image, 691 rbd_completion_t comp) 692 { 693 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 694 return rbd_aio_flush(image, comp); 695 #else 696 return -ENOTSUP; 697 #endif 698 } 699 700 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 701 int64_t sector_num, 702 QEMUIOVector *qiov, 703 int nb_sectors, 704 BlockDriverCompletionFunc *cb, 705 void *opaque, 706 RBDAIOCmd cmd) 707 { 708 RBDAIOCB *acb; 709 RADOSCB *rcb; 710 rbd_completion_t c; 711 int64_t off, size; 712 char *buf; 713 int r; 714 715 BDRVRBDState *s = bs->opaque; 716 717 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 718 acb->cmd = cmd; 719 acb->qiov = qiov; 720 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 721 acb->bounce = NULL; 722 } else { 723 acb->bounce = qemu_blockalign(bs, qiov->size); 724 } 725 acb->ret = 0; 726 acb->error = 0; 727 acb->s = s; 728 acb->cancelled = 0; 729 acb->bh = NULL; 730 acb->status = -EINPROGRESS; 731 732 if (cmd == RBD_AIO_WRITE) { 733 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 734 } 735 736 buf = acb->bounce; 737 738 off = sector_num * BDRV_SECTOR_SIZE; 739 size = nb_sectors * BDRV_SECTOR_SIZE; 740 741 s->qemu_aio_count++; /* All the RADOSCB */ 742 743 rcb = g_malloc(sizeof(RADOSCB)); 744 rcb->done = 0; 745 rcb->acb = acb; 746 rcb->buf = buf; 747 rcb->s = acb->s; 748 rcb->size = size; 749 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 750 if (r < 0) { 751 goto failed; 752 } 753 754 switch (cmd) { 755 case RBD_AIO_WRITE: 756 r = rbd_aio_write(s->image, off, size, buf, c); 757 break; 758 case RBD_AIO_READ: 759 r = rbd_aio_read(s->image, off, size, buf, c); 760 break; 761 case RBD_AIO_DISCARD: 762 r = rbd_aio_discard_wrapper(s->image, off, size, c); 763 break; 764 case RBD_AIO_FLUSH: 765 r = rbd_aio_flush_wrapper(s->image, c); 766 break; 767 default: 768 r = -EINVAL; 769 } 770 771 if (r < 0) { 772 goto failed; 773 } 774 775 return &acb->common; 776 777 failed: 778 g_free(rcb); 779 s->qemu_aio_count--; 780 qemu_aio_release(acb); 781 return NULL; 782 } 783 784 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 785 int64_t sector_num, 786 QEMUIOVector *qiov, 787 int nb_sectors, 788 BlockDriverCompletionFunc *cb, 789 void *opaque) 790 { 791 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 792 RBD_AIO_READ); 793 } 794 795 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 796 int64_t sector_num, 797 QEMUIOVector *qiov, 798 int nb_sectors, 799 BlockDriverCompletionFunc *cb, 800 void *opaque) 801 { 802 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 803 RBD_AIO_WRITE); 804 } 805 806 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 807 static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 808 BlockDriverCompletionFunc *cb, 809 void *opaque) 810 { 811 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 812 } 813 814 #else 815 816 static int qemu_rbd_co_flush(BlockDriverState *bs) 817 { 818 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 819 /* rbd_flush added in 0.1.1 */ 820 BDRVRBDState *s = bs->opaque; 821 return rbd_flush(s->image); 822 #else 823 return 0; 824 #endif 825 } 826 #endif 827 828 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 829 { 830 BDRVRBDState *s = bs->opaque; 831 rbd_image_info_t info; 832 int r; 833 834 r = rbd_stat(s->image, &info, sizeof(info)); 835 if (r < 0) { 836 return r; 837 } 838 839 bdi->cluster_size = info.obj_size; 840 return 0; 841 } 842 843 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 844 { 845 BDRVRBDState *s = bs->opaque; 846 rbd_image_info_t info; 847 int r; 848 849 r = rbd_stat(s->image, &info, sizeof(info)); 850 if (r < 0) { 851 return r; 852 } 853 854 return info.size; 855 } 856 857 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 858 { 859 BDRVRBDState *s = bs->opaque; 860 int r; 861 862 r = rbd_resize(s->image, offset); 863 if (r < 0) { 864 return r; 865 } 866 867 return 0; 868 } 869 870 static int qemu_rbd_snap_create(BlockDriverState *bs, 871 QEMUSnapshotInfo *sn_info) 872 { 873 BDRVRBDState *s = bs->opaque; 874 int r; 875 876 if (sn_info->name[0] == '\0') { 877 return -EINVAL; /* we need a name for rbd snapshots */ 878 } 879 880 /* 881 * rbd snapshots are using the name as the user controlled unique identifier 882 * we can't use the rbd snapid for that purpose, as it can't be set 883 */ 884 if (sn_info->id_str[0] != '\0' && 885 strcmp(sn_info->id_str, sn_info->name) != 0) { 886 return -EINVAL; 887 } 888 889 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 890 return -ERANGE; 891 } 892 893 r = rbd_snap_create(s->image, sn_info->name); 894 if (r < 0) { 895 error_report("failed to create snap: %s", strerror(-r)); 896 return r; 897 } 898 899 return 0; 900 } 901 902 static int qemu_rbd_snap_remove(BlockDriverState *bs, 903 const char *snapshot_name) 904 { 905 BDRVRBDState *s = bs->opaque; 906 int r; 907 908 r = rbd_snap_remove(s->image, snapshot_name); 909 return r; 910 } 911 912 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 913 const char *snapshot_name) 914 { 915 BDRVRBDState *s = bs->opaque; 916 int r; 917 918 r = rbd_snap_rollback(s->image, snapshot_name); 919 return r; 920 } 921 922 static int qemu_rbd_snap_list(BlockDriverState *bs, 923 QEMUSnapshotInfo **psn_tab) 924 { 925 BDRVRBDState *s = bs->opaque; 926 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 927 int i, snap_count; 928 rbd_snap_info_t *snaps; 929 int max_snaps = RBD_MAX_SNAPS; 930 931 do { 932 snaps = g_malloc(sizeof(*snaps) * max_snaps); 933 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 934 if (snap_count < 0) { 935 g_free(snaps); 936 } 937 } while (snap_count == -ERANGE); 938 939 if (snap_count <= 0) { 940 goto done; 941 } 942 943 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 944 945 for (i = 0; i < snap_count; i++) { 946 const char *snap_name = snaps[i].name; 947 948 sn_info = sn_tab + i; 949 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 950 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 951 952 sn_info->vm_state_size = snaps[i].size; 953 sn_info->date_sec = 0; 954 sn_info->date_nsec = 0; 955 sn_info->vm_clock_nsec = 0; 956 } 957 rbd_snap_list_end(snaps); 958 959 done: 960 *psn_tab = sn_tab; 961 return snap_count; 962 } 963 964 #ifdef LIBRBD_SUPPORTS_DISCARD 965 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 966 int64_t sector_num, 967 int nb_sectors, 968 BlockDriverCompletionFunc *cb, 969 void *opaque) 970 { 971 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 972 RBD_AIO_DISCARD); 973 } 974 #endif 975 976 static QEMUOptionParameter qemu_rbd_create_options[] = { 977 { 978 .name = BLOCK_OPT_SIZE, 979 .type = OPT_SIZE, 980 .help = "Virtual disk size" 981 }, 982 { 983 .name = BLOCK_OPT_CLUSTER_SIZE, 984 .type = OPT_SIZE, 985 .help = "RBD object size" 986 }, 987 {NULL} 988 }; 989 990 static BlockDriver bdrv_rbd = { 991 .format_name = "rbd", 992 .instance_size = sizeof(BDRVRBDState), 993 .bdrv_file_open = qemu_rbd_open, 994 .bdrv_close = qemu_rbd_close, 995 .bdrv_create = qemu_rbd_create, 996 .bdrv_get_info = qemu_rbd_getinfo, 997 .create_options = qemu_rbd_create_options, 998 .bdrv_getlength = qemu_rbd_getlength, 999 .bdrv_truncate = qemu_rbd_truncate, 1000 .protocol_name = "rbd", 1001 1002 .bdrv_aio_readv = qemu_rbd_aio_readv, 1003 .bdrv_aio_writev = qemu_rbd_aio_writev, 1004 1005 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1006 .bdrv_aio_flush = qemu_rbd_aio_flush, 1007 #else 1008 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1009 #endif 1010 1011 #ifdef LIBRBD_SUPPORTS_DISCARD 1012 .bdrv_aio_discard = qemu_rbd_aio_discard, 1013 #endif 1014 1015 .bdrv_snapshot_create = qemu_rbd_snap_create, 1016 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1017 .bdrv_snapshot_list = qemu_rbd_snap_list, 1018 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1019 }; 1020 1021 static void bdrv_rbd_init(void) 1022 { 1023 bdrv_register(&bdrv_rbd); 1024 } 1025 1026 block_init(bdrv_rbd_init); 1027