1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/option.h" 20 #include "block/block_int.h" 21 #include "crypto/secret.h" 22 #include "qemu/cutils.h" 23 #include "qapi/qmp/qstring.h" 24 #include "qapi/qmp/qdict.h" 25 #include "qapi/qmp/qjson.h" 26 #include "qapi/qmp/qlist.h" 27 28 /* 29 * When specifying the image filename use: 30 * 31 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 32 * 33 * poolname must be the name of an existing rados pool. 34 * 35 * devicename is the name of the rbd image. 36 * 37 * Each option given is used to configure rados, and may be any valid 38 * Ceph option, "id", or "conf". 39 * 40 * The "id" option indicates what user we should authenticate as to 41 * the Ceph cluster. If it is excluded we will use the Ceph default 42 * (normally 'admin'). 43 * 44 * The "conf" option specifies a Ceph configuration file to read. If 45 * it is not specified, we will read from the default Ceph locations 46 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 47 * file, specify conf=/dev/null. 48 * 49 * Configuration values containing :, @, or = can be escaped with a 50 * leading "\". 51 */ 52 53 /* rbd_aio_discard added in 0.1.2 */ 54 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 55 #define LIBRBD_SUPPORTS_DISCARD 56 #else 57 #undef LIBRBD_SUPPORTS_DISCARD 58 #endif 59 60 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 61 62 #define RBD_MAX_SNAPS 100 63 64 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */ 65 #ifdef LIBRBD_SUPPORTS_IOVEC 66 #define LIBRBD_USE_IOVEC 1 67 #else 68 #define LIBRBD_USE_IOVEC 0 69 #endif 70 71 typedef enum { 72 RBD_AIO_READ, 73 RBD_AIO_WRITE, 74 RBD_AIO_DISCARD, 75 RBD_AIO_FLUSH 76 } RBDAIOCmd; 77 78 typedef struct RBDAIOCB { 79 BlockAIOCB common; 80 int64_t ret; 81 QEMUIOVector *qiov; 82 char *bounce; 83 RBDAIOCmd cmd; 84 int error; 85 struct BDRVRBDState *s; 86 } RBDAIOCB; 87 88 typedef struct RADOSCB { 89 RBDAIOCB *acb; 90 struct BDRVRBDState *s; 91 int64_t size; 92 char *buf; 93 int64_t ret; 94 } RADOSCB; 95 96 typedef struct BDRVRBDState { 97 rados_t cluster; 98 rados_ioctx_t io_ctx; 99 rbd_image_t image; 100 char *image_name; 101 char *snap; 102 } BDRVRBDState; 103 104 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 105 { 106 char *end; 107 108 *p = NULL; 109 110 for (end = src; *end; ++end) { 111 if (*end == delim) { 112 break; 113 } 114 if (*end == '\\' && end[1] != '\0') { 115 end++; 116 } 117 } 118 if (*end == delim) { 119 *p = end + 1; 120 *end = '\0'; 121 } 122 return src; 123 } 124 125 static void qemu_rbd_unescape(char *src) 126 { 127 char *p; 128 129 for (p = src; *src; ++src, ++p) { 130 if (*src == '\\' && src[1] != '\0') { 131 src++; 132 } 133 *p = *src; 134 } 135 *p = '\0'; 136 } 137 138 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 139 Error **errp) 140 { 141 const char *start; 142 char *p, *buf; 143 QList *keypairs = NULL; 144 char *found_str; 145 146 if (!strstart(filename, "rbd:", &start)) { 147 error_setg(errp, "File name must start with 'rbd:'"); 148 return; 149 } 150 151 buf = g_strdup(start); 152 p = buf; 153 154 found_str = qemu_rbd_next_tok(p, '/', &p); 155 if (!p) { 156 error_setg(errp, "Pool name is required"); 157 goto done; 158 } 159 qemu_rbd_unescape(found_str); 160 qdict_put_str(options, "pool", found_str); 161 162 if (strchr(p, '@')) { 163 found_str = qemu_rbd_next_tok(p, '@', &p); 164 qemu_rbd_unescape(found_str); 165 qdict_put_str(options, "image", found_str); 166 167 found_str = qemu_rbd_next_tok(p, ':', &p); 168 qemu_rbd_unescape(found_str); 169 qdict_put_str(options, "snapshot", found_str); 170 } else { 171 found_str = qemu_rbd_next_tok(p, ':', &p); 172 qemu_rbd_unescape(found_str); 173 qdict_put_str(options, "image", found_str); 174 } 175 if (!p) { 176 goto done; 177 } 178 179 /* The following are essentially all key/value pairs, and we treat 180 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 181 while (p) { 182 char *name, *value; 183 name = qemu_rbd_next_tok(p, '=', &p); 184 if (!p) { 185 error_setg(errp, "conf option %s has no value", name); 186 break; 187 } 188 189 qemu_rbd_unescape(name); 190 191 value = qemu_rbd_next_tok(p, ':', &p); 192 qemu_rbd_unescape(value); 193 194 if (!strcmp(name, "conf")) { 195 qdict_put_str(options, "conf", value); 196 } else if (!strcmp(name, "id")) { 197 qdict_put_str(options, "user", value); 198 } else { 199 /* 200 * We pass these internally to qemu_rbd_set_keypairs(), so 201 * we can get away with the simpler list of [ "key1", 202 * "value1", "key2", "value2" ] rather than a raw dict 203 * { "key1": "value1", "key2": "value2" } where we can't 204 * guarantee order, or even a more correct but complex 205 * [ { "key1": "value1" }, { "key2": "value2" } ] 206 */ 207 if (!keypairs) { 208 keypairs = qlist_new(); 209 } 210 qlist_append_str(keypairs, name); 211 qlist_append_str(keypairs, value); 212 } 213 } 214 215 if (keypairs) { 216 qdict_put(options, "=keyvalue-pairs", 217 qobject_to_json(QOBJECT(keypairs))); 218 } 219 220 done: 221 g_free(buf); 222 QDECREF(keypairs); 223 return; 224 } 225 226 227 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid, 228 Error **errp) 229 { 230 if (secretid == 0) { 231 return 0; 232 } 233 234 gchar *secret = qcrypto_secret_lookup_as_base64(secretid, 235 errp); 236 if (!secret) { 237 return -1; 238 } 239 240 rados_conf_set(cluster, "key", secret); 241 g_free(secret); 242 243 return 0; 244 } 245 246 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 247 Error **errp) 248 { 249 QList *keypairs; 250 QString *name; 251 QString *value; 252 const char *key; 253 size_t remaining; 254 int ret = 0; 255 256 if (!keypairs_json) { 257 return ret; 258 } 259 keypairs = qobject_to_qlist(qobject_from_json(keypairs_json, 260 &error_abort)); 261 remaining = qlist_size(keypairs) / 2; 262 assert(remaining); 263 264 while (remaining--) { 265 name = qobject_to_qstring(qlist_pop(keypairs)); 266 value = qobject_to_qstring(qlist_pop(keypairs)); 267 assert(name && value); 268 key = qstring_get_str(name); 269 270 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 271 QDECREF(name); 272 QDECREF(value); 273 if (ret < 0) { 274 error_setg_errno(errp, -ret, "invalid conf option %s", key); 275 ret = -EINVAL; 276 break; 277 } 278 } 279 280 QDECREF(keypairs); 281 return ret; 282 } 283 284 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs) 285 { 286 if (LIBRBD_USE_IOVEC) { 287 RBDAIOCB *acb = rcb->acb; 288 iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0, 289 acb->qiov->size - offs); 290 } else { 291 memset(rcb->buf + offs, 0, rcb->size - offs); 292 } 293 } 294 295 static QemuOptsList runtime_opts = { 296 .name = "rbd", 297 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 298 .desc = { 299 { 300 .name = "pool", 301 .type = QEMU_OPT_STRING, 302 .help = "Rados pool name", 303 }, 304 { 305 .name = "image", 306 .type = QEMU_OPT_STRING, 307 .help = "Image name in the pool", 308 }, 309 { 310 .name = "conf", 311 .type = QEMU_OPT_STRING, 312 .help = "Rados config file location", 313 }, 314 { 315 .name = "snapshot", 316 .type = QEMU_OPT_STRING, 317 .help = "Ceph snapshot name", 318 }, 319 { 320 /* maps to 'id' in rados_create() */ 321 .name = "user", 322 .type = QEMU_OPT_STRING, 323 .help = "Rados id name", 324 }, 325 /* 326 * server.* extracted manually, see qemu_rbd_mon_host() 327 */ 328 { 329 .name = "password-secret", 330 .type = QEMU_OPT_STRING, 331 .help = "ID of secret providing the password", 332 }, 333 334 /* 335 * Keys for qemu_rbd_parse_filename(), not in the QAPI schema 336 */ 337 { 338 /* 339 * HACK: name starts with '=' so that qemu_opts_parse() 340 * can't set it 341 */ 342 .name = "=keyvalue-pairs", 343 .type = QEMU_OPT_STRING, 344 .help = "Legacy rados key/value option parameters", 345 }, 346 { 347 .name = "filename", 348 .type = QEMU_OPT_STRING, 349 }, 350 { /* end of list */ } 351 }, 352 }; 353 354 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp) 355 { 356 Error *local_err = NULL; 357 int64_t bytes = 0; 358 int64_t objsize; 359 int obj_order = 0; 360 const char *pool, *image_name, *conf, *user, *keypairs; 361 const char *secretid; 362 rados_t cluster; 363 rados_ioctx_t io_ctx; 364 QDict *options = NULL; 365 int ret = 0; 366 367 secretid = qemu_opt_get(opts, "password-secret"); 368 369 /* Read out options */ 370 bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 371 BDRV_SECTOR_SIZE); 372 objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0); 373 if (objsize) { 374 if ((objsize - 1) & objsize) { /* not a power of 2? */ 375 error_setg(errp, "obj size needs to be power of 2"); 376 ret = -EINVAL; 377 goto exit; 378 } 379 if (objsize < 4096) { 380 error_setg(errp, "obj size too small"); 381 ret = -EINVAL; 382 goto exit; 383 } 384 obj_order = ctz32(objsize); 385 } 386 387 options = qdict_new(); 388 qemu_rbd_parse_filename(filename, options, &local_err); 389 if (local_err) { 390 ret = -EINVAL; 391 error_propagate(errp, local_err); 392 goto exit; 393 } 394 395 /* 396 * Caution: while qdict_get_try_str() is fine, getting non-string 397 * types would require more care. When @options come from -blockdev 398 * or blockdev_add, its members are typed according to the QAPI 399 * schema, but when they come from -drive, they're all QString. 400 */ 401 pool = qdict_get_try_str(options, "pool"); 402 conf = qdict_get_try_str(options, "conf"); 403 user = qdict_get_try_str(options, "user"); 404 image_name = qdict_get_try_str(options, "image"); 405 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 406 407 ret = rados_create(&cluster, user); 408 if (ret < 0) { 409 error_setg_errno(errp, -ret, "error initializing"); 410 goto exit; 411 } 412 413 /* try default location when conf=NULL, but ignore failure */ 414 ret = rados_conf_read_file(cluster, conf); 415 if (conf && ret < 0) { 416 error_setg_errno(errp, -ret, "error reading conf file %s", conf); 417 ret = -EIO; 418 goto shutdown; 419 } 420 421 ret = qemu_rbd_set_keypairs(cluster, keypairs, errp); 422 if (ret < 0) { 423 ret = -EIO; 424 goto shutdown; 425 } 426 427 if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) { 428 ret = -EIO; 429 goto shutdown; 430 } 431 432 ret = rados_connect(cluster); 433 if (ret < 0) { 434 error_setg_errno(errp, -ret, "error connecting"); 435 goto shutdown; 436 } 437 438 ret = rados_ioctx_create(cluster, pool, &io_ctx); 439 if (ret < 0) { 440 error_setg_errno(errp, -ret, "error opening pool %s", pool); 441 goto shutdown; 442 } 443 444 ret = rbd_create(io_ctx, image_name, bytes, &obj_order); 445 if (ret < 0) { 446 error_setg_errno(errp, -ret, "error rbd create"); 447 } 448 449 rados_ioctx_destroy(io_ctx); 450 451 shutdown: 452 rados_shutdown(cluster); 453 454 exit: 455 QDECREF(options); 456 return ret; 457 } 458 459 /* 460 * This aio completion is being called from rbd_finish_bh() and runs in qemu 461 * BH context. 462 */ 463 static void qemu_rbd_complete_aio(RADOSCB *rcb) 464 { 465 RBDAIOCB *acb = rcb->acb; 466 int64_t r; 467 468 r = rcb->ret; 469 470 if (acb->cmd != RBD_AIO_READ) { 471 if (r < 0) { 472 acb->ret = r; 473 acb->error = 1; 474 } else if (!acb->error) { 475 acb->ret = rcb->size; 476 } 477 } else { 478 if (r < 0) { 479 qemu_rbd_memset(rcb, 0); 480 acb->ret = r; 481 acb->error = 1; 482 } else if (r < rcb->size) { 483 qemu_rbd_memset(rcb, r); 484 if (!acb->error) { 485 acb->ret = rcb->size; 486 } 487 } else if (!acb->error) { 488 acb->ret = r; 489 } 490 } 491 492 g_free(rcb); 493 494 if (!LIBRBD_USE_IOVEC) { 495 if (acb->cmd == RBD_AIO_READ) { 496 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 497 } 498 qemu_vfree(acb->bounce); 499 } 500 501 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 502 503 qemu_aio_unref(acb); 504 } 505 506 static char *qemu_rbd_mon_host(QDict *options, Error **errp) 507 { 508 const char **vals = g_new(const char *, qdict_size(options) + 1); 509 char keybuf[32]; 510 const char *host, *port; 511 char *rados_str; 512 int i; 513 514 for (i = 0;; i++) { 515 sprintf(keybuf, "server.%d.host", i); 516 host = qdict_get_try_str(options, keybuf); 517 qdict_del(options, keybuf); 518 sprintf(keybuf, "server.%d.port", i); 519 port = qdict_get_try_str(options, keybuf); 520 qdict_del(options, keybuf); 521 if (!host && !port) { 522 break; 523 } 524 if (!host) { 525 error_setg(errp, "Parameter server.%d.host is missing", i); 526 rados_str = NULL; 527 goto out; 528 } 529 530 if (strchr(host, ':')) { 531 vals[i] = port ? g_strdup_printf("[%s]:%s", host, port) 532 : g_strdup_printf("[%s]", host); 533 } else { 534 vals[i] = port ? g_strdup_printf("%s:%s", host, port) 535 : g_strdup(host); 536 } 537 } 538 vals[i] = NULL; 539 540 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 541 out: 542 g_strfreev((char **)vals); 543 return rados_str; 544 } 545 546 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 547 Error **errp) 548 { 549 BDRVRBDState *s = bs->opaque; 550 const char *pool, *snap, *conf, *user, *image_name, *keypairs; 551 const char *secretid, *filename; 552 QemuOpts *opts; 553 Error *local_err = NULL; 554 char *mon_host = NULL; 555 int r; 556 557 /* If we are given a filename, parse the filename, with precedence given to 558 * filename encoded options */ 559 filename = qdict_get_try_str(options, "filename"); 560 if (filename) { 561 warn_report("'filename' option specified. " 562 "This is an unsupported option, and may be deprecated " 563 "in the future"); 564 qemu_rbd_parse_filename(filename, options, &local_err); 565 if (local_err) { 566 r = -EINVAL; 567 error_propagate(errp, local_err); 568 goto exit; 569 } 570 } 571 572 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 573 qemu_opts_absorb_qdict(opts, options, &local_err); 574 if (local_err) { 575 error_propagate(errp, local_err); 576 r = -EINVAL; 577 goto failed_opts; 578 } 579 580 mon_host = qemu_rbd_mon_host(options, &local_err); 581 if (local_err) { 582 error_propagate(errp, local_err); 583 r = -EINVAL; 584 goto failed_opts; 585 } 586 587 secretid = qemu_opt_get(opts, "password-secret"); 588 589 pool = qemu_opt_get(opts, "pool"); 590 conf = qemu_opt_get(opts, "conf"); 591 snap = qemu_opt_get(opts, "snapshot"); 592 user = qemu_opt_get(opts, "user"); 593 image_name = qemu_opt_get(opts, "image"); 594 keypairs = qemu_opt_get(opts, "=keyvalue-pairs"); 595 596 if (!pool || !image_name) { 597 error_setg(errp, "Parameters 'pool' and 'image' are required"); 598 r = -EINVAL; 599 goto failed_opts; 600 } 601 602 r = rados_create(&s->cluster, user); 603 if (r < 0) { 604 error_setg_errno(errp, -r, "error initializing"); 605 goto failed_opts; 606 } 607 608 s->snap = g_strdup(snap); 609 s->image_name = g_strdup(image_name); 610 611 /* try default location when conf=NULL, but ignore failure */ 612 r = rados_conf_read_file(s->cluster, conf); 613 if (conf && r < 0) { 614 error_setg_errno(errp, -r, "error reading conf file %s", conf); 615 goto failed_shutdown; 616 } 617 618 r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp); 619 if (r < 0) { 620 goto failed_shutdown; 621 } 622 623 if (mon_host) { 624 r = rados_conf_set(s->cluster, "mon_host", mon_host); 625 if (r < 0) { 626 goto failed_shutdown; 627 } 628 } 629 630 if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) { 631 r = -EIO; 632 goto failed_shutdown; 633 } 634 635 /* 636 * Fallback to more conservative semantics if setting cache 637 * options fails. Ignore errors from setting rbd_cache because the 638 * only possible error is that the option does not exist, and 639 * librbd defaults to no caching. If write through caching cannot 640 * be set up, fall back to no caching. 641 */ 642 if (flags & BDRV_O_NOCACHE) { 643 rados_conf_set(s->cluster, "rbd_cache", "false"); 644 } else { 645 rados_conf_set(s->cluster, "rbd_cache", "true"); 646 } 647 648 r = rados_connect(s->cluster); 649 if (r < 0) { 650 error_setg_errno(errp, -r, "error connecting"); 651 goto failed_shutdown; 652 } 653 654 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 655 if (r < 0) { 656 error_setg_errno(errp, -r, "error opening pool %s", pool); 657 goto failed_shutdown; 658 } 659 660 /* rbd_open is always r/w */ 661 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 662 if (r < 0) { 663 error_setg_errno(errp, -r, "error reading header from %s", 664 s->image_name); 665 goto failed_open; 666 } 667 668 /* If we are using an rbd snapshot, we must be r/o, otherwise 669 * leave as-is */ 670 if (s->snap != NULL) { 671 if (!bdrv_is_read_only(bs)) { 672 error_report("Opening rbd snapshots without an explicit " 673 "read-only=on option is deprecated. Future versions " 674 "will refuse to open the image instead of " 675 "automatically marking the image read-only."); 676 r = bdrv_set_read_only(bs, true, &local_err); 677 if (r < 0) { 678 error_propagate(errp, local_err); 679 goto failed_open; 680 } 681 } 682 } 683 684 qemu_opts_del(opts); 685 return 0; 686 687 failed_open: 688 rados_ioctx_destroy(s->io_ctx); 689 failed_shutdown: 690 rados_shutdown(s->cluster); 691 g_free(s->snap); 692 g_free(s->image_name); 693 failed_opts: 694 qemu_opts_del(opts); 695 g_free(mon_host); 696 exit: 697 return r; 698 } 699 700 701 /* Since RBD is currently always opened R/W via the API, 702 * we just need to check if we are using a snapshot or not, in 703 * order to determine if we will allow it to be R/W */ 704 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 705 BlockReopenQueue *queue, Error **errp) 706 { 707 BDRVRBDState *s = state->bs->opaque; 708 int ret = 0; 709 710 if (s->snap && state->flags & BDRV_O_RDWR) { 711 error_setg(errp, 712 "Cannot change node '%s' to r/w when using RBD snapshot", 713 bdrv_get_device_or_node_name(state->bs)); 714 ret = -EINVAL; 715 } 716 717 return ret; 718 } 719 720 static void qemu_rbd_close(BlockDriverState *bs) 721 { 722 BDRVRBDState *s = bs->opaque; 723 724 rbd_close(s->image); 725 rados_ioctx_destroy(s->io_ctx); 726 g_free(s->snap); 727 g_free(s->image_name); 728 rados_shutdown(s->cluster); 729 } 730 731 static const AIOCBInfo rbd_aiocb_info = { 732 .aiocb_size = sizeof(RBDAIOCB), 733 }; 734 735 static void rbd_finish_bh(void *opaque) 736 { 737 RADOSCB *rcb = opaque; 738 qemu_rbd_complete_aio(rcb); 739 } 740 741 /* 742 * This is the callback function for rbd_aio_read and _write 743 * 744 * Note: this function is being called from a non qemu thread so 745 * we need to be careful about what we do here. Generally we only 746 * schedule a BH, and do the rest of the io completion handling 747 * from rbd_finish_bh() which runs in a qemu context. 748 */ 749 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 750 { 751 RBDAIOCB *acb = rcb->acb; 752 753 rcb->ret = rbd_aio_get_return_value(c); 754 rbd_aio_release(c); 755 756 aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs), 757 rbd_finish_bh, rcb); 758 } 759 760 static int rbd_aio_discard_wrapper(rbd_image_t image, 761 uint64_t off, 762 uint64_t len, 763 rbd_completion_t comp) 764 { 765 #ifdef LIBRBD_SUPPORTS_DISCARD 766 return rbd_aio_discard(image, off, len, comp); 767 #else 768 return -ENOTSUP; 769 #endif 770 } 771 772 static int rbd_aio_flush_wrapper(rbd_image_t image, 773 rbd_completion_t comp) 774 { 775 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 776 return rbd_aio_flush(image, comp); 777 #else 778 return -ENOTSUP; 779 #endif 780 } 781 782 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, 783 int64_t off, 784 QEMUIOVector *qiov, 785 int64_t size, 786 BlockCompletionFunc *cb, 787 void *opaque, 788 RBDAIOCmd cmd) 789 { 790 RBDAIOCB *acb; 791 RADOSCB *rcb = NULL; 792 rbd_completion_t c; 793 int r; 794 795 BDRVRBDState *s = bs->opaque; 796 797 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 798 acb->cmd = cmd; 799 acb->qiov = qiov; 800 assert(!qiov || qiov->size == size); 801 802 rcb = g_new(RADOSCB, 1); 803 804 if (!LIBRBD_USE_IOVEC) { 805 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 806 acb->bounce = NULL; 807 } else { 808 acb->bounce = qemu_try_blockalign(bs, qiov->size); 809 if (acb->bounce == NULL) { 810 goto failed; 811 } 812 } 813 if (cmd == RBD_AIO_WRITE) { 814 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 815 } 816 rcb->buf = acb->bounce; 817 } 818 819 acb->ret = 0; 820 acb->error = 0; 821 acb->s = s; 822 823 rcb->acb = acb; 824 rcb->s = acb->s; 825 rcb->size = size; 826 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 827 if (r < 0) { 828 goto failed; 829 } 830 831 switch (cmd) { 832 case RBD_AIO_WRITE: 833 #ifdef LIBRBD_SUPPORTS_IOVEC 834 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c); 835 #else 836 r = rbd_aio_write(s->image, off, size, rcb->buf, c); 837 #endif 838 break; 839 case RBD_AIO_READ: 840 #ifdef LIBRBD_SUPPORTS_IOVEC 841 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); 842 #else 843 r = rbd_aio_read(s->image, off, size, rcb->buf, c); 844 #endif 845 break; 846 case RBD_AIO_DISCARD: 847 r = rbd_aio_discard_wrapper(s->image, off, size, c); 848 break; 849 case RBD_AIO_FLUSH: 850 r = rbd_aio_flush_wrapper(s->image, c); 851 break; 852 default: 853 r = -EINVAL; 854 } 855 856 if (r < 0) { 857 goto failed_completion; 858 } 859 return &acb->common; 860 861 failed_completion: 862 rbd_aio_release(c); 863 failed: 864 g_free(rcb); 865 if (!LIBRBD_USE_IOVEC) { 866 qemu_vfree(acb->bounce); 867 } 868 869 qemu_aio_unref(acb); 870 return NULL; 871 } 872 873 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 874 int64_t sector_num, 875 QEMUIOVector *qiov, 876 int nb_sectors, 877 BlockCompletionFunc *cb, 878 void *opaque) 879 { 880 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 881 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 882 RBD_AIO_READ); 883 } 884 885 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 886 int64_t sector_num, 887 QEMUIOVector *qiov, 888 int nb_sectors, 889 BlockCompletionFunc *cb, 890 void *opaque) 891 { 892 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 893 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 894 RBD_AIO_WRITE); 895 } 896 897 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 898 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 899 BlockCompletionFunc *cb, 900 void *opaque) 901 { 902 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 903 } 904 905 #else 906 907 static int qemu_rbd_co_flush(BlockDriverState *bs) 908 { 909 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 910 /* rbd_flush added in 0.1.1 */ 911 BDRVRBDState *s = bs->opaque; 912 return rbd_flush(s->image); 913 #else 914 return 0; 915 #endif 916 } 917 #endif 918 919 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 920 { 921 BDRVRBDState *s = bs->opaque; 922 rbd_image_info_t info; 923 int r; 924 925 r = rbd_stat(s->image, &info, sizeof(info)); 926 if (r < 0) { 927 return r; 928 } 929 930 bdi->cluster_size = info.obj_size; 931 return 0; 932 } 933 934 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 935 { 936 BDRVRBDState *s = bs->opaque; 937 rbd_image_info_t info; 938 int r; 939 940 r = rbd_stat(s->image, &info, sizeof(info)); 941 if (r < 0) { 942 return r; 943 } 944 945 return info.size; 946 } 947 948 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset, 949 PreallocMode prealloc, Error **errp) 950 { 951 BDRVRBDState *s = bs->opaque; 952 int r; 953 954 if (prealloc != PREALLOC_MODE_OFF) { 955 error_setg(errp, "Unsupported preallocation mode '%s'", 956 PreallocMode_str(prealloc)); 957 return -ENOTSUP; 958 } 959 960 r = rbd_resize(s->image, offset); 961 if (r < 0) { 962 error_setg_errno(errp, -r, "Failed to resize file"); 963 return r; 964 } 965 966 return 0; 967 } 968 969 static int qemu_rbd_snap_create(BlockDriverState *bs, 970 QEMUSnapshotInfo *sn_info) 971 { 972 BDRVRBDState *s = bs->opaque; 973 int r; 974 975 if (sn_info->name[0] == '\0') { 976 return -EINVAL; /* we need a name for rbd snapshots */ 977 } 978 979 /* 980 * rbd snapshots are using the name as the user controlled unique identifier 981 * we can't use the rbd snapid for that purpose, as it can't be set 982 */ 983 if (sn_info->id_str[0] != '\0' && 984 strcmp(sn_info->id_str, sn_info->name) != 0) { 985 return -EINVAL; 986 } 987 988 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 989 return -ERANGE; 990 } 991 992 r = rbd_snap_create(s->image, sn_info->name); 993 if (r < 0) { 994 error_report("failed to create snap: %s", strerror(-r)); 995 return r; 996 } 997 998 return 0; 999 } 1000 1001 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1002 const char *snapshot_id, 1003 const char *snapshot_name, 1004 Error **errp) 1005 { 1006 BDRVRBDState *s = bs->opaque; 1007 int r; 1008 1009 if (!snapshot_name) { 1010 error_setg(errp, "rbd need a valid snapshot name"); 1011 return -EINVAL; 1012 } 1013 1014 /* If snapshot_id is specified, it must be equal to name, see 1015 qemu_rbd_snap_list() */ 1016 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1017 error_setg(errp, 1018 "rbd do not support snapshot id, it should be NULL or " 1019 "equal to snapshot name"); 1020 return -EINVAL; 1021 } 1022 1023 r = rbd_snap_remove(s->image, snapshot_name); 1024 if (r < 0) { 1025 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1026 } 1027 return r; 1028 } 1029 1030 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1031 const char *snapshot_name) 1032 { 1033 BDRVRBDState *s = bs->opaque; 1034 1035 return rbd_snap_rollback(s->image, snapshot_name); 1036 } 1037 1038 static int qemu_rbd_snap_list(BlockDriverState *bs, 1039 QEMUSnapshotInfo **psn_tab) 1040 { 1041 BDRVRBDState *s = bs->opaque; 1042 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1043 int i, snap_count; 1044 rbd_snap_info_t *snaps; 1045 int max_snaps = RBD_MAX_SNAPS; 1046 1047 do { 1048 snaps = g_new(rbd_snap_info_t, max_snaps); 1049 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1050 if (snap_count <= 0) { 1051 g_free(snaps); 1052 } 1053 } while (snap_count == -ERANGE); 1054 1055 if (snap_count <= 0) { 1056 goto done; 1057 } 1058 1059 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1060 1061 for (i = 0; i < snap_count; i++) { 1062 const char *snap_name = snaps[i].name; 1063 1064 sn_info = sn_tab + i; 1065 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1066 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1067 1068 sn_info->vm_state_size = snaps[i].size; 1069 sn_info->date_sec = 0; 1070 sn_info->date_nsec = 0; 1071 sn_info->vm_clock_nsec = 0; 1072 } 1073 rbd_snap_list_end(snaps); 1074 g_free(snaps); 1075 1076 done: 1077 *psn_tab = sn_tab; 1078 return snap_count; 1079 } 1080 1081 #ifdef LIBRBD_SUPPORTS_DISCARD 1082 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs, 1083 int64_t offset, 1084 int bytes, 1085 BlockCompletionFunc *cb, 1086 void *opaque) 1087 { 1088 return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque, 1089 RBD_AIO_DISCARD); 1090 } 1091 #endif 1092 1093 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1094 static void qemu_rbd_invalidate_cache(BlockDriverState *bs, 1095 Error **errp) 1096 { 1097 BDRVRBDState *s = bs->opaque; 1098 int r = rbd_invalidate_cache(s->image); 1099 if (r < 0) { 1100 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1101 } 1102 } 1103 #endif 1104 1105 static QemuOptsList qemu_rbd_create_opts = { 1106 .name = "rbd-create-opts", 1107 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1108 .desc = { 1109 { 1110 .name = BLOCK_OPT_SIZE, 1111 .type = QEMU_OPT_SIZE, 1112 .help = "Virtual disk size" 1113 }, 1114 { 1115 .name = BLOCK_OPT_CLUSTER_SIZE, 1116 .type = QEMU_OPT_SIZE, 1117 .help = "RBD object size" 1118 }, 1119 { 1120 .name = "password-secret", 1121 .type = QEMU_OPT_STRING, 1122 .help = "ID of secret providing the password", 1123 }, 1124 { /* end of list */ } 1125 } 1126 }; 1127 1128 static BlockDriver bdrv_rbd = { 1129 .format_name = "rbd", 1130 .instance_size = sizeof(BDRVRBDState), 1131 .bdrv_parse_filename = qemu_rbd_parse_filename, 1132 .bdrv_file_open = qemu_rbd_open, 1133 .bdrv_close = qemu_rbd_close, 1134 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1135 .bdrv_create = qemu_rbd_create, 1136 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1137 .bdrv_get_info = qemu_rbd_getinfo, 1138 .create_opts = &qemu_rbd_create_opts, 1139 .bdrv_getlength = qemu_rbd_getlength, 1140 .bdrv_truncate = qemu_rbd_truncate, 1141 .protocol_name = "rbd", 1142 1143 .bdrv_aio_readv = qemu_rbd_aio_readv, 1144 .bdrv_aio_writev = qemu_rbd_aio_writev, 1145 1146 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1147 .bdrv_aio_flush = qemu_rbd_aio_flush, 1148 #else 1149 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1150 #endif 1151 1152 #ifdef LIBRBD_SUPPORTS_DISCARD 1153 .bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard, 1154 #endif 1155 1156 .bdrv_snapshot_create = qemu_rbd_snap_create, 1157 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1158 .bdrv_snapshot_list = qemu_rbd_snap_list, 1159 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1160 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1161 .bdrv_invalidate_cache = qemu_rbd_invalidate_cache, 1162 #endif 1163 }; 1164 1165 static void bdrv_rbd_init(void) 1166 { 1167 bdrv_register(&bdrv_rbd); 1168 } 1169 1170 block_init(bdrv_rbd_init); 1171