1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include <inttypes.h> 15 16 #include "qemu-common.h" 17 #include "qemu/error-report.h" 18 #include "block/block_int.h" 19 20 #include <rbd/librbd.h> 21 22 /* 23 * When specifying the image filename use: 24 * 25 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 26 * 27 * poolname must be the name of an existing rados pool. 28 * 29 * devicename is the name of the rbd image. 30 * 31 * Each option given is used to configure rados, and may be any valid 32 * Ceph option, "id", or "conf". 33 * 34 * The "id" option indicates what user we should authenticate as to 35 * the Ceph cluster. If it is excluded we will use the Ceph default 36 * (normally 'admin'). 37 * 38 * The "conf" option specifies a Ceph configuration file to read. If 39 * it is not specified, we will read from the default Ceph locations 40 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 41 * file, specify conf=/dev/null. 42 * 43 * Configuration values containing :, @, or = can be escaped with a 44 * leading "\". 45 */ 46 47 /* rbd_aio_discard added in 0.1.2 */ 48 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 49 #define LIBRBD_SUPPORTS_DISCARD 50 #else 51 #undef LIBRBD_SUPPORTS_DISCARD 52 #endif 53 54 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 55 56 #define RBD_MAX_CONF_NAME_SIZE 128 57 #define RBD_MAX_CONF_VAL_SIZE 512 58 #define RBD_MAX_CONF_SIZE 1024 59 #define RBD_MAX_POOL_NAME_SIZE 128 60 #define RBD_MAX_SNAP_NAME_SIZE 128 61 #define RBD_MAX_SNAPS 100 62 63 typedef enum { 64 RBD_AIO_READ, 65 RBD_AIO_WRITE, 66 RBD_AIO_DISCARD, 67 RBD_AIO_FLUSH 68 } RBDAIOCmd; 69 70 typedef struct RBDAIOCB { 71 BlockDriverAIOCB common; 72 QEMUBH *bh; 73 int64_t ret; 74 QEMUIOVector *qiov; 75 char *bounce; 76 RBDAIOCmd cmd; 77 int64_t sector_num; 78 int error; 79 struct BDRVRBDState *s; 80 int cancelled; 81 int status; 82 } RBDAIOCB; 83 84 typedef struct RADOSCB { 85 int rcbid; 86 RBDAIOCB *acb; 87 struct BDRVRBDState *s; 88 int done; 89 int64_t size; 90 char *buf; 91 int64_t ret; 92 } RADOSCB; 93 94 #define RBD_FD_READ 0 95 #define RBD_FD_WRITE 1 96 97 typedef struct BDRVRBDState { 98 int fds[2]; 99 rados_t cluster; 100 rados_ioctx_t io_ctx; 101 rbd_image_t image; 102 char name[RBD_MAX_IMAGE_NAME_SIZE]; 103 int qemu_aio_count; 104 char *snap; 105 int event_reader_pos; 106 RADOSCB *event_rcb; 107 } BDRVRBDState; 108 109 static void rbd_aio_bh_cb(void *opaque); 110 111 static int qemu_rbd_next_tok(char *dst, int dst_len, 112 char *src, char delim, 113 const char *name, 114 char **p) 115 { 116 int l; 117 char *end; 118 119 *p = NULL; 120 121 if (delim != '\0') { 122 for (end = src; *end; ++end) { 123 if (*end == delim) { 124 break; 125 } 126 if (*end == '\\' && end[1] != '\0') { 127 end++; 128 } 129 } 130 if (*end == delim) { 131 *p = end + 1; 132 *end = '\0'; 133 } 134 } 135 l = strlen(src); 136 if (l >= dst_len) { 137 error_report("%s too long", name); 138 return -EINVAL; 139 } else if (l == 0) { 140 error_report("%s too short", name); 141 return -EINVAL; 142 } 143 144 pstrcpy(dst, dst_len, src); 145 146 return 0; 147 } 148 149 static void qemu_rbd_unescape(char *src) 150 { 151 char *p; 152 153 for (p = src; *src; ++src, ++p) { 154 if (*src == '\\' && src[1] != '\0') { 155 src++; 156 } 157 *p = *src; 158 } 159 *p = '\0'; 160 } 161 162 static int qemu_rbd_parsename(const char *filename, 163 char *pool, int pool_len, 164 char *snap, int snap_len, 165 char *name, int name_len, 166 char *conf, int conf_len) 167 { 168 const char *start; 169 char *p, *buf; 170 int ret; 171 172 if (!strstart(filename, "rbd:", &start)) { 173 return -EINVAL; 174 } 175 176 buf = g_strdup(start); 177 p = buf; 178 *snap = '\0'; 179 *conf = '\0'; 180 181 ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); 182 if (ret < 0 || !p) { 183 ret = -EINVAL; 184 goto done; 185 } 186 qemu_rbd_unescape(pool); 187 188 if (strchr(p, '@')) { 189 ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); 190 if (ret < 0) { 191 goto done; 192 } 193 ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p); 194 qemu_rbd_unescape(snap); 195 } else { 196 ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p); 197 } 198 qemu_rbd_unescape(name); 199 if (ret < 0 || !p) { 200 goto done; 201 } 202 203 ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p); 204 205 done: 206 g_free(buf); 207 return ret; 208 } 209 210 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname) 211 { 212 const char *p = conf; 213 214 while (*p) { 215 int len; 216 const char *end = strchr(p, ':'); 217 218 if (end) { 219 len = end - p; 220 } else { 221 len = strlen(p); 222 } 223 224 if (strncmp(p, "id=", 3) == 0) { 225 len -= 3; 226 strncpy(clientname, p + 3, len); 227 clientname[len] = '\0'; 228 return clientname; 229 } 230 if (end == NULL) { 231 break; 232 } 233 p = end + 1; 234 } 235 return NULL; 236 } 237 238 static int qemu_rbd_set_conf(rados_t cluster, const char *conf) 239 { 240 char *p, *buf; 241 char name[RBD_MAX_CONF_NAME_SIZE]; 242 char value[RBD_MAX_CONF_VAL_SIZE]; 243 int ret = 0; 244 245 buf = g_strdup(conf); 246 p = buf; 247 248 while (p) { 249 ret = qemu_rbd_next_tok(name, sizeof(name), p, 250 '=', "conf option name", &p); 251 if (ret < 0) { 252 break; 253 } 254 qemu_rbd_unescape(name); 255 256 if (!p) { 257 error_report("conf option %s has no value", name); 258 ret = -EINVAL; 259 break; 260 } 261 262 ret = qemu_rbd_next_tok(value, sizeof(value), p, 263 ':', "conf option value", &p); 264 if (ret < 0) { 265 break; 266 } 267 qemu_rbd_unescape(value); 268 269 if (strcmp(name, "conf") == 0) { 270 ret = rados_conf_read_file(cluster, value); 271 if (ret < 0) { 272 error_report("error reading conf file %s", value); 273 break; 274 } 275 } else if (strcmp(name, "id") == 0) { 276 /* ignore, this is parsed by qemu_rbd_parse_clientname() */ 277 } else { 278 ret = rados_conf_set(cluster, name, value); 279 if (ret < 0) { 280 error_report("invalid conf option %s", name); 281 ret = -EINVAL; 282 break; 283 } 284 } 285 } 286 287 g_free(buf); 288 return ret; 289 } 290 291 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) 292 { 293 int64_t bytes = 0; 294 int64_t objsize; 295 int obj_order = 0; 296 char pool[RBD_MAX_POOL_NAME_SIZE]; 297 char name[RBD_MAX_IMAGE_NAME_SIZE]; 298 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 299 char conf[RBD_MAX_CONF_SIZE]; 300 char clientname_buf[RBD_MAX_CONF_SIZE]; 301 char *clientname; 302 rados_t cluster; 303 rados_ioctx_t io_ctx; 304 int ret; 305 306 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 307 snap_buf, sizeof(snap_buf), 308 name, sizeof(name), 309 conf, sizeof(conf)) < 0) { 310 return -EINVAL; 311 } 312 313 /* Read out options */ 314 while (options && options->name) { 315 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 316 bytes = options->value.n; 317 } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) { 318 if (options->value.n) { 319 objsize = options->value.n; 320 if ((objsize - 1) & objsize) { /* not a power of 2? */ 321 error_report("obj size needs to be power of 2"); 322 return -EINVAL; 323 } 324 if (objsize < 4096) { 325 error_report("obj size too small"); 326 return -EINVAL; 327 } 328 obj_order = ffs(objsize) - 1; 329 } 330 } 331 options++; 332 } 333 334 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 335 if (rados_create(&cluster, clientname) < 0) { 336 error_report("error initializing"); 337 return -EIO; 338 } 339 340 if (strstr(conf, "conf=") == NULL) { 341 /* try default location, but ignore failure */ 342 rados_conf_read_file(cluster, NULL); 343 } 344 345 if (conf[0] != '\0' && 346 qemu_rbd_set_conf(cluster, conf) < 0) { 347 error_report("error setting config options"); 348 rados_shutdown(cluster); 349 return -EIO; 350 } 351 352 if (rados_connect(cluster) < 0) { 353 error_report("error connecting"); 354 rados_shutdown(cluster); 355 return -EIO; 356 } 357 358 if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { 359 error_report("error opening pool %s", pool); 360 rados_shutdown(cluster); 361 return -EIO; 362 } 363 364 ret = rbd_create(io_ctx, name, bytes, &obj_order); 365 rados_ioctx_destroy(io_ctx); 366 rados_shutdown(cluster); 367 368 return ret; 369 } 370 371 /* 372 * This aio completion is being called from qemu_rbd_aio_event_reader() 373 * and runs in qemu context. It schedules a bh, but just in case the aio 374 * was not cancelled before. 375 */ 376 static void qemu_rbd_complete_aio(RADOSCB *rcb) 377 { 378 RBDAIOCB *acb = rcb->acb; 379 int64_t r; 380 381 r = rcb->ret; 382 383 if (acb->cmd != RBD_AIO_READ) { 384 if (r < 0) { 385 acb->ret = r; 386 acb->error = 1; 387 } else if (!acb->error) { 388 acb->ret = rcb->size; 389 } 390 } else { 391 if (r < 0) { 392 memset(rcb->buf, 0, rcb->size); 393 acb->ret = r; 394 acb->error = 1; 395 } else if (r < rcb->size) { 396 memset(rcb->buf + r, 0, rcb->size - r); 397 if (!acb->error) { 398 acb->ret = rcb->size; 399 } 400 } else if (!acb->error) { 401 acb->ret = r; 402 } 403 } 404 /* Note that acb->bh can be NULL in case where the aio was cancelled */ 405 acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); 406 qemu_bh_schedule(acb->bh); 407 g_free(rcb); 408 } 409 410 /* 411 * aio fd read handler. It runs in the qemu context and calls the 412 * completion handling of completed rados aio operations. 413 */ 414 static void qemu_rbd_aio_event_reader(void *opaque) 415 { 416 BDRVRBDState *s = opaque; 417 418 ssize_t ret; 419 420 do { 421 char *p = (char *)&s->event_rcb; 422 423 /* now read the rcb pointer that was sent from a non qemu thread */ 424 ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos, 425 sizeof(s->event_rcb) - s->event_reader_pos); 426 if (ret > 0) { 427 s->event_reader_pos += ret; 428 if (s->event_reader_pos == sizeof(s->event_rcb)) { 429 s->event_reader_pos = 0; 430 qemu_rbd_complete_aio(s->event_rcb); 431 s->qemu_aio_count--; 432 } 433 } 434 } while (ret < 0 && errno == EINTR); 435 } 436 437 static int qemu_rbd_aio_flush_cb(void *opaque) 438 { 439 BDRVRBDState *s = opaque; 440 441 return (s->qemu_aio_count > 0); 442 } 443 444 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, 445 QDict *options, int flags) 446 { 447 BDRVRBDState *s = bs->opaque; 448 char pool[RBD_MAX_POOL_NAME_SIZE]; 449 char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; 450 char conf[RBD_MAX_CONF_SIZE]; 451 char clientname_buf[RBD_MAX_CONF_SIZE]; 452 char *clientname; 453 int r; 454 455 if (qemu_rbd_parsename(filename, pool, sizeof(pool), 456 snap_buf, sizeof(snap_buf), 457 s->name, sizeof(s->name), 458 conf, sizeof(conf)) < 0) { 459 return -EINVAL; 460 } 461 462 clientname = qemu_rbd_parse_clientname(conf, clientname_buf); 463 r = rados_create(&s->cluster, clientname); 464 if (r < 0) { 465 error_report("error initializing"); 466 return r; 467 } 468 469 s->snap = NULL; 470 if (snap_buf[0] != '\0') { 471 s->snap = g_strdup(snap_buf); 472 } 473 474 /* 475 * Fallback to more conservative semantics if setting cache 476 * options fails. Ignore errors from setting rbd_cache because the 477 * only possible error is that the option does not exist, and 478 * librbd defaults to no caching. If write through caching cannot 479 * be set up, fall back to no caching. 480 */ 481 if (flags & BDRV_O_NOCACHE) { 482 rados_conf_set(s->cluster, "rbd_cache", "false"); 483 } else { 484 rados_conf_set(s->cluster, "rbd_cache", "true"); 485 } 486 487 if (strstr(conf, "conf=") == NULL) { 488 /* try default location, but ignore failure */ 489 rados_conf_read_file(s->cluster, NULL); 490 } 491 492 if (conf[0] != '\0') { 493 r = qemu_rbd_set_conf(s->cluster, conf); 494 if (r < 0) { 495 error_report("error setting config options"); 496 goto failed_shutdown; 497 } 498 } 499 500 r = rados_connect(s->cluster); 501 if (r < 0) { 502 error_report("error connecting"); 503 goto failed_shutdown; 504 } 505 506 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 507 if (r < 0) { 508 error_report("error opening pool %s", pool); 509 goto failed_shutdown; 510 } 511 512 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 513 if (r < 0) { 514 error_report("error reading header from %s", s->name); 515 goto failed_open; 516 } 517 518 bs->read_only = (s->snap != NULL); 519 520 s->event_reader_pos = 0; 521 r = qemu_pipe(s->fds); 522 if (r < 0) { 523 error_report("error opening eventfd"); 524 goto failed; 525 } 526 fcntl(s->fds[0], F_SETFL, O_NONBLOCK); 527 fcntl(s->fds[1], F_SETFL, O_NONBLOCK); 528 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, 529 NULL, qemu_rbd_aio_flush_cb, s); 530 531 532 return 0; 533 534 failed: 535 rbd_close(s->image); 536 failed_open: 537 rados_ioctx_destroy(s->io_ctx); 538 failed_shutdown: 539 rados_shutdown(s->cluster); 540 g_free(s->snap); 541 return r; 542 } 543 544 static void qemu_rbd_close(BlockDriverState *bs) 545 { 546 BDRVRBDState *s = bs->opaque; 547 548 close(s->fds[0]); 549 close(s->fds[1]); 550 qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL); 551 552 rbd_close(s->image); 553 rados_ioctx_destroy(s->io_ctx); 554 g_free(s->snap); 555 rados_shutdown(s->cluster); 556 } 557 558 /* 559 * Cancel aio. Since we don't reference acb in a non qemu threads, 560 * it is safe to access it here. 561 */ 562 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) 563 { 564 RBDAIOCB *acb = (RBDAIOCB *) blockacb; 565 acb->cancelled = 1; 566 567 while (acb->status == -EINPROGRESS) { 568 qemu_aio_wait(); 569 } 570 571 qemu_aio_release(acb); 572 } 573 574 static const AIOCBInfo rbd_aiocb_info = { 575 .aiocb_size = sizeof(RBDAIOCB), 576 .cancel = qemu_rbd_aio_cancel, 577 }; 578 579 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) 580 { 581 int ret = 0; 582 while (1) { 583 fd_set wfd; 584 int fd = s->fds[RBD_FD_WRITE]; 585 586 /* send the op pointer to the qemu thread that is responsible 587 for the aio/op completion. Must do it in a qemu thread context */ 588 ret = write(fd, (void *)&rcb, sizeof(rcb)); 589 if (ret >= 0) { 590 break; 591 } 592 if (errno == EINTR) { 593 continue; 594 } 595 if (errno != EAGAIN) { 596 break; 597 } 598 599 FD_ZERO(&wfd); 600 FD_SET(fd, &wfd); 601 do { 602 ret = select(fd + 1, NULL, &wfd, NULL, NULL); 603 } while (ret < 0 && errno == EINTR); 604 } 605 606 return ret; 607 } 608 609 /* 610 * This is the callback function for rbd_aio_read and _write 611 * 612 * Note: this function is being called from a non qemu thread so 613 * we need to be careful about what we do here. Generally we only 614 * write to the block notification pipe, and do the rest of the 615 * io completion handling from qemu_rbd_aio_event_reader() which 616 * runs in a qemu context. 617 */ 618 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 619 { 620 int ret; 621 rcb->ret = rbd_aio_get_return_value(c); 622 rbd_aio_release(c); 623 ret = qemu_rbd_send_pipe(rcb->s, rcb); 624 if (ret < 0) { 625 error_report("failed writing to acb->s->fds"); 626 g_free(rcb); 627 } 628 } 629 630 /* Callback when all queued rbd_aio requests are complete */ 631 632 static void rbd_aio_bh_cb(void *opaque) 633 { 634 RBDAIOCB *acb = opaque; 635 636 if (acb->cmd == RBD_AIO_READ) { 637 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 638 } 639 qemu_vfree(acb->bounce); 640 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 641 qemu_bh_delete(acb->bh); 642 acb->bh = NULL; 643 acb->status = 0; 644 645 if (!acb->cancelled) { 646 qemu_aio_release(acb); 647 } 648 } 649 650 static int rbd_aio_discard_wrapper(rbd_image_t image, 651 uint64_t off, 652 uint64_t len, 653 rbd_completion_t comp) 654 { 655 #ifdef LIBRBD_SUPPORTS_DISCARD 656 return rbd_aio_discard(image, off, len, comp); 657 #else 658 return -ENOTSUP; 659 #endif 660 } 661 662 static int rbd_aio_flush_wrapper(rbd_image_t image, 663 rbd_completion_t comp) 664 { 665 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 666 return rbd_aio_flush(image, comp); 667 #else 668 return -ENOTSUP; 669 #endif 670 } 671 672 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, 673 int64_t sector_num, 674 QEMUIOVector *qiov, 675 int nb_sectors, 676 BlockDriverCompletionFunc *cb, 677 void *opaque, 678 RBDAIOCmd cmd) 679 { 680 RBDAIOCB *acb; 681 RADOSCB *rcb; 682 rbd_completion_t c; 683 int64_t off, size; 684 char *buf; 685 int r; 686 687 BDRVRBDState *s = bs->opaque; 688 689 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 690 acb->cmd = cmd; 691 acb->qiov = qiov; 692 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 693 acb->bounce = NULL; 694 } else { 695 acb->bounce = qemu_blockalign(bs, qiov->size); 696 } 697 acb->ret = 0; 698 acb->error = 0; 699 acb->s = s; 700 acb->cancelled = 0; 701 acb->bh = NULL; 702 acb->status = -EINPROGRESS; 703 704 if (cmd == RBD_AIO_WRITE) { 705 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 706 } 707 708 buf = acb->bounce; 709 710 off = sector_num * BDRV_SECTOR_SIZE; 711 size = nb_sectors * BDRV_SECTOR_SIZE; 712 713 s->qemu_aio_count++; /* All the RADOSCB */ 714 715 rcb = g_malloc(sizeof(RADOSCB)); 716 rcb->done = 0; 717 rcb->acb = acb; 718 rcb->buf = buf; 719 rcb->s = acb->s; 720 rcb->size = size; 721 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 722 if (r < 0) { 723 goto failed; 724 } 725 726 switch (cmd) { 727 case RBD_AIO_WRITE: 728 r = rbd_aio_write(s->image, off, size, buf, c); 729 break; 730 case RBD_AIO_READ: 731 r = rbd_aio_read(s->image, off, size, buf, c); 732 break; 733 case RBD_AIO_DISCARD: 734 r = rbd_aio_discard_wrapper(s->image, off, size, c); 735 break; 736 case RBD_AIO_FLUSH: 737 r = rbd_aio_flush_wrapper(s->image, c); 738 break; 739 default: 740 r = -EINVAL; 741 } 742 743 if (r < 0) { 744 goto failed; 745 } 746 747 return &acb->common; 748 749 failed: 750 g_free(rcb); 751 s->qemu_aio_count--; 752 qemu_aio_release(acb); 753 return NULL; 754 } 755 756 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 757 int64_t sector_num, 758 QEMUIOVector *qiov, 759 int nb_sectors, 760 BlockDriverCompletionFunc *cb, 761 void *opaque) 762 { 763 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 764 RBD_AIO_READ); 765 } 766 767 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 768 int64_t sector_num, 769 QEMUIOVector *qiov, 770 int nb_sectors, 771 BlockDriverCompletionFunc *cb, 772 void *opaque) 773 { 774 return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque, 775 RBD_AIO_WRITE); 776 } 777 778 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 779 static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 780 BlockDriverCompletionFunc *cb, 781 void *opaque) 782 { 783 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 784 } 785 786 #else 787 788 static int qemu_rbd_co_flush(BlockDriverState *bs) 789 { 790 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 791 /* rbd_flush added in 0.1.1 */ 792 BDRVRBDState *s = bs->opaque; 793 return rbd_flush(s->image); 794 #else 795 return 0; 796 #endif 797 } 798 #endif 799 800 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 801 { 802 BDRVRBDState *s = bs->opaque; 803 rbd_image_info_t info; 804 int r; 805 806 r = rbd_stat(s->image, &info, sizeof(info)); 807 if (r < 0) { 808 return r; 809 } 810 811 bdi->cluster_size = info.obj_size; 812 return 0; 813 } 814 815 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 816 { 817 BDRVRBDState *s = bs->opaque; 818 rbd_image_info_t info; 819 int r; 820 821 r = rbd_stat(s->image, &info, sizeof(info)); 822 if (r < 0) { 823 return r; 824 } 825 826 return info.size; 827 } 828 829 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 830 { 831 BDRVRBDState *s = bs->opaque; 832 int r; 833 834 r = rbd_resize(s->image, offset); 835 if (r < 0) { 836 return r; 837 } 838 839 return 0; 840 } 841 842 static int qemu_rbd_snap_create(BlockDriverState *bs, 843 QEMUSnapshotInfo *sn_info) 844 { 845 BDRVRBDState *s = bs->opaque; 846 int r; 847 848 if (sn_info->name[0] == '\0') { 849 return -EINVAL; /* we need a name for rbd snapshots */ 850 } 851 852 /* 853 * rbd snapshots are using the name as the user controlled unique identifier 854 * we can't use the rbd snapid for that purpose, as it can't be set 855 */ 856 if (sn_info->id_str[0] != '\0' && 857 strcmp(sn_info->id_str, sn_info->name) != 0) { 858 return -EINVAL; 859 } 860 861 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 862 return -ERANGE; 863 } 864 865 r = rbd_snap_create(s->image, sn_info->name); 866 if (r < 0) { 867 error_report("failed to create snap: %s", strerror(-r)); 868 return r; 869 } 870 871 return 0; 872 } 873 874 static int qemu_rbd_snap_remove(BlockDriverState *bs, 875 const char *snapshot_name) 876 { 877 BDRVRBDState *s = bs->opaque; 878 int r; 879 880 r = rbd_snap_remove(s->image, snapshot_name); 881 return r; 882 } 883 884 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 885 const char *snapshot_name) 886 { 887 BDRVRBDState *s = bs->opaque; 888 int r; 889 890 r = rbd_snap_rollback(s->image, snapshot_name); 891 return r; 892 } 893 894 static int qemu_rbd_snap_list(BlockDriverState *bs, 895 QEMUSnapshotInfo **psn_tab) 896 { 897 BDRVRBDState *s = bs->opaque; 898 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 899 int i, snap_count; 900 rbd_snap_info_t *snaps; 901 int max_snaps = RBD_MAX_SNAPS; 902 903 do { 904 snaps = g_malloc(sizeof(*snaps) * max_snaps); 905 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 906 if (snap_count < 0) { 907 g_free(snaps); 908 } 909 } while (snap_count == -ERANGE); 910 911 if (snap_count <= 0) { 912 goto done; 913 } 914 915 sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo)); 916 917 for (i = 0; i < snap_count; i++) { 918 const char *snap_name = snaps[i].name; 919 920 sn_info = sn_tab + i; 921 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 922 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 923 924 sn_info->vm_state_size = snaps[i].size; 925 sn_info->date_sec = 0; 926 sn_info->date_nsec = 0; 927 sn_info->vm_clock_nsec = 0; 928 } 929 rbd_snap_list_end(snaps); 930 931 done: 932 *psn_tab = sn_tab; 933 return snap_count; 934 } 935 936 #ifdef LIBRBD_SUPPORTS_DISCARD 937 static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs, 938 int64_t sector_num, 939 int nb_sectors, 940 BlockDriverCompletionFunc *cb, 941 void *opaque) 942 { 943 return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque, 944 RBD_AIO_DISCARD); 945 } 946 #endif 947 948 static QEMUOptionParameter qemu_rbd_create_options[] = { 949 { 950 .name = BLOCK_OPT_SIZE, 951 .type = OPT_SIZE, 952 .help = "Virtual disk size" 953 }, 954 { 955 .name = BLOCK_OPT_CLUSTER_SIZE, 956 .type = OPT_SIZE, 957 .help = "RBD object size" 958 }, 959 {NULL} 960 }; 961 962 static BlockDriver bdrv_rbd = { 963 .format_name = "rbd", 964 .instance_size = sizeof(BDRVRBDState), 965 .bdrv_file_open = qemu_rbd_open, 966 .bdrv_close = qemu_rbd_close, 967 .bdrv_create = qemu_rbd_create, 968 .bdrv_get_info = qemu_rbd_getinfo, 969 .create_options = qemu_rbd_create_options, 970 .bdrv_getlength = qemu_rbd_getlength, 971 .bdrv_truncate = qemu_rbd_truncate, 972 .protocol_name = "rbd", 973 974 .bdrv_aio_readv = qemu_rbd_aio_readv, 975 .bdrv_aio_writev = qemu_rbd_aio_writev, 976 977 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 978 .bdrv_aio_flush = qemu_rbd_aio_flush, 979 #else 980 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 981 #endif 982 983 #ifdef LIBRBD_SUPPORTS_DISCARD 984 .bdrv_aio_discard = qemu_rbd_aio_discard, 985 #endif 986 987 .bdrv_snapshot_create = qemu_rbd_snap_create, 988 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 989 .bdrv_snapshot_list = qemu_rbd_snap_list, 990 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 991 }; 992 993 static void bdrv_rbd_init(void) 994 { 995 bdrv_register(&bdrv_rbd); 996 } 997 998 block_init(bdrv_rbd_init); 999