1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "block/block_int.h" 20 #include "crypto/secret.h" 21 #include "qemu/cutils.h" 22 #include "qapi/qmp/qstring.h" 23 24 /* 25 * When specifying the image filename use: 26 * 27 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 28 * 29 * poolname must be the name of an existing rados pool. 30 * 31 * devicename is the name of the rbd image. 32 * 33 * Each option given is used to configure rados, and may be any valid 34 * Ceph option, "id", or "conf". 35 * 36 * The "id" option indicates what user we should authenticate as to 37 * the Ceph cluster. If it is excluded we will use the Ceph default 38 * (normally 'admin'). 39 * 40 * The "conf" option specifies a Ceph configuration file to read. If 41 * it is not specified, we will read from the default Ceph locations 42 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 43 * file, specify conf=/dev/null. 44 * 45 * Configuration values containing :, @, or = can be escaped with a 46 * leading "\". 47 */ 48 49 /* rbd_aio_discard added in 0.1.2 */ 50 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 51 #define LIBRBD_SUPPORTS_DISCARD 52 #else 53 #undef LIBRBD_SUPPORTS_DISCARD 54 #endif 55 56 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 57 58 #define RBD_MAX_SNAPS 100 59 60 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */ 61 #ifdef LIBRBD_SUPPORTS_IOVEC 62 #define LIBRBD_USE_IOVEC 1 63 #else 64 #define LIBRBD_USE_IOVEC 0 65 #endif 66 67 typedef enum { 68 RBD_AIO_READ, 69 RBD_AIO_WRITE, 70 RBD_AIO_DISCARD, 71 RBD_AIO_FLUSH 72 } RBDAIOCmd; 73 74 typedef struct RBDAIOCB { 75 BlockAIOCB common; 76 int64_t ret; 77 QEMUIOVector *qiov; 78 char *bounce; 79 RBDAIOCmd cmd; 80 int error; 81 struct BDRVRBDState *s; 82 } RBDAIOCB; 83 84 typedef struct RADOSCB { 85 RBDAIOCB *acb; 86 struct BDRVRBDState *s; 87 int64_t size; 88 char *buf; 89 int64_t ret; 90 } RADOSCB; 91 92 typedef struct BDRVRBDState { 93 rados_t cluster; 94 rados_ioctx_t io_ctx; 95 rbd_image_t image; 96 char *name; 97 char *snap; 98 } BDRVRBDState; 99 100 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 101 { 102 char *end; 103 104 *p = NULL; 105 106 for (end = src; *end; ++end) { 107 if (*end == delim) { 108 break; 109 } 110 if (*end == '\\' && end[1] != '\0') { 111 end++; 112 } 113 } 114 if (*end == delim) { 115 *p = end + 1; 116 *end = '\0'; 117 } 118 return src; 119 } 120 121 static void qemu_rbd_unescape(char *src) 122 { 123 char *p; 124 125 for (p = src; *src; ++src, ++p) { 126 if (*src == '\\' && src[1] != '\0') { 127 src++; 128 } 129 *p = *src; 130 } 131 *p = '\0'; 132 } 133 134 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 135 Error **errp) 136 { 137 const char *start; 138 char *p, *buf, *keypairs; 139 char *found_str; 140 size_t max_keypair_size; 141 142 if (!strstart(filename, "rbd:", &start)) { 143 error_setg(errp, "File name must start with 'rbd:'"); 144 return; 145 } 146 147 max_keypair_size = strlen(start) + 1; 148 buf = g_strdup(start); 149 keypairs = g_malloc0(max_keypair_size); 150 p = buf; 151 152 found_str = qemu_rbd_next_tok(p, '/', &p); 153 if (!p) { 154 error_setg(errp, "Pool name is required"); 155 goto done; 156 } 157 qemu_rbd_unescape(found_str); 158 qdict_put(options, "pool", qstring_from_str(found_str)); 159 160 if (strchr(p, '@')) { 161 found_str = qemu_rbd_next_tok(p, '@', &p); 162 qemu_rbd_unescape(found_str); 163 qdict_put(options, "image", qstring_from_str(found_str)); 164 165 found_str = qemu_rbd_next_tok(p, ':', &p); 166 qemu_rbd_unescape(found_str); 167 qdict_put(options, "snapshot", qstring_from_str(found_str)); 168 } else { 169 found_str = qemu_rbd_next_tok(p, ':', &p); 170 qemu_rbd_unescape(found_str); 171 qdict_put(options, "image", qstring_from_str(found_str)); 172 } 173 if (!p) { 174 goto done; 175 } 176 177 /* The following are essentially all key/value pairs, and we treat 178 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 179 while (p) { 180 char *name, *value; 181 name = qemu_rbd_next_tok(p, '=', &p); 182 if (!p) { 183 error_setg(errp, "conf option %s has no value", name); 184 break; 185 } 186 187 qemu_rbd_unescape(name); 188 189 value = qemu_rbd_next_tok(p, ':', &p); 190 qemu_rbd_unescape(value); 191 192 if (!strcmp(name, "conf")) { 193 qdict_put(options, "conf", qstring_from_str(value)); 194 } else if (!strcmp(name, "id")) { 195 qdict_put(options, "user" , qstring_from_str(value)); 196 } else { 197 /* FIXME: This is pretty ugly, and not the right way to do this. 198 * These should be contained in a structure, and then 199 * passed explicitly as individual key/value pairs to 200 * rados. Consider this legacy code that needs to be 201 * updated. */ 202 char *tmp = g_malloc0(max_keypair_size); 203 /* only use a delimiter if it is not the first keypair found */ 204 /* These are sets of unknown key/value pairs we'll pass along 205 * to ceph */ 206 if (keypairs[0]) { 207 snprintf(tmp, max_keypair_size, ":%s=%s", name, value); 208 pstrcat(keypairs, max_keypair_size, tmp); 209 } else { 210 snprintf(keypairs, max_keypair_size, "%s=%s", name, value); 211 } 212 g_free(tmp); 213 } 214 } 215 216 if (keypairs[0]) { 217 qdict_put(options, "=keyvalue-pairs", qstring_from_str(keypairs)); 218 } 219 220 221 done: 222 g_free(buf); 223 g_free(keypairs); 224 return; 225 } 226 227 228 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid, 229 Error **errp) 230 { 231 if (secretid == 0) { 232 return 0; 233 } 234 235 gchar *secret = qcrypto_secret_lookup_as_base64(secretid, 236 errp); 237 if (!secret) { 238 return -1; 239 } 240 241 rados_conf_set(cluster, "key", secret); 242 g_free(secret); 243 244 return 0; 245 } 246 247 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs, 248 Error **errp) 249 { 250 char *p, *buf; 251 char *name; 252 char *value; 253 int ret = 0; 254 255 buf = g_strdup(keypairs); 256 p = buf; 257 258 while (p) { 259 name = qemu_rbd_next_tok(p, '=', &p); 260 if (!p) { 261 error_setg(errp, "conf option %s has no value", name); 262 ret = -EINVAL; 263 break; 264 } 265 266 value = qemu_rbd_next_tok(p, ':', &p); 267 268 ret = rados_conf_set(cluster, name, value); 269 if (ret < 0) { 270 error_setg_errno(errp, -ret, "invalid conf option %s", name); 271 ret = -EINVAL; 272 break; 273 } 274 } 275 276 g_free(buf); 277 return ret; 278 } 279 280 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs) 281 { 282 if (LIBRBD_USE_IOVEC) { 283 RBDAIOCB *acb = rcb->acb; 284 iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0, 285 acb->qiov->size - offs); 286 } else { 287 memset(rcb->buf + offs, 0, rcb->size - offs); 288 } 289 } 290 291 static QemuOptsList runtime_opts = { 292 .name = "rbd", 293 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 294 .desc = { 295 { 296 .name = "pool", 297 .type = QEMU_OPT_STRING, 298 .help = "Rados pool name", 299 }, 300 { 301 .name = "image", 302 .type = QEMU_OPT_STRING, 303 .help = "Image name in the pool", 304 }, 305 { 306 .name = "conf", 307 .type = QEMU_OPT_STRING, 308 .help = "Rados config file location", 309 }, 310 { 311 .name = "snapshot", 312 .type = QEMU_OPT_STRING, 313 .help = "Ceph snapshot name", 314 }, 315 { 316 /* maps to 'id' in rados_create() */ 317 .name = "user", 318 .type = QEMU_OPT_STRING, 319 .help = "Rados id name", 320 }, 321 /* 322 * server.* extracted manually, see qemu_rbd_mon_host() 323 */ 324 { 325 .name = "password-secret", 326 .type = QEMU_OPT_STRING, 327 .help = "ID of secret providing the password", 328 }, 329 330 /* 331 * Keys for qemu_rbd_parse_filename(), not in the QAPI schema 332 */ 333 { 334 /* 335 * HACK: name starts with '=' so that qemu_opts_parse() 336 * can't set it 337 */ 338 .name = "=keyvalue-pairs", 339 .type = QEMU_OPT_STRING, 340 .help = "Legacy rados key/value option parameters", 341 }, 342 { /* end of list */ } 343 }, 344 }; 345 346 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp) 347 { 348 Error *local_err = NULL; 349 int64_t bytes = 0; 350 int64_t objsize; 351 int obj_order = 0; 352 const char *pool, *name, *conf, *clientname, *keypairs; 353 const char *secretid; 354 rados_t cluster; 355 rados_ioctx_t io_ctx; 356 QDict *options = NULL; 357 int ret = 0; 358 359 secretid = qemu_opt_get(opts, "password-secret"); 360 361 /* Read out options */ 362 bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 363 BDRV_SECTOR_SIZE); 364 objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0); 365 if (objsize) { 366 if ((objsize - 1) & objsize) { /* not a power of 2? */ 367 error_setg(errp, "obj size needs to be power of 2"); 368 ret = -EINVAL; 369 goto exit; 370 } 371 if (objsize < 4096) { 372 error_setg(errp, "obj size too small"); 373 ret = -EINVAL; 374 goto exit; 375 } 376 obj_order = ctz32(objsize); 377 } 378 379 options = qdict_new(); 380 qemu_rbd_parse_filename(filename, options, &local_err); 381 if (local_err) { 382 ret = -EINVAL; 383 error_propagate(errp, local_err); 384 goto exit; 385 } 386 387 pool = qdict_get_try_str(options, "pool"); 388 conf = qdict_get_try_str(options, "conf"); 389 clientname = qdict_get_try_str(options, "user"); 390 name = qdict_get_try_str(options, "image"); 391 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 392 393 ret = rados_create(&cluster, clientname); 394 if (ret < 0) { 395 error_setg_errno(errp, -ret, "error initializing"); 396 goto exit; 397 } 398 399 /* try default location when conf=NULL, but ignore failure */ 400 ret = rados_conf_read_file(cluster, conf); 401 if (conf && ret < 0) { 402 error_setg_errno(errp, -ret, "error reading conf file %s", conf); 403 ret = -EIO; 404 goto shutdown; 405 } 406 407 ret = qemu_rbd_set_keypairs(cluster, keypairs, errp); 408 if (ret < 0) { 409 ret = -EIO; 410 goto shutdown; 411 } 412 413 if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) { 414 ret = -EIO; 415 goto shutdown; 416 } 417 418 ret = rados_connect(cluster); 419 if (ret < 0) { 420 error_setg_errno(errp, -ret, "error connecting"); 421 goto shutdown; 422 } 423 424 ret = rados_ioctx_create(cluster, pool, &io_ctx); 425 if (ret < 0) { 426 error_setg_errno(errp, -ret, "error opening pool %s", pool); 427 goto shutdown; 428 } 429 430 ret = rbd_create(io_ctx, name, bytes, &obj_order); 431 if (ret < 0) { 432 error_setg_errno(errp, -ret, "error rbd create"); 433 } 434 435 rados_ioctx_destroy(io_ctx); 436 437 shutdown: 438 rados_shutdown(cluster); 439 440 exit: 441 QDECREF(options); 442 return ret; 443 } 444 445 /* 446 * This aio completion is being called from rbd_finish_bh() and runs in qemu 447 * BH context. 448 */ 449 static void qemu_rbd_complete_aio(RADOSCB *rcb) 450 { 451 RBDAIOCB *acb = rcb->acb; 452 int64_t r; 453 454 r = rcb->ret; 455 456 if (acb->cmd != RBD_AIO_READ) { 457 if (r < 0) { 458 acb->ret = r; 459 acb->error = 1; 460 } else if (!acb->error) { 461 acb->ret = rcb->size; 462 } 463 } else { 464 if (r < 0) { 465 qemu_rbd_memset(rcb, 0); 466 acb->ret = r; 467 acb->error = 1; 468 } else if (r < rcb->size) { 469 qemu_rbd_memset(rcb, r); 470 if (!acb->error) { 471 acb->ret = rcb->size; 472 } 473 } else if (!acb->error) { 474 acb->ret = r; 475 } 476 } 477 478 g_free(rcb); 479 480 if (!LIBRBD_USE_IOVEC) { 481 if (acb->cmd == RBD_AIO_READ) { 482 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 483 } 484 qemu_vfree(acb->bounce); 485 } 486 487 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 488 489 qemu_aio_unref(acb); 490 } 491 492 static char *qemu_rbd_mon_host(QDict *options, Error **errp) 493 { 494 const char **vals = g_new(const char *, qdict_size(options) + 1); 495 char keybuf[32]; 496 const char *host, *port; 497 char *rados_str; 498 int i; 499 500 for (i = 0;; i++) { 501 sprintf(keybuf, "server.%d.host", i); 502 host = qdict_get_try_str(options, keybuf); 503 qdict_del(options, keybuf); 504 sprintf(keybuf, "server.%d.port", i); 505 port = qdict_get_try_str(options, keybuf); 506 qdict_del(options, keybuf); 507 if (!host && !port) { 508 break; 509 } 510 if (!host) { 511 error_setg(errp, "Parameter server.%d.host is missing", i); 512 rados_str = NULL; 513 goto out; 514 } 515 516 if (strchr(host, ':')) { 517 vals[i] = port ? g_strdup_printf("[%s]:%s", host, port) 518 : g_strdup_printf("[%s]", host); 519 } else { 520 vals[i] = port ? g_strdup_printf("%s:%s", host, port) 521 : g_strdup(host); 522 } 523 } 524 vals[i] = NULL; 525 526 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 527 out: 528 g_strfreev((char **)vals); 529 return rados_str; 530 } 531 532 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 533 Error **errp) 534 { 535 BDRVRBDState *s = bs->opaque; 536 const char *pool, *snap, *conf, *clientname, *name, *keypairs; 537 const char *secretid; 538 QemuOpts *opts; 539 Error *local_err = NULL; 540 char *mon_host = NULL; 541 int r; 542 543 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 544 qemu_opts_absorb_qdict(opts, options, &local_err); 545 if (local_err) { 546 error_propagate(errp, local_err); 547 r = -EINVAL; 548 goto failed_opts; 549 } 550 551 mon_host = qemu_rbd_mon_host(options, &local_err); 552 if (local_err) { 553 error_propagate(errp, local_err); 554 r = -EINVAL; 555 goto failed_opts; 556 } 557 558 secretid = qemu_opt_get(opts, "password-secret"); 559 560 pool = qemu_opt_get(opts, "pool"); 561 conf = qemu_opt_get(opts, "conf"); 562 snap = qemu_opt_get(opts, "snapshot"); 563 clientname = qemu_opt_get(opts, "user"); 564 name = qemu_opt_get(opts, "image"); 565 keypairs = qemu_opt_get(opts, "=keyvalue-pairs"); 566 567 if (!pool || !name) { 568 error_setg(errp, "Parameters 'pool' and 'image' are required"); 569 r = -EINVAL; 570 goto failed_opts; 571 } 572 573 r = rados_create(&s->cluster, clientname); 574 if (r < 0) { 575 error_setg_errno(errp, -r, "error initializing"); 576 goto failed_opts; 577 } 578 579 s->snap = g_strdup(snap); 580 s->name = g_strdup(name); 581 582 /* try default location when conf=NULL, but ignore failure */ 583 r = rados_conf_read_file(s->cluster, conf); 584 if (conf && r < 0) { 585 error_setg_errno(errp, -r, "error reading conf file %s", conf); 586 goto failed_shutdown; 587 } 588 589 r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp); 590 if (r < 0) { 591 goto failed_shutdown; 592 } 593 594 if (mon_host) { 595 r = rados_conf_set(s->cluster, "mon_host", mon_host); 596 if (r < 0) { 597 goto failed_shutdown; 598 } 599 } 600 601 if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) { 602 r = -EIO; 603 goto failed_shutdown; 604 } 605 606 /* 607 * Fallback to more conservative semantics if setting cache 608 * options fails. Ignore errors from setting rbd_cache because the 609 * only possible error is that the option does not exist, and 610 * librbd defaults to no caching. If write through caching cannot 611 * be set up, fall back to no caching. 612 */ 613 if (flags & BDRV_O_NOCACHE) { 614 rados_conf_set(s->cluster, "rbd_cache", "false"); 615 } else { 616 rados_conf_set(s->cluster, "rbd_cache", "true"); 617 } 618 619 r = rados_connect(s->cluster); 620 if (r < 0) { 621 error_setg_errno(errp, -r, "error connecting"); 622 goto failed_shutdown; 623 } 624 625 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 626 if (r < 0) { 627 error_setg_errno(errp, -r, "error opening pool %s", pool); 628 goto failed_shutdown; 629 } 630 631 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 632 if (r < 0) { 633 error_setg_errno(errp, -r, "error reading header from %s", s->name); 634 goto failed_open; 635 } 636 637 bs->read_only = (s->snap != NULL); 638 639 qemu_opts_del(opts); 640 return 0; 641 642 failed_open: 643 rados_ioctx_destroy(s->io_ctx); 644 failed_shutdown: 645 rados_shutdown(s->cluster); 646 g_free(s->snap); 647 g_free(s->name); 648 failed_opts: 649 qemu_opts_del(opts); 650 g_free(mon_host); 651 return r; 652 } 653 654 static void qemu_rbd_close(BlockDriverState *bs) 655 { 656 BDRVRBDState *s = bs->opaque; 657 658 rbd_close(s->image); 659 rados_ioctx_destroy(s->io_ctx); 660 g_free(s->snap); 661 g_free(s->name); 662 rados_shutdown(s->cluster); 663 } 664 665 static const AIOCBInfo rbd_aiocb_info = { 666 .aiocb_size = sizeof(RBDAIOCB), 667 }; 668 669 static void rbd_finish_bh(void *opaque) 670 { 671 RADOSCB *rcb = opaque; 672 qemu_rbd_complete_aio(rcb); 673 } 674 675 /* 676 * This is the callback function for rbd_aio_read and _write 677 * 678 * Note: this function is being called from a non qemu thread so 679 * we need to be careful about what we do here. Generally we only 680 * schedule a BH, and do the rest of the io completion handling 681 * from rbd_finish_bh() which runs in a qemu context. 682 */ 683 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 684 { 685 RBDAIOCB *acb = rcb->acb; 686 687 rcb->ret = rbd_aio_get_return_value(c); 688 rbd_aio_release(c); 689 690 aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs), 691 rbd_finish_bh, rcb); 692 } 693 694 static int rbd_aio_discard_wrapper(rbd_image_t image, 695 uint64_t off, 696 uint64_t len, 697 rbd_completion_t comp) 698 { 699 #ifdef LIBRBD_SUPPORTS_DISCARD 700 return rbd_aio_discard(image, off, len, comp); 701 #else 702 return -ENOTSUP; 703 #endif 704 } 705 706 static int rbd_aio_flush_wrapper(rbd_image_t image, 707 rbd_completion_t comp) 708 { 709 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 710 return rbd_aio_flush(image, comp); 711 #else 712 return -ENOTSUP; 713 #endif 714 } 715 716 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, 717 int64_t off, 718 QEMUIOVector *qiov, 719 int64_t size, 720 BlockCompletionFunc *cb, 721 void *opaque, 722 RBDAIOCmd cmd) 723 { 724 RBDAIOCB *acb; 725 RADOSCB *rcb = NULL; 726 rbd_completion_t c; 727 int r; 728 729 BDRVRBDState *s = bs->opaque; 730 731 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 732 acb->cmd = cmd; 733 acb->qiov = qiov; 734 assert(!qiov || qiov->size == size); 735 736 rcb = g_new(RADOSCB, 1); 737 738 if (!LIBRBD_USE_IOVEC) { 739 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 740 acb->bounce = NULL; 741 } else { 742 acb->bounce = qemu_try_blockalign(bs, qiov->size); 743 if (acb->bounce == NULL) { 744 goto failed; 745 } 746 } 747 if (cmd == RBD_AIO_WRITE) { 748 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 749 } 750 rcb->buf = acb->bounce; 751 } 752 753 acb->ret = 0; 754 acb->error = 0; 755 acb->s = s; 756 757 rcb->acb = acb; 758 rcb->s = acb->s; 759 rcb->size = size; 760 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 761 if (r < 0) { 762 goto failed; 763 } 764 765 switch (cmd) { 766 case RBD_AIO_WRITE: 767 #ifdef LIBRBD_SUPPORTS_IOVEC 768 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c); 769 #else 770 r = rbd_aio_write(s->image, off, size, rcb->buf, c); 771 #endif 772 break; 773 case RBD_AIO_READ: 774 #ifdef LIBRBD_SUPPORTS_IOVEC 775 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); 776 #else 777 r = rbd_aio_read(s->image, off, size, rcb->buf, c); 778 #endif 779 break; 780 case RBD_AIO_DISCARD: 781 r = rbd_aio_discard_wrapper(s->image, off, size, c); 782 break; 783 case RBD_AIO_FLUSH: 784 r = rbd_aio_flush_wrapper(s->image, c); 785 break; 786 default: 787 r = -EINVAL; 788 } 789 790 if (r < 0) { 791 goto failed_completion; 792 } 793 return &acb->common; 794 795 failed_completion: 796 rbd_aio_release(c); 797 failed: 798 g_free(rcb); 799 if (!LIBRBD_USE_IOVEC) { 800 qemu_vfree(acb->bounce); 801 } 802 803 qemu_aio_unref(acb); 804 return NULL; 805 } 806 807 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 808 int64_t sector_num, 809 QEMUIOVector *qiov, 810 int nb_sectors, 811 BlockCompletionFunc *cb, 812 void *opaque) 813 { 814 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 815 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 816 RBD_AIO_READ); 817 } 818 819 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 820 int64_t sector_num, 821 QEMUIOVector *qiov, 822 int nb_sectors, 823 BlockCompletionFunc *cb, 824 void *opaque) 825 { 826 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 827 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 828 RBD_AIO_WRITE); 829 } 830 831 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 832 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 833 BlockCompletionFunc *cb, 834 void *opaque) 835 { 836 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 837 } 838 839 #else 840 841 static int qemu_rbd_co_flush(BlockDriverState *bs) 842 { 843 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 844 /* rbd_flush added in 0.1.1 */ 845 BDRVRBDState *s = bs->opaque; 846 return rbd_flush(s->image); 847 #else 848 return 0; 849 #endif 850 } 851 #endif 852 853 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 854 { 855 BDRVRBDState *s = bs->opaque; 856 rbd_image_info_t info; 857 int r; 858 859 r = rbd_stat(s->image, &info, sizeof(info)); 860 if (r < 0) { 861 return r; 862 } 863 864 bdi->cluster_size = info.obj_size; 865 return 0; 866 } 867 868 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 869 { 870 BDRVRBDState *s = bs->opaque; 871 rbd_image_info_t info; 872 int r; 873 874 r = rbd_stat(s->image, &info, sizeof(info)); 875 if (r < 0) { 876 return r; 877 } 878 879 return info.size; 880 } 881 882 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 883 { 884 BDRVRBDState *s = bs->opaque; 885 int r; 886 887 r = rbd_resize(s->image, offset); 888 if (r < 0) { 889 return r; 890 } 891 892 return 0; 893 } 894 895 static int qemu_rbd_snap_create(BlockDriverState *bs, 896 QEMUSnapshotInfo *sn_info) 897 { 898 BDRVRBDState *s = bs->opaque; 899 int r; 900 901 if (sn_info->name[0] == '\0') { 902 return -EINVAL; /* we need a name for rbd snapshots */ 903 } 904 905 /* 906 * rbd snapshots are using the name as the user controlled unique identifier 907 * we can't use the rbd snapid for that purpose, as it can't be set 908 */ 909 if (sn_info->id_str[0] != '\0' && 910 strcmp(sn_info->id_str, sn_info->name) != 0) { 911 return -EINVAL; 912 } 913 914 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 915 return -ERANGE; 916 } 917 918 r = rbd_snap_create(s->image, sn_info->name); 919 if (r < 0) { 920 error_report("failed to create snap: %s", strerror(-r)); 921 return r; 922 } 923 924 return 0; 925 } 926 927 static int qemu_rbd_snap_remove(BlockDriverState *bs, 928 const char *snapshot_id, 929 const char *snapshot_name, 930 Error **errp) 931 { 932 BDRVRBDState *s = bs->opaque; 933 int r; 934 935 if (!snapshot_name) { 936 error_setg(errp, "rbd need a valid snapshot name"); 937 return -EINVAL; 938 } 939 940 /* If snapshot_id is specified, it must be equal to name, see 941 qemu_rbd_snap_list() */ 942 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 943 error_setg(errp, 944 "rbd do not support snapshot id, it should be NULL or " 945 "equal to snapshot name"); 946 return -EINVAL; 947 } 948 949 r = rbd_snap_remove(s->image, snapshot_name); 950 if (r < 0) { 951 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 952 } 953 return r; 954 } 955 956 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 957 const char *snapshot_name) 958 { 959 BDRVRBDState *s = bs->opaque; 960 961 return rbd_snap_rollback(s->image, snapshot_name); 962 } 963 964 static int qemu_rbd_snap_list(BlockDriverState *bs, 965 QEMUSnapshotInfo **psn_tab) 966 { 967 BDRVRBDState *s = bs->opaque; 968 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 969 int i, snap_count; 970 rbd_snap_info_t *snaps; 971 int max_snaps = RBD_MAX_SNAPS; 972 973 do { 974 snaps = g_new(rbd_snap_info_t, max_snaps); 975 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 976 if (snap_count <= 0) { 977 g_free(snaps); 978 } 979 } while (snap_count == -ERANGE); 980 981 if (snap_count <= 0) { 982 goto done; 983 } 984 985 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 986 987 for (i = 0; i < snap_count; i++) { 988 const char *snap_name = snaps[i].name; 989 990 sn_info = sn_tab + i; 991 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 992 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 993 994 sn_info->vm_state_size = snaps[i].size; 995 sn_info->date_sec = 0; 996 sn_info->date_nsec = 0; 997 sn_info->vm_clock_nsec = 0; 998 } 999 rbd_snap_list_end(snaps); 1000 g_free(snaps); 1001 1002 done: 1003 *psn_tab = sn_tab; 1004 return snap_count; 1005 } 1006 1007 #ifdef LIBRBD_SUPPORTS_DISCARD 1008 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs, 1009 int64_t offset, 1010 int count, 1011 BlockCompletionFunc *cb, 1012 void *opaque) 1013 { 1014 return rbd_start_aio(bs, offset, NULL, count, cb, opaque, 1015 RBD_AIO_DISCARD); 1016 } 1017 #endif 1018 1019 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1020 static void qemu_rbd_invalidate_cache(BlockDriverState *bs, 1021 Error **errp) 1022 { 1023 BDRVRBDState *s = bs->opaque; 1024 int r = rbd_invalidate_cache(s->image); 1025 if (r < 0) { 1026 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1027 } 1028 } 1029 #endif 1030 1031 static QemuOptsList qemu_rbd_create_opts = { 1032 .name = "rbd-create-opts", 1033 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1034 .desc = { 1035 { 1036 .name = BLOCK_OPT_SIZE, 1037 .type = QEMU_OPT_SIZE, 1038 .help = "Virtual disk size" 1039 }, 1040 { 1041 .name = BLOCK_OPT_CLUSTER_SIZE, 1042 .type = QEMU_OPT_SIZE, 1043 .help = "RBD object size" 1044 }, 1045 { 1046 .name = "password-secret", 1047 .type = QEMU_OPT_STRING, 1048 .help = "ID of secret providing the password", 1049 }, 1050 { /* end of list */ } 1051 } 1052 }; 1053 1054 static BlockDriver bdrv_rbd = { 1055 .format_name = "rbd", 1056 .instance_size = sizeof(BDRVRBDState), 1057 .bdrv_parse_filename = qemu_rbd_parse_filename, 1058 .bdrv_file_open = qemu_rbd_open, 1059 .bdrv_close = qemu_rbd_close, 1060 .bdrv_create = qemu_rbd_create, 1061 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1062 .bdrv_get_info = qemu_rbd_getinfo, 1063 .create_opts = &qemu_rbd_create_opts, 1064 .bdrv_getlength = qemu_rbd_getlength, 1065 .bdrv_truncate = qemu_rbd_truncate, 1066 .protocol_name = "rbd", 1067 1068 .bdrv_aio_readv = qemu_rbd_aio_readv, 1069 .bdrv_aio_writev = qemu_rbd_aio_writev, 1070 1071 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1072 .bdrv_aio_flush = qemu_rbd_aio_flush, 1073 #else 1074 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1075 #endif 1076 1077 #ifdef LIBRBD_SUPPORTS_DISCARD 1078 .bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard, 1079 #endif 1080 1081 .bdrv_snapshot_create = qemu_rbd_snap_create, 1082 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1083 .bdrv_snapshot_list = qemu_rbd_snap_list, 1084 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1085 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1086 .bdrv_invalidate_cache = qemu_rbd_invalidate_cache, 1087 #endif 1088 }; 1089 1090 static void bdrv_rbd_init(void) 1091 { 1092 bdrv_register(&bdrv_rbd); 1093 } 1094 1095 block_init(bdrv_rbd_init); 1096