1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/option.h" 20 #include "block/block_int.h" 21 #include "crypto/secret.h" 22 #include "qemu/cutils.h" 23 #include "qapi/qmp/qstring.h" 24 #include "qapi/qmp/qdict.h" 25 #include "qapi/qmp/qjson.h" 26 #include "qapi/qmp/qlist.h" 27 28 /* 29 * When specifying the image filename use: 30 * 31 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 32 * 33 * poolname must be the name of an existing rados pool. 34 * 35 * devicename is the name of the rbd image. 36 * 37 * Each option given is used to configure rados, and may be any valid 38 * Ceph option, "id", or "conf". 39 * 40 * The "id" option indicates what user we should authenticate as to 41 * the Ceph cluster. If it is excluded we will use the Ceph default 42 * (normally 'admin'). 43 * 44 * The "conf" option specifies a Ceph configuration file to read. If 45 * it is not specified, we will read from the default Ceph locations 46 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 47 * file, specify conf=/dev/null. 48 * 49 * Configuration values containing :, @, or = can be escaped with a 50 * leading "\". 51 */ 52 53 /* rbd_aio_discard added in 0.1.2 */ 54 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 55 #define LIBRBD_SUPPORTS_DISCARD 56 #else 57 #undef LIBRBD_SUPPORTS_DISCARD 58 #endif 59 60 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 61 62 #define RBD_MAX_SNAPS 100 63 64 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */ 65 #ifdef LIBRBD_SUPPORTS_IOVEC 66 #define LIBRBD_USE_IOVEC 1 67 #else 68 #define LIBRBD_USE_IOVEC 0 69 #endif 70 71 typedef enum { 72 RBD_AIO_READ, 73 RBD_AIO_WRITE, 74 RBD_AIO_DISCARD, 75 RBD_AIO_FLUSH 76 } RBDAIOCmd; 77 78 typedef struct RBDAIOCB { 79 BlockAIOCB common; 80 int64_t ret; 81 QEMUIOVector *qiov; 82 char *bounce; 83 RBDAIOCmd cmd; 84 int error; 85 struct BDRVRBDState *s; 86 } RBDAIOCB; 87 88 typedef struct RADOSCB { 89 RBDAIOCB *acb; 90 struct BDRVRBDState *s; 91 int64_t size; 92 char *buf; 93 int64_t ret; 94 } RADOSCB; 95 96 typedef struct BDRVRBDState { 97 rados_t cluster; 98 rados_ioctx_t io_ctx; 99 rbd_image_t image; 100 char *image_name; 101 char *snap; 102 } BDRVRBDState; 103 104 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 105 { 106 char *end; 107 108 *p = NULL; 109 110 for (end = src; *end; ++end) { 111 if (*end == delim) { 112 break; 113 } 114 if (*end == '\\' && end[1] != '\0') { 115 end++; 116 } 117 } 118 if (*end == delim) { 119 *p = end + 1; 120 *end = '\0'; 121 } 122 return src; 123 } 124 125 static void qemu_rbd_unescape(char *src) 126 { 127 char *p; 128 129 for (p = src; *src; ++src, ++p) { 130 if (*src == '\\' && src[1] != '\0') { 131 src++; 132 } 133 *p = *src; 134 } 135 *p = '\0'; 136 } 137 138 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 139 Error **errp) 140 { 141 const char *start; 142 char *p, *buf; 143 QList *keypairs = NULL; 144 char *found_str; 145 146 if (!strstart(filename, "rbd:", &start)) { 147 error_setg(errp, "File name must start with 'rbd:'"); 148 return; 149 } 150 151 buf = g_strdup(start); 152 p = buf; 153 154 found_str = qemu_rbd_next_tok(p, '/', &p); 155 if (!p) { 156 error_setg(errp, "Pool name is required"); 157 goto done; 158 } 159 qemu_rbd_unescape(found_str); 160 qdict_put_str(options, "pool", found_str); 161 162 if (strchr(p, '@')) { 163 found_str = qemu_rbd_next_tok(p, '@', &p); 164 qemu_rbd_unescape(found_str); 165 qdict_put_str(options, "image", found_str); 166 167 found_str = qemu_rbd_next_tok(p, ':', &p); 168 qemu_rbd_unescape(found_str); 169 qdict_put_str(options, "snapshot", found_str); 170 } else { 171 found_str = qemu_rbd_next_tok(p, ':', &p); 172 qemu_rbd_unescape(found_str); 173 qdict_put_str(options, "image", found_str); 174 } 175 if (!p) { 176 goto done; 177 } 178 179 /* The following are essentially all key/value pairs, and we treat 180 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 181 while (p) { 182 char *name, *value; 183 name = qemu_rbd_next_tok(p, '=', &p); 184 if (!p) { 185 error_setg(errp, "conf option %s has no value", name); 186 break; 187 } 188 189 qemu_rbd_unescape(name); 190 191 value = qemu_rbd_next_tok(p, ':', &p); 192 qemu_rbd_unescape(value); 193 194 if (!strcmp(name, "conf")) { 195 qdict_put_str(options, "conf", value); 196 } else if (!strcmp(name, "id")) { 197 qdict_put_str(options, "user", value); 198 } else { 199 /* 200 * We pass these internally to qemu_rbd_set_keypairs(), so 201 * we can get away with the simpler list of [ "key1", 202 * "value1", "key2", "value2" ] rather than a raw dict 203 * { "key1": "value1", "key2": "value2" } where we can't 204 * guarantee order, or even a more correct but complex 205 * [ { "key1": "value1" }, { "key2": "value2" } ] 206 */ 207 if (!keypairs) { 208 keypairs = qlist_new(); 209 } 210 qlist_append_str(keypairs, name); 211 qlist_append_str(keypairs, value); 212 } 213 } 214 215 if (keypairs) { 216 qdict_put(options, "=keyvalue-pairs", 217 qobject_to_json(QOBJECT(keypairs))); 218 } 219 220 done: 221 g_free(buf); 222 QDECREF(keypairs); 223 return; 224 } 225 226 227 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid, 228 Error **errp) 229 { 230 if (secretid == 0) { 231 return 0; 232 } 233 234 gchar *secret = qcrypto_secret_lookup_as_base64(secretid, 235 errp); 236 if (!secret) { 237 return -1; 238 } 239 240 rados_conf_set(cluster, "key", secret); 241 g_free(secret); 242 243 return 0; 244 } 245 246 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 247 Error **errp) 248 { 249 QList *keypairs; 250 QString *name; 251 QString *value; 252 const char *key; 253 size_t remaining; 254 int ret = 0; 255 256 if (!keypairs_json) { 257 return ret; 258 } 259 keypairs = qobject_to_qlist(qobject_from_json(keypairs_json, 260 &error_abort)); 261 remaining = qlist_size(keypairs) / 2; 262 assert(remaining); 263 264 while (remaining--) { 265 name = qobject_to_qstring(qlist_pop(keypairs)); 266 value = qobject_to_qstring(qlist_pop(keypairs)); 267 assert(name && value); 268 key = qstring_get_str(name); 269 270 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 271 QDECREF(name); 272 QDECREF(value); 273 if (ret < 0) { 274 error_setg_errno(errp, -ret, "invalid conf option %s", key); 275 ret = -EINVAL; 276 break; 277 } 278 } 279 280 QDECREF(keypairs); 281 return ret; 282 } 283 284 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs) 285 { 286 if (LIBRBD_USE_IOVEC) { 287 RBDAIOCB *acb = rcb->acb; 288 iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0, 289 acb->qiov->size - offs); 290 } else { 291 memset(rcb->buf + offs, 0, rcb->size - offs); 292 } 293 } 294 295 static QemuOptsList runtime_opts = { 296 .name = "rbd", 297 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 298 .desc = { 299 { 300 .name = "pool", 301 .type = QEMU_OPT_STRING, 302 .help = "Rados pool name", 303 }, 304 { 305 .name = "image", 306 .type = QEMU_OPT_STRING, 307 .help = "Image name in the pool", 308 }, 309 { 310 .name = "conf", 311 .type = QEMU_OPT_STRING, 312 .help = "Rados config file location", 313 }, 314 { 315 .name = "snapshot", 316 .type = QEMU_OPT_STRING, 317 .help = "Ceph snapshot name", 318 }, 319 { 320 /* maps to 'id' in rados_create() */ 321 .name = "user", 322 .type = QEMU_OPT_STRING, 323 .help = "Rados id name", 324 }, 325 /* 326 * server.* extracted manually, see qemu_rbd_mon_host() 327 */ 328 { 329 .name = "password-secret", 330 .type = QEMU_OPT_STRING, 331 .help = "ID of secret providing the password", 332 }, 333 334 /* 335 * Keys for qemu_rbd_parse_filename(), not in the QAPI schema 336 */ 337 { 338 /* 339 * HACK: name starts with '=' so that qemu_opts_parse() 340 * can't set it 341 */ 342 .name = "=keyvalue-pairs", 343 .type = QEMU_OPT_STRING, 344 .help = "Legacy rados key/value option parameters", 345 }, 346 { 347 .name = "filename", 348 .type = QEMU_OPT_STRING, 349 }, 350 { /* end of list */ } 351 }, 352 }; 353 354 static int coroutine_fn qemu_rbd_co_create_opts(const char *filename, 355 QemuOpts *opts, 356 Error **errp) 357 { 358 Error *local_err = NULL; 359 int64_t bytes = 0; 360 int64_t objsize; 361 int obj_order = 0; 362 const char *pool, *image_name, *conf, *user, *keypairs; 363 const char *secretid; 364 rados_t cluster; 365 rados_ioctx_t io_ctx; 366 QDict *options = NULL; 367 int ret = 0; 368 369 secretid = qemu_opt_get(opts, "password-secret"); 370 371 /* Read out options */ 372 bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 373 BDRV_SECTOR_SIZE); 374 objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0); 375 if (objsize) { 376 if ((objsize - 1) & objsize) { /* not a power of 2? */ 377 error_setg(errp, "obj size needs to be power of 2"); 378 ret = -EINVAL; 379 goto exit; 380 } 381 if (objsize < 4096) { 382 error_setg(errp, "obj size too small"); 383 ret = -EINVAL; 384 goto exit; 385 } 386 obj_order = ctz32(objsize); 387 } 388 389 options = qdict_new(); 390 qemu_rbd_parse_filename(filename, options, &local_err); 391 if (local_err) { 392 ret = -EINVAL; 393 error_propagate(errp, local_err); 394 goto exit; 395 } 396 397 /* 398 * Caution: while qdict_get_try_str() is fine, getting non-string 399 * types would require more care. When @options come from -blockdev 400 * or blockdev_add, its members are typed according to the QAPI 401 * schema, but when they come from -drive, they're all QString. 402 */ 403 pool = qdict_get_try_str(options, "pool"); 404 conf = qdict_get_try_str(options, "conf"); 405 user = qdict_get_try_str(options, "user"); 406 image_name = qdict_get_try_str(options, "image"); 407 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 408 409 ret = rados_create(&cluster, user); 410 if (ret < 0) { 411 error_setg_errno(errp, -ret, "error initializing"); 412 goto exit; 413 } 414 415 /* try default location when conf=NULL, but ignore failure */ 416 ret = rados_conf_read_file(cluster, conf); 417 if (conf && ret < 0) { 418 error_setg_errno(errp, -ret, "error reading conf file %s", conf); 419 ret = -EIO; 420 goto shutdown; 421 } 422 423 ret = qemu_rbd_set_keypairs(cluster, keypairs, errp); 424 if (ret < 0) { 425 ret = -EIO; 426 goto shutdown; 427 } 428 429 if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) { 430 ret = -EIO; 431 goto shutdown; 432 } 433 434 ret = rados_connect(cluster); 435 if (ret < 0) { 436 error_setg_errno(errp, -ret, "error connecting"); 437 goto shutdown; 438 } 439 440 ret = rados_ioctx_create(cluster, pool, &io_ctx); 441 if (ret < 0) { 442 error_setg_errno(errp, -ret, "error opening pool %s", pool); 443 goto shutdown; 444 } 445 446 ret = rbd_create(io_ctx, image_name, bytes, &obj_order); 447 if (ret < 0) { 448 error_setg_errno(errp, -ret, "error rbd create"); 449 } 450 451 rados_ioctx_destroy(io_ctx); 452 453 shutdown: 454 rados_shutdown(cluster); 455 456 exit: 457 QDECREF(options); 458 return ret; 459 } 460 461 /* 462 * This aio completion is being called from rbd_finish_bh() and runs in qemu 463 * BH context. 464 */ 465 static void qemu_rbd_complete_aio(RADOSCB *rcb) 466 { 467 RBDAIOCB *acb = rcb->acb; 468 int64_t r; 469 470 r = rcb->ret; 471 472 if (acb->cmd != RBD_AIO_READ) { 473 if (r < 0) { 474 acb->ret = r; 475 acb->error = 1; 476 } else if (!acb->error) { 477 acb->ret = rcb->size; 478 } 479 } else { 480 if (r < 0) { 481 qemu_rbd_memset(rcb, 0); 482 acb->ret = r; 483 acb->error = 1; 484 } else if (r < rcb->size) { 485 qemu_rbd_memset(rcb, r); 486 if (!acb->error) { 487 acb->ret = rcb->size; 488 } 489 } else if (!acb->error) { 490 acb->ret = r; 491 } 492 } 493 494 g_free(rcb); 495 496 if (!LIBRBD_USE_IOVEC) { 497 if (acb->cmd == RBD_AIO_READ) { 498 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 499 } 500 qemu_vfree(acb->bounce); 501 } 502 503 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 504 505 qemu_aio_unref(acb); 506 } 507 508 static char *qemu_rbd_mon_host(QDict *options, Error **errp) 509 { 510 const char **vals = g_new(const char *, qdict_size(options) + 1); 511 char keybuf[32]; 512 const char *host, *port; 513 char *rados_str; 514 int i; 515 516 for (i = 0;; i++) { 517 sprintf(keybuf, "server.%d.host", i); 518 host = qdict_get_try_str(options, keybuf); 519 qdict_del(options, keybuf); 520 sprintf(keybuf, "server.%d.port", i); 521 port = qdict_get_try_str(options, keybuf); 522 qdict_del(options, keybuf); 523 if (!host && !port) { 524 break; 525 } 526 if (!host) { 527 error_setg(errp, "Parameter server.%d.host is missing", i); 528 rados_str = NULL; 529 goto out; 530 } 531 532 if (strchr(host, ':')) { 533 vals[i] = port ? g_strdup_printf("[%s]:%s", host, port) 534 : g_strdup_printf("[%s]", host); 535 } else { 536 vals[i] = port ? g_strdup_printf("%s:%s", host, port) 537 : g_strdup(host); 538 } 539 } 540 vals[i] = NULL; 541 542 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 543 out: 544 g_strfreev((char **)vals); 545 return rados_str; 546 } 547 548 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 549 Error **errp) 550 { 551 BDRVRBDState *s = bs->opaque; 552 const char *pool, *snap, *conf, *user, *image_name, *keypairs; 553 const char *secretid, *filename; 554 QemuOpts *opts; 555 Error *local_err = NULL; 556 char *mon_host = NULL; 557 int r; 558 559 /* If we are given a filename, parse the filename, with precedence given to 560 * filename encoded options */ 561 filename = qdict_get_try_str(options, "filename"); 562 if (filename) { 563 warn_report("'filename' option specified. " 564 "This is an unsupported option, and may be deprecated " 565 "in the future"); 566 qemu_rbd_parse_filename(filename, options, &local_err); 567 if (local_err) { 568 r = -EINVAL; 569 error_propagate(errp, local_err); 570 goto exit; 571 } 572 } 573 574 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 575 qemu_opts_absorb_qdict(opts, options, &local_err); 576 if (local_err) { 577 error_propagate(errp, local_err); 578 r = -EINVAL; 579 goto failed_opts; 580 } 581 582 mon_host = qemu_rbd_mon_host(options, &local_err); 583 if (local_err) { 584 error_propagate(errp, local_err); 585 r = -EINVAL; 586 goto failed_opts; 587 } 588 589 secretid = qemu_opt_get(opts, "password-secret"); 590 591 pool = qemu_opt_get(opts, "pool"); 592 conf = qemu_opt_get(opts, "conf"); 593 snap = qemu_opt_get(opts, "snapshot"); 594 user = qemu_opt_get(opts, "user"); 595 image_name = qemu_opt_get(opts, "image"); 596 keypairs = qemu_opt_get(opts, "=keyvalue-pairs"); 597 598 if (!pool || !image_name) { 599 error_setg(errp, "Parameters 'pool' and 'image' are required"); 600 r = -EINVAL; 601 goto failed_opts; 602 } 603 604 r = rados_create(&s->cluster, user); 605 if (r < 0) { 606 error_setg_errno(errp, -r, "error initializing"); 607 goto failed_opts; 608 } 609 610 s->snap = g_strdup(snap); 611 s->image_name = g_strdup(image_name); 612 613 /* try default location when conf=NULL, but ignore failure */ 614 r = rados_conf_read_file(s->cluster, conf); 615 if (conf && r < 0) { 616 error_setg_errno(errp, -r, "error reading conf file %s", conf); 617 goto failed_shutdown; 618 } 619 620 r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp); 621 if (r < 0) { 622 goto failed_shutdown; 623 } 624 625 if (mon_host) { 626 r = rados_conf_set(s->cluster, "mon_host", mon_host); 627 if (r < 0) { 628 goto failed_shutdown; 629 } 630 } 631 632 if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) { 633 r = -EIO; 634 goto failed_shutdown; 635 } 636 637 /* 638 * Fallback to more conservative semantics if setting cache 639 * options fails. Ignore errors from setting rbd_cache because the 640 * only possible error is that the option does not exist, and 641 * librbd defaults to no caching. If write through caching cannot 642 * be set up, fall back to no caching. 643 */ 644 if (flags & BDRV_O_NOCACHE) { 645 rados_conf_set(s->cluster, "rbd_cache", "false"); 646 } else { 647 rados_conf_set(s->cluster, "rbd_cache", "true"); 648 } 649 650 r = rados_connect(s->cluster); 651 if (r < 0) { 652 error_setg_errno(errp, -r, "error connecting"); 653 goto failed_shutdown; 654 } 655 656 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 657 if (r < 0) { 658 error_setg_errno(errp, -r, "error opening pool %s", pool); 659 goto failed_shutdown; 660 } 661 662 /* rbd_open is always r/w */ 663 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 664 if (r < 0) { 665 error_setg_errno(errp, -r, "error reading header from %s", 666 s->image_name); 667 goto failed_open; 668 } 669 670 /* If we are using an rbd snapshot, we must be r/o, otherwise 671 * leave as-is */ 672 if (s->snap != NULL) { 673 if (!bdrv_is_read_only(bs)) { 674 error_report("Opening rbd snapshots without an explicit " 675 "read-only=on option is deprecated. Future versions " 676 "will refuse to open the image instead of " 677 "automatically marking the image read-only."); 678 r = bdrv_set_read_only(bs, true, &local_err); 679 if (r < 0) { 680 error_propagate(errp, local_err); 681 goto failed_open; 682 } 683 } 684 } 685 686 qemu_opts_del(opts); 687 return 0; 688 689 failed_open: 690 rados_ioctx_destroy(s->io_ctx); 691 failed_shutdown: 692 rados_shutdown(s->cluster); 693 g_free(s->snap); 694 g_free(s->image_name); 695 failed_opts: 696 qemu_opts_del(opts); 697 g_free(mon_host); 698 exit: 699 return r; 700 } 701 702 703 /* Since RBD is currently always opened R/W via the API, 704 * we just need to check if we are using a snapshot or not, in 705 * order to determine if we will allow it to be R/W */ 706 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 707 BlockReopenQueue *queue, Error **errp) 708 { 709 BDRVRBDState *s = state->bs->opaque; 710 int ret = 0; 711 712 if (s->snap && state->flags & BDRV_O_RDWR) { 713 error_setg(errp, 714 "Cannot change node '%s' to r/w when using RBD snapshot", 715 bdrv_get_device_or_node_name(state->bs)); 716 ret = -EINVAL; 717 } 718 719 return ret; 720 } 721 722 static void qemu_rbd_close(BlockDriverState *bs) 723 { 724 BDRVRBDState *s = bs->opaque; 725 726 rbd_close(s->image); 727 rados_ioctx_destroy(s->io_ctx); 728 g_free(s->snap); 729 g_free(s->image_name); 730 rados_shutdown(s->cluster); 731 } 732 733 static const AIOCBInfo rbd_aiocb_info = { 734 .aiocb_size = sizeof(RBDAIOCB), 735 }; 736 737 static void rbd_finish_bh(void *opaque) 738 { 739 RADOSCB *rcb = opaque; 740 qemu_rbd_complete_aio(rcb); 741 } 742 743 /* 744 * This is the callback function for rbd_aio_read and _write 745 * 746 * Note: this function is being called from a non qemu thread so 747 * we need to be careful about what we do here. Generally we only 748 * schedule a BH, and do the rest of the io completion handling 749 * from rbd_finish_bh() which runs in a qemu context. 750 */ 751 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 752 { 753 RBDAIOCB *acb = rcb->acb; 754 755 rcb->ret = rbd_aio_get_return_value(c); 756 rbd_aio_release(c); 757 758 aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs), 759 rbd_finish_bh, rcb); 760 } 761 762 static int rbd_aio_discard_wrapper(rbd_image_t image, 763 uint64_t off, 764 uint64_t len, 765 rbd_completion_t comp) 766 { 767 #ifdef LIBRBD_SUPPORTS_DISCARD 768 return rbd_aio_discard(image, off, len, comp); 769 #else 770 return -ENOTSUP; 771 #endif 772 } 773 774 static int rbd_aio_flush_wrapper(rbd_image_t image, 775 rbd_completion_t comp) 776 { 777 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 778 return rbd_aio_flush(image, comp); 779 #else 780 return -ENOTSUP; 781 #endif 782 } 783 784 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, 785 int64_t off, 786 QEMUIOVector *qiov, 787 int64_t size, 788 BlockCompletionFunc *cb, 789 void *opaque, 790 RBDAIOCmd cmd) 791 { 792 RBDAIOCB *acb; 793 RADOSCB *rcb = NULL; 794 rbd_completion_t c; 795 int r; 796 797 BDRVRBDState *s = bs->opaque; 798 799 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 800 acb->cmd = cmd; 801 acb->qiov = qiov; 802 assert(!qiov || qiov->size == size); 803 804 rcb = g_new(RADOSCB, 1); 805 806 if (!LIBRBD_USE_IOVEC) { 807 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 808 acb->bounce = NULL; 809 } else { 810 acb->bounce = qemu_try_blockalign(bs, qiov->size); 811 if (acb->bounce == NULL) { 812 goto failed; 813 } 814 } 815 if (cmd == RBD_AIO_WRITE) { 816 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 817 } 818 rcb->buf = acb->bounce; 819 } 820 821 acb->ret = 0; 822 acb->error = 0; 823 acb->s = s; 824 825 rcb->acb = acb; 826 rcb->s = acb->s; 827 rcb->size = size; 828 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 829 if (r < 0) { 830 goto failed; 831 } 832 833 switch (cmd) { 834 case RBD_AIO_WRITE: 835 #ifdef LIBRBD_SUPPORTS_IOVEC 836 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c); 837 #else 838 r = rbd_aio_write(s->image, off, size, rcb->buf, c); 839 #endif 840 break; 841 case RBD_AIO_READ: 842 #ifdef LIBRBD_SUPPORTS_IOVEC 843 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); 844 #else 845 r = rbd_aio_read(s->image, off, size, rcb->buf, c); 846 #endif 847 break; 848 case RBD_AIO_DISCARD: 849 r = rbd_aio_discard_wrapper(s->image, off, size, c); 850 break; 851 case RBD_AIO_FLUSH: 852 r = rbd_aio_flush_wrapper(s->image, c); 853 break; 854 default: 855 r = -EINVAL; 856 } 857 858 if (r < 0) { 859 goto failed_completion; 860 } 861 return &acb->common; 862 863 failed_completion: 864 rbd_aio_release(c); 865 failed: 866 g_free(rcb); 867 if (!LIBRBD_USE_IOVEC) { 868 qemu_vfree(acb->bounce); 869 } 870 871 qemu_aio_unref(acb); 872 return NULL; 873 } 874 875 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 876 int64_t sector_num, 877 QEMUIOVector *qiov, 878 int nb_sectors, 879 BlockCompletionFunc *cb, 880 void *opaque) 881 { 882 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 883 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 884 RBD_AIO_READ); 885 } 886 887 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 888 int64_t sector_num, 889 QEMUIOVector *qiov, 890 int nb_sectors, 891 BlockCompletionFunc *cb, 892 void *opaque) 893 { 894 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 895 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 896 RBD_AIO_WRITE); 897 } 898 899 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 900 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 901 BlockCompletionFunc *cb, 902 void *opaque) 903 { 904 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 905 } 906 907 #else 908 909 static int qemu_rbd_co_flush(BlockDriverState *bs) 910 { 911 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 912 /* rbd_flush added in 0.1.1 */ 913 BDRVRBDState *s = bs->opaque; 914 return rbd_flush(s->image); 915 #else 916 return 0; 917 #endif 918 } 919 #endif 920 921 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 922 { 923 BDRVRBDState *s = bs->opaque; 924 rbd_image_info_t info; 925 int r; 926 927 r = rbd_stat(s->image, &info, sizeof(info)); 928 if (r < 0) { 929 return r; 930 } 931 932 bdi->cluster_size = info.obj_size; 933 return 0; 934 } 935 936 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 937 { 938 BDRVRBDState *s = bs->opaque; 939 rbd_image_info_t info; 940 int r; 941 942 r = rbd_stat(s->image, &info, sizeof(info)); 943 if (r < 0) { 944 return r; 945 } 946 947 return info.size; 948 } 949 950 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset, 951 PreallocMode prealloc, Error **errp) 952 { 953 BDRVRBDState *s = bs->opaque; 954 int r; 955 956 if (prealloc != PREALLOC_MODE_OFF) { 957 error_setg(errp, "Unsupported preallocation mode '%s'", 958 PreallocMode_str(prealloc)); 959 return -ENOTSUP; 960 } 961 962 r = rbd_resize(s->image, offset); 963 if (r < 0) { 964 error_setg_errno(errp, -r, "Failed to resize file"); 965 return r; 966 } 967 968 return 0; 969 } 970 971 static int qemu_rbd_snap_create(BlockDriverState *bs, 972 QEMUSnapshotInfo *sn_info) 973 { 974 BDRVRBDState *s = bs->opaque; 975 int r; 976 977 if (sn_info->name[0] == '\0') { 978 return -EINVAL; /* we need a name for rbd snapshots */ 979 } 980 981 /* 982 * rbd snapshots are using the name as the user controlled unique identifier 983 * we can't use the rbd snapid for that purpose, as it can't be set 984 */ 985 if (sn_info->id_str[0] != '\0' && 986 strcmp(sn_info->id_str, sn_info->name) != 0) { 987 return -EINVAL; 988 } 989 990 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 991 return -ERANGE; 992 } 993 994 r = rbd_snap_create(s->image, sn_info->name); 995 if (r < 0) { 996 error_report("failed to create snap: %s", strerror(-r)); 997 return r; 998 } 999 1000 return 0; 1001 } 1002 1003 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1004 const char *snapshot_id, 1005 const char *snapshot_name, 1006 Error **errp) 1007 { 1008 BDRVRBDState *s = bs->opaque; 1009 int r; 1010 1011 if (!snapshot_name) { 1012 error_setg(errp, "rbd need a valid snapshot name"); 1013 return -EINVAL; 1014 } 1015 1016 /* If snapshot_id is specified, it must be equal to name, see 1017 qemu_rbd_snap_list() */ 1018 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1019 error_setg(errp, 1020 "rbd do not support snapshot id, it should be NULL or " 1021 "equal to snapshot name"); 1022 return -EINVAL; 1023 } 1024 1025 r = rbd_snap_remove(s->image, snapshot_name); 1026 if (r < 0) { 1027 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1028 } 1029 return r; 1030 } 1031 1032 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1033 const char *snapshot_name) 1034 { 1035 BDRVRBDState *s = bs->opaque; 1036 1037 return rbd_snap_rollback(s->image, snapshot_name); 1038 } 1039 1040 static int qemu_rbd_snap_list(BlockDriverState *bs, 1041 QEMUSnapshotInfo **psn_tab) 1042 { 1043 BDRVRBDState *s = bs->opaque; 1044 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1045 int i, snap_count; 1046 rbd_snap_info_t *snaps; 1047 int max_snaps = RBD_MAX_SNAPS; 1048 1049 do { 1050 snaps = g_new(rbd_snap_info_t, max_snaps); 1051 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1052 if (snap_count <= 0) { 1053 g_free(snaps); 1054 } 1055 } while (snap_count == -ERANGE); 1056 1057 if (snap_count <= 0) { 1058 goto done; 1059 } 1060 1061 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1062 1063 for (i = 0; i < snap_count; i++) { 1064 const char *snap_name = snaps[i].name; 1065 1066 sn_info = sn_tab + i; 1067 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1068 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1069 1070 sn_info->vm_state_size = snaps[i].size; 1071 sn_info->date_sec = 0; 1072 sn_info->date_nsec = 0; 1073 sn_info->vm_clock_nsec = 0; 1074 } 1075 rbd_snap_list_end(snaps); 1076 g_free(snaps); 1077 1078 done: 1079 *psn_tab = sn_tab; 1080 return snap_count; 1081 } 1082 1083 #ifdef LIBRBD_SUPPORTS_DISCARD 1084 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs, 1085 int64_t offset, 1086 int bytes, 1087 BlockCompletionFunc *cb, 1088 void *opaque) 1089 { 1090 return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque, 1091 RBD_AIO_DISCARD); 1092 } 1093 #endif 1094 1095 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1096 static void qemu_rbd_invalidate_cache(BlockDriverState *bs, 1097 Error **errp) 1098 { 1099 BDRVRBDState *s = bs->opaque; 1100 int r = rbd_invalidate_cache(s->image); 1101 if (r < 0) { 1102 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1103 } 1104 } 1105 #endif 1106 1107 static QemuOptsList qemu_rbd_create_opts = { 1108 .name = "rbd-create-opts", 1109 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1110 .desc = { 1111 { 1112 .name = BLOCK_OPT_SIZE, 1113 .type = QEMU_OPT_SIZE, 1114 .help = "Virtual disk size" 1115 }, 1116 { 1117 .name = BLOCK_OPT_CLUSTER_SIZE, 1118 .type = QEMU_OPT_SIZE, 1119 .help = "RBD object size" 1120 }, 1121 { 1122 .name = "password-secret", 1123 .type = QEMU_OPT_STRING, 1124 .help = "ID of secret providing the password", 1125 }, 1126 { /* end of list */ } 1127 } 1128 }; 1129 1130 static BlockDriver bdrv_rbd = { 1131 .format_name = "rbd", 1132 .instance_size = sizeof(BDRVRBDState), 1133 .bdrv_parse_filename = qemu_rbd_parse_filename, 1134 .bdrv_file_open = qemu_rbd_open, 1135 .bdrv_close = qemu_rbd_close, 1136 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1137 .bdrv_co_create_opts = qemu_rbd_co_create_opts, 1138 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1139 .bdrv_get_info = qemu_rbd_getinfo, 1140 .create_opts = &qemu_rbd_create_opts, 1141 .bdrv_getlength = qemu_rbd_getlength, 1142 .bdrv_truncate = qemu_rbd_truncate, 1143 .protocol_name = "rbd", 1144 1145 .bdrv_aio_readv = qemu_rbd_aio_readv, 1146 .bdrv_aio_writev = qemu_rbd_aio_writev, 1147 1148 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1149 .bdrv_aio_flush = qemu_rbd_aio_flush, 1150 #else 1151 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1152 #endif 1153 1154 #ifdef LIBRBD_SUPPORTS_DISCARD 1155 .bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard, 1156 #endif 1157 1158 .bdrv_snapshot_create = qemu_rbd_snap_create, 1159 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1160 .bdrv_snapshot_list = qemu_rbd_snap_list, 1161 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1162 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1163 .bdrv_invalidate_cache = qemu_rbd_invalidate_cache, 1164 #endif 1165 }; 1166 1167 static void bdrv_rbd_init(void) 1168 { 1169 bdrv_register(&bdrv_rbd); 1170 } 1171 1172 block_init(bdrv_rbd_init); 1173