1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include "qapi/error.h" 17 #include "qemu/error-report.h" 18 #include "block/block_int.h" 19 #include "crypto/secret.h" 20 #include "qemu/cutils.h" 21 #include "qapi/qmp/qstring.h" 22 23 #include <rbd/librbd.h> 24 25 /* 26 * When specifying the image filename use: 27 * 28 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 29 * 30 * poolname must be the name of an existing rados pool. 31 * 32 * devicename is the name of the rbd image. 33 * 34 * Each option given is used to configure rados, and may be any valid 35 * Ceph option, "id", or "conf". 36 * 37 * The "id" option indicates what user we should authenticate as to 38 * the Ceph cluster. If it is excluded we will use the Ceph default 39 * (normally 'admin'). 40 * 41 * The "conf" option specifies a Ceph configuration file to read. If 42 * it is not specified, we will read from the default Ceph locations 43 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 44 * file, specify conf=/dev/null. 45 * 46 * Configuration values containing :, @, or = can be escaped with a 47 * leading "\". 48 */ 49 50 /* rbd_aio_discard added in 0.1.2 */ 51 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2) 52 #define LIBRBD_SUPPORTS_DISCARD 53 #else 54 #undef LIBRBD_SUPPORTS_DISCARD 55 #endif 56 57 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 58 59 #define RBD_MAX_CONF_NAME_SIZE 128 60 #define RBD_MAX_CONF_VAL_SIZE 512 61 #define RBD_MAX_CONF_SIZE 1024 62 #define RBD_MAX_POOL_NAME_SIZE 128 63 #define RBD_MAX_SNAP_NAME_SIZE 128 64 #define RBD_MAX_SNAPS 100 65 66 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */ 67 #ifdef LIBRBD_SUPPORTS_IOVEC 68 #define LIBRBD_USE_IOVEC 1 69 #else 70 #define LIBRBD_USE_IOVEC 0 71 #endif 72 73 typedef enum { 74 RBD_AIO_READ, 75 RBD_AIO_WRITE, 76 RBD_AIO_DISCARD, 77 RBD_AIO_FLUSH 78 } RBDAIOCmd; 79 80 typedef struct RBDAIOCB { 81 BlockAIOCB common; 82 int64_t ret; 83 QEMUIOVector *qiov; 84 char *bounce; 85 RBDAIOCmd cmd; 86 int error; 87 struct BDRVRBDState *s; 88 } RBDAIOCB; 89 90 typedef struct RADOSCB { 91 RBDAIOCB *acb; 92 struct BDRVRBDState *s; 93 int64_t size; 94 char *buf; 95 int64_t ret; 96 } RADOSCB; 97 98 typedef struct BDRVRBDState { 99 rados_t cluster; 100 rados_ioctx_t io_ctx; 101 rbd_image_t image; 102 char name[RBD_MAX_IMAGE_NAME_SIZE]; 103 char *snap; 104 } BDRVRBDState; 105 106 static char *qemu_rbd_next_tok(int max_len, 107 char *src, char delim, 108 const char *name, 109 char **p, Error **errp) 110 { 111 int l; 112 char *end; 113 114 *p = NULL; 115 116 if (delim != '\0') { 117 for (end = src; *end; ++end) { 118 if (*end == delim) { 119 break; 120 } 121 if (*end == '\\' && end[1] != '\0') { 122 end++; 123 } 124 } 125 if (*end == delim) { 126 *p = end + 1; 127 *end = '\0'; 128 } 129 } 130 l = strlen(src); 131 if (l >= max_len) { 132 error_setg(errp, "%s too long", name); 133 return NULL; 134 } else if (l == 0) { 135 error_setg(errp, "%s too short", name); 136 return NULL; 137 } 138 139 return src; 140 } 141 142 static void qemu_rbd_unescape(char *src) 143 { 144 char *p; 145 146 for (p = src; *src; ++src, ++p) { 147 if (*src == '\\' && src[1] != '\0') { 148 src++; 149 } 150 *p = *src; 151 } 152 *p = '\0'; 153 } 154 155 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 156 Error **errp) 157 { 158 const char *start; 159 char *p, *buf, *keypairs; 160 char *found_str; 161 size_t max_keypair_size; 162 Error *local_err = NULL; 163 164 if (!strstart(filename, "rbd:", &start)) { 165 error_setg(errp, "File name must start with 'rbd:'"); 166 return; 167 } 168 169 max_keypair_size = strlen(start) + 1; 170 buf = g_strdup(start); 171 keypairs = g_malloc0(max_keypair_size); 172 p = buf; 173 174 found_str = qemu_rbd_next_tok(RBD_MAX_POOL_NAME_SIZE, p, 175 '/', "pool name", &p, &local_err); 176 if (local_err) { 177 goto done; 178 } 179 if (!p) { 180 error_setg(errp, "Pool name is required"); 181 goto done; 182 } 183 qemu_rbd_unescape(found_str); 184 qdict_put(options, "pool", qstring_from_str(found_str)); 185 186 if (strchr(p, '@')) { 187 found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p, 188 '@', "object name", &p, &local_err); 189 if (local_err) { 190 goto done; 191 } 192 qemu_rbd_unescape(found_str); 193 qdict_put(options, "image", qstring_from_str(found_str)); 194 195 found_str = qemu_rbd_next_tok(RBD_MAX_SNAP_NAME_SIZE, p, 196 ':', "snap name", &p, &local_err); 197 if (local_err) { 198 goto done; 199 } 200 qemu_rbd_unescape(found_str); 201 qdict_put(options, "snapshot", qstring_from_str(found_str)); 202 } else { 203 found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p, 204 ':', "object name", &p, &local_err); 205 if (local_err) { 206 goto done; 207 } 208 qemu_rbd_unescape(found_str); 209 qdict_put(options, "image", qstring_from_str(found_str)); 210 } 211 if (!p) { 212 goto done; 213 } 214 215 found_str = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p, 216 '\0', "configuration", &p, &local_err); 217 if (local_err) { 218 goto done; 219 } 220 221 p = found_str; 222 223 /* The following are essentially all key/value pairs, and we treat 224 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 225 while (p) { 226 char *name, *value; 227 name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p, 228 '=', "conf option name", &p, &local_err); 229 if (local_err) { 230 break; 231 } 232 233 if (!p) { 234 error_setg(errp, "conf option %s has no value", name); 235 break; 236 } 237 238 qemu_rbd_unescape(name); 239 240 value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p, 241 ':', "conf option value", &p, &local_err); 242 if (local_err) { 243 break; 244 } 245 qemu_rbd_unescape(value); 246 247 if (!strcmp(name, "conf")) { 248 qdict_put(options, "conf", qstring_from_str(value)); 249 } else if (!strcmp(name, "id")) { 250 qdict_put(options, "user" , qstring_from_str(value)); 251 } else { 252 /* FIXME: This is pretty ugly, and not the right way to do this. 253 * These should be contained in a structure, and then 254 * passed explicitly as individual key/value pairs to 255 * rados. Consider this legacy code that needs to be 256 * updated. */ 257 char *tmp = g_malloc0(max_keypair_size); 258 /* only use a delimiter if it is not the first keypair found */ 259 /* These are sets of unknown key/value pairs we'll pass along 260 * to ceph */ 261 if (keypairs[0]) { 262 snprintf(tmp, max_keypair_size, ":%s=%s", name, value); 263 pstrcat(keypairs, max_keypair_size, tmp); 264 } else { 265 snprintf(keypairs, max_keypair_size, "%s=%s", name, value); 266 } 267 g_free(tmp); 268 } 269 } 270 271 if (keypairs[0]) { 272 qdict_put(options, "keyvalue-pairs", qstring_from_str(keypairs)); 273 } 274 275 276 done: 277 if (local_err) { 278 error_propagate(errp, local_err); 279 } 280 g_free(buf); 281 g_free(keypairs); 282 return; 283 } 284 285 286 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid, 287 Error **errp) 288 { 289 if (secretid == 0) { 290 return 0; 291 } 292 293 gchar *secret = qcrypto_secret_lookup_as_base64(secretid, 294 errp); 295 if (!secret) { 296 return -1; 297 } 298 299 rados_conf_set(cluster, "key", secret); 300 g_free(secret); 301 302 return 0; 303 } 304 305 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs, 306 Error **errp) 307 { 308 char *p, *buf; 309 char *name; 310 char *value; 311 Error *local_err = NULL; 312 int ret = 0; 313 314 buf = g_strdup(keypairs); 315 p = buf; 316 317 while (p) { 318 name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p, 319 '=', "conf option name", &p, &local_err); 320 if (local_err) { 321 break; 322 } 323 324 if (!p) { 325 error_setg(errp, "conf option %s has no value", name); 326 ret = -EINVAL; 327 break; 328 } 329 330 value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p, 331 ':', "conf option value", &p, &local_err); 332 if (local_err) { 333 break; 334 } 335 336 ret = rados_conf_set(cluster, name, value); 337 if (ret < 0) { 338 error_setg_errno(errp, -ret, "invalid conf option %s", name); 339 ret = -EINVAL; 340 break; 341 } 342 } 343 344 if (local_err) { 345 error_propagate(errp, local_err); 346 ret = -EINVAL; 347 } 348 g_free(buf); 349 return ret; 350 } 351 352 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs) 353 { 354 if (LIBRBD_USE_IOVEC) { 355 RBDAIOCB *acb = rcb->acb; 356 iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0, 357 acb->qiov->size - offs); 358 } else { 359 memset(rcb->buf + offs, 0, rcb->size - offs); 360 } 361 } 362 363 static QemuOptsList runtime_opts = { 364 .name = "rbd", 365 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 366 .desc = { 367 { 368 .name = "filename", 369 .type = QEMU_OPT_STRING, 370 .help = "Specification of the rbd image", 371 }, 372 { 373 .name = "password-secret", 374 .type = QEMU_OPT_STRING, 375 .help = "ID of secret providing the password", 376 }, 377 { 378 .name = "conf", 379 .type = QEMU_OPT_STRING, 380 .help = "Rados config file location", 381 }, 382 { 383 .name = "pool", 384 .type = QEMU_OPT_STRING, 385 .help = "Rados pool name", 386 }, 387 { 388 .name = "image", 389 .type = QEMU_OPT_STRING, 390 .help = "Image name in the pool", 391 }, 392 { 393 .name = "snapshot", 394 .type = QEMU_OPT_STRING, 395 .help = "Ceph snapshot name", 396 }, 397 { 398 /* maps to 'id' in rados_create() */ 399 .name = "user", 400 .type = QEMU_OPT_STRING, 401 .help = "Rados id name", 402 }, 403 { 404 .name = "keyvalue-pairs", 405 .type = QEMU_OPT_STRING, 406 .help = "Legacy rados key/value option parameters", 407 }, 408 { 409 .name = "host", 410 .type = QEMU_OPT_STRING, 411 }, 412 { 413 .name = "port", 414 .type = QEMU_OPT_STRING, 415 }, 416 { 417 .name = "auth", 418 .type = QEMU_OPT_STRING, 419 .help = "Supported authentication method, either cephx or none", 420 }, 421 { /* end of list */ } 422 }, 423 }; 424 425 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp) 426 { 427 Error *local_err = NULL; 428 int64_t bytes = 0; 429 int64_t objsize; 430 int obj_order = 0; 431 const char *pool, *name, *conf, *clientname, *keypairs; 432 const char *secretid; 433 rados_t cluster; 434 rados_ioctx_t io_ctx; 435 QDict *options = NULL; 436 QemuOpts *rbd_opts = NULL; 437 int ret = 0; 438 439 secretid = qemu_opt_get(opts, "password-secret"); 440 441 /* Read out options */ 442 bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 443 BDRV_SECTOR_SIZE); 444 objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0); 445 if (objsize) { 446 if ((objsize - 1) & objsize) { /* not a power of 2? */ 447 error_setg(errp, "obj size needs to be power of 2"); 448 ret = -EINVAL; 449 goto exit; 450 } 451 if (objsize < 4096) { 452 error_setg(errp, "obj size too small"); 453 ret = -EINVAL; 454 goto exit; 455 } 456 obj_order = ctz32(objsize); 457 } 458 459 options = qdict_new(); 460 qemu_rbd_parse_filename(filename, options, &local_err); 461 if (local_err) { 462 ret = -EINVAL; 463 error_propagate(errp, local_err); 464 goto exit; 465 } 466 467 rbd_opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 468 qemu_opts_absorb_qdict(rbd_opts, options, &local_err); 469 if (local_err) { 470 error_propagate(errp, local_err); 471 ret = -EINVAL; 472 goto exit; 473 } 474 475 pool = qemu_opt_get(rbd_opts, "pool"); 476 conf = qemu_opt_get(rbd_opts, "conf"); 477 clientname = qemu_opt_get(rbd_opts, "user"); 478 name = qemu_opt_get(rbd_opts, "image"); 479 keypairs = qemu_opt_get(rbd_opts, "keyvalue-pairs"); 480 481 ret = rados_create(&cluster, clientname); 482 if (ret < 0) { 483 error_setg_errno(errp, -ret, "error initializing"); 484 goto exit; 485 } 486 487 /* try default location when conf=NULL, but ignore failure */ 488 ret = rados_conf_read_file(cluster, conf); 489 if (conf && ret < 0) { 490 error_setg_errno(errp, -ret, "error reading conf file %s", conf); 491 ret = -EIO; 492 goto shutdown; 493 } 494 495 ret = qemu_rbd_set_keypairs(cluster, keypairs, errp); 496 if (ret < 0) { 497 ret = -EIO; 498 goto shutdown; 499 } 500 501 if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) { 502 ret = -EIO; 503 goto shutdown; 504 } 505 506 ret = rados_connect(cluster); 507 if (ret < 0) { 508 error_setg_errno(errp, -ret, "error connecting"); 509 goto shutdown; 510 } 511 512 ret = rados_ioctx_create(cluster, pool, &io_ctx); 513 if (ret < 0) { 514 error_setg_errno(errp, -ret, "error opening pool %s", pool); 515 goto shutdown; 516 } 517 518 ret = rbd_create(io_ctx, name, bytes, &obj_order); 519 if (ret < 0) { 520 error_setg_errno(errp, -ret, "error rbd create"); 521 } 522 523 rados_ioctx_destroy(io_ctx); 524 525 shutdown: 526 rados_shutdown(cluster); 527 528 exit: 529 QDECREF(options); 530 qemu_opts_del(rbd_opts); 531 return ret; 532 } 533 534 /* 535 * This aio completion is being called from rbd_finish_bh() and runs in qemu 536 * BH context. 537 */ 538 static void qemu_rbd_complete_aio(RADOSCB *rcb) 539 { 540 RBDAIOCB *acb = rcb->acb; 541 int64_t r; 542 543 r = rcb->ret; 544 545 if (acb->cmd != RBD_AIO_READ) { 546 if (r < 0) { 547 acb->ret = r; 548 acb->error = 1; 549 } else if (!acb->error) { 550 acb->ret = rcb->size; 551 } 552 } else { 553 if (r < 0) { 554 qemu_rbd_memset(rcb, 0); 555 acb->ret = r; 556 acb->error = 1; 557 } else if (r < rcb->size) { 558 qemu_rbd_memset(rcb, r); 559 if (!acb->error) { 560 acb->ret = rcb->size; 561 } 562 } else if (!acb->error) { 563 acb->ret = r; 564 } 565 } 566 567 g_free(rcb); 568 569 if (!LIBRBD_USE_IOVEC) { 570 if (acb->cmd == RBD_AIO_READ) { 571 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); 572 } 573 qemu_vfree(acb->bounce); 574 } 575 576 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); 577 578 qemu_aio_unref(acb); 579 } 580 581 #define RBD_MON_HOST 0 582 #define RBD_AUTH_SUPPORTED 1 583 584 static char *qemu_rbd_array_opts(QDict *options, const char *prefix, int type, 585 Error **errp) 586 { 587 int num_entries; 588 QemuOpts *opts = NULL; 589 QDict *sub_options; 590 const char *host; 591 const char *port; 592 char *str; 593 char *rados_str = NULL; 594 Error *local_err = NULL; 595 int i; 596 597 assert(type == RBD_MON_HOST || type == RBD_AUTH_SUPPORTED); 598 599 num_entries = qdict_array_entries(options, prefix); 600 601 if (num_entries < 0) { 602 error_setg(errp, "Parse error on RBD QDict array"); 603 return NULL; 604 } 605 606 for (i = 0; i < num_entries; i++) { 607 char *strbuf = NULL; 608 const char *value; 609 char *rados_str_tmp; 610 611 str = g_strdup_printf("%s%d.", prefix, i); 612 qdict_extract_subqdict(options, &sub_options, str); 613 g_free(str); 614 615 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 616 qemu_opts_absorb_qdict(opts, sub_options, &local_err); 617 QDECREF(sub_options); 618 if (local_err) { 619 error_propagate(errp, local_err); 620 g_free(rados_str); 621 rados_str = NULL; 622 goto exit; 623 } 624 625 if (type == RBD_MON_HOST) { 626 host = qemu_opt_get(opts, "host"); 627 port = qemu_opt_get(opts, "port"); 628 629 value = host; 630 if (port) { 631 /* check for ipv6 */ 632 if (strchr(host, ':')) { 633 strbuf = g_strdup_printf("[%s]:%s", host, port); 634 } else { 635 strbuf = g_strdup_printf("%s:%s", host, port); 636 } 637 value = strbuf; 638 } else if (strchr(host, ':')) { 639 strbuf = g_strdup_printf("[%s]", host); 640 value = strbuf; 641 } 642 } else { 643 value = qemu_opt_get(opts, "auth"); 644 } 645 646 647 /* each iteration in the for loop will build upon the string, and if 648 * rados_str is NULL then it is our first pass */ 649 if (rados_str) { 650 /* separate options with ';', as that is what rados_conf_set() 651 * requires */ 652 rados_str_tmp = rados_str; 653 rados_str = g_strdup_printf("%s;%s", rados_str_tmp, value); 654 g_free(rados_str_tmp); 655 } else { 656 rados_str = g_strdup(value); 657 } 658 659 g_free(strbuf); 660 qemu_opts_del(opts); 661 opts = NULL; 662 } 663 664 exit: 665 qemu_opts_del(opts); 666 return rados_str; 667 } 668 669 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 670 Error **errp) 671 { 672 BDRVRBDState *s = bs->opaque; 673 const char *pool, *snap, *conf, *clientname, *name, *keypairs; 674 const char *secretid; 675 QemuOpts *opts; 676 Error *local_err = NULL; 677 char *mon_host = NULL; 678 char *auth_supported = NULL; 679 int r; 680 681 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort); 682 qemu_opts_absorb_qdict(opts, options, &local_err); 683 if (local_err) { 684 error_propagate(errp, local_err); 685 qemu_opts_del(opts); 686 return -EINVAL; 687 } 688 689 auth_supported = qemu_rbd_array_opts(options, "auth-supported.", 690 RBD_AUTH_SUPPORTED, &local_err); 691 if (local_err) { 692 error_propagate(errp, local_err); 693 r = -EINVAL; 694 goto failed_opts; 695 } 696 697 mon_host = qemu_rbd_array_opts(options, "server.", 698 RBD_MON_HOST, &local_err); 699 if (local_err) { 700 error_propagate(errp, local_err); 701 r = -EINVAL; 702 goto failed_opts; 703 } 704 705 secretid = qemu_opt_get(opts, "password-secret"); 706 707 pool = qemu_opt_get(opts, "pool"); 708 conf = qemu_opt_get(opts, "conf"); 709 snap = qemu_opt_get(opts, "snapshot"); 710 clientname = qemu_opt_get(opts, "user"); 711 name = qemu_opt_get(opts, "image"); 712 keypairs = qemu_opt_get(opts, "keyvalue-pairs"); 713 714 r = rados_create(&s->cluster, clientname); 715 if (r < 0) { 716 error_setg_errno(errp, -r, "error initializing"); 717 goto failed_opts; 718 } 719 720 s->snap = g_strdup(snap); 721 if (name) { 722 pstrcpy(s->name, RBD_MAX_IMAGE_NAME_SIZE, name); 723 } 724 725 /* try default location when conf=NULL, but ignore failure */ 726 r = rados_conf_read_file(s->cluster, conf); 727 if (conf && r < 0) { 728 error_setg_errno(errp, -r, "error reading conf file %s", conf); 729 goto failed_shutdown; 730 } 731 732 r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp); 733 if (r < 0) { 734 goto failed_shutdown; 735 } 736 737 if (mon_host) { 738 r = rados_conf_set(s->cluster, "mon_host", mon_host); 739 if (r < 0) { 740 goto failed_shutdown; 741 } 742 } 743 744 if (auth_supported) { 745 r = rados_conf_set(s->cluster, "auth_supported", auth_supported); 746 if (r < 0) { 747 goto failed_shutdown; 748 } 749 } 750 751 if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) { 752 r = -EIO; 753 goto failed_shutdown; 754 } 755 756 /* 757 * Fallback to more conservative semantics if setting cache 758 * options fails. Ignore errors from setting rbd_cache because the 759 * only possible error is that the option does not exist, and 760 * librbd defaults to no caching. If write through caching cannot 761 * be set up, fall back to no caching. 762 */ 763 if (flags & BDRV_O_NOCACHE) { 764 rados_conf_set(s->cluster, "rbd_cache", "false"); 765 } else { 766 rados_conf_set(s->cluster, "rbd_cache", "true"); 767 } 768 769 r = rados_connect(s->cluster); 770 if (r < 0) { 771 error_setg_errno(errp, -r, "error connecting"); 772 goto failed_shutdown; 773 } 774 775 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); 776 if (r < 0) { 777 error_setg_errno(errp, -r, "error opening pool %s", pool); 778 goto failed_shutdown; 779 } 780 781 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); 782 if (r < 0) { 783 error_setg_errno(errp, -r, "error reading header from %s", s->name); 784 goto failed_open; 785 } 786 787 bs->read_only = (s->snap != NULL); 788 789 qemu_opts_del(opts); 790 return 0; 791 792 failed_open: 793 rados_ioctx_destroy(s->io_ctx); 794 failed_shutdown: 795 rados_shutdown(s->cluster); 796 g_free(s->snap); 797 failed_opts: 798 qemu_opts_del(opts); 799 g_free(mon_host); 800 g_free(auth_supported); 801 return r; 802 } 803 804 static void qemu_rbd_close(BlockDriverState *bs) 805 { 806 BDRVRBDState *s = bs->opaque; 807 808 rbd_close(s->image); 809 rados_ioctx_destroy(s->io_ctx); 810 g_free(s->snap); 811 rados_shutdown(s->cluster); 812 } 813 814 static const AIOCBInfo rbd_aiocb_info = { 815 .aiocb_size = sizeof(RBDAIOCB), 816 }; 817 818 static void rbd_finish_bh(void *opaque) 819 { 820 RADOSCB *rcb = opaque; 821 qemu_rbd_complete_aio(rcb); 822 } 823 824 /* 825 * This is the callback function for rbd_aio_read and _write 826 * 827 * Note: this function is being called from a non qemu thread so 828 * we need to be careful about what we do here. Generally we only 829 * schedule a BH, and do the rest of the io completion handling 830 * from rbd_finish_bh() which runs in a qemu context. 831 */ 832 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) 833 { 834 RBDAIOCB *acb = rcb->acb; 835 836 rcb->ret = rbd_aio_get_return_value(c); 837 rbd_aio_release(c); 838 839 aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs), 840 rbd_finish_bh, rcb); 841 } 842 843 static int rbd_aio_discard_wrapper(rbd_image_t image, 844 uint64_t off, 845 uint64_t len, 846 rbd_completion_t comp) 847 { 848 #ifdef LIBRBD_SUPPORTS_DISCARD 849 return rbd_aio_discard(image, off, len, comp); 850 #else 851 return -ENOTSUP; 852 #endif 853 } 854 855 static int rbd_aio_flush_wrapper(rbd_image_t image, 856 rbd_completion_t comp) 857 { 858 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 859 return rbd_aio_flush(image, comp); 860 #else 861 return -ENOTSUP; 862 #endif 863 } 864 865 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, 866 int64_t off, 867 QEMUIOVector *qiov, 868 int64_t size, 869 BlockCompletionFunc *cb, 870 void *opaque, 871 RBDAIOCmd cmd) 872 { 873 RBDAIOCB *acb; 874 RADOSCB *rcb = NULL; 875 rbd_completion_t c; 876 int r; 877 878 BDRVRBDState *s = bs->opaque; 879 880 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); 881 acb->cmd = cmd; 882 acb->qiov = qiov; 883 assert(!qiov || qiov->size == size); 884 885 rcb = g_new(RADOSCB, 1); 886 887 if (!LIBRBD_USE_IOVEC) { 888 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) { 889 acb->bounce = NULL; 890 } else { 891 acb->bounce = qemu_try_blockalign(bs, qiov->size); 892 if (acb->bounce == NULL) { 893 goto failed; 894 } 895 } 896 if (cmd == RBD_AIO_WRITE) { 897 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); 898 } 899 rcb->buf = acb->bounce; 900 } 901 902 acb->ret = 0; 903 acb->error = 0; 904 acb->s = s; 905 906 rcb->acb = acb; 907 rcb->s = acb->s; 908 rcb->size = size; 909 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); 910 if (r < 0) { 911 goto failed; 912 } 913 914 switch (cmd) { 915 case RBD_AIO_WRITE: 916 #ifdef LIBRBD_SUPPORTS_IOVEC 917 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c); 918 #else 919 r = rbd_aio_write(s->image, off, size, rcb->buf, c); 920 #endif 921 break; 922 case RBD_AIO_READ: 923 #ifdef LIBRBD_SUPPORTS_IOVEC 924 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); 925 #else 926 r = rbd_aio_read(s->image, off, size, rcb->buf, c); 927 #endif 928 break; 929 case RBD_AIO_DISCARD: 930 r = rbd_aio_discard_wrapper(s->image, off, size, c); 931 break; 932 case RBD_AIO_FLUSH: 933 r = rbd_aio_flush_wrapper(s->image, c); 934 break; 935 default: 936 r = -EINVAL; 937 } 938 939 if (r < 0) { 940 goto failed_completion; 941 } 942 return &acb->common; 943 944 failed_completion: 945 rbd_aio_release(c); 946 failed: 947 g_free(rcb); 948 if (!LIBRBD_USE_IOVEC) { 949 qemu_vfree(acb->bounce); 950 } 951 952 qemu_aio_unref(acb); 953 return NULL; 954 } 955 956 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, 957 int64_t sector_num, 958 QEMUIOVector *qiov, 959 int nb_sectors, 960 BlockCompletionFunc *cb, 961 void *opaque) 962 { 963 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 964 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 965 RBD_AIO_READ); 966 } 967 968 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, 969 int64_t sector_num, 970 QEMUIOVector *qiov, 971 int nb_sectors, 972 BlockCompletionFunc *cb, 973 void *opaque) 974 { 975 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov, 976 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque, 977 RBD_AIO_WRITE); 978 } 979 980 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 981 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, 982 BlockCompletionFunc *cb, 983 void *opaque) 984 { 985 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); 986 } 987 988 #else 989 990 static int qemu_rbd_co_flush(BlockDriverState *bs) 991 { 992 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1) 993 /* rbd_flush added in 0.1.1 */ 994 BDRVRBDState *s = bs->opaque; 995 return rbd_flush(s->image); 996 #else 997 return 0; 998 #endif 999 } 1000 #endif 1001 1002 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 1003 { 1004 BDRVRBDState *s = bs->opaque; 1005 rbd_image_info_t info; 1006 int r; 1007 1008 r = rbd_stat(s->image, &info, sizeof(info)); 1009 if (r < 0) { 1010 return r; 1011 } 1012 1013 bdi->cluster_size = info.obj_size; 1014 return 0; 1015 } 1016 1017 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 1018 { 1019 BDRVRBDState *s = bs->opaque; 1020 rbd_image_info_t info; 1021 int r; 1022 1023 r = rbd_stat(s->image, &info, sizeof(info)); 1024 if (r < 0) { 1025 return r; 1026 } 1027 1028 return info.size; 1029 } 1030 1031 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset) 1032 { 1033 BDRVRBDState *s = bs->opaque; 1034 int r; 1035 1036 r = rbd_resize(s->image, offset); 1037 if (r < 0) { 1038 return r; 1039 } 1040 1041 return 0; 1042 } 1043 1044 static int qemu_rbd_snap_create(BlockDriverState *bs, 1045 QEMUSnapshotInfo *sn_info) 1046 { 1047 BDRVRBDState *s = bs->opaque; 1048 int r; 1049 1050 if (sn_info->name[0] == '\0') { 1051 return -EINVAL; /* we need a name for rbd snapshots */ 1052 } 1053 1054 /* 1055 * rbd snapshots are using the name as the user controlled unique identifier 1056 * we can't use the rbd snapid for that purpose, as it can't be set 1057 */ 1058 if (sn_info->id_str[0] != '\0' && 1059 strcmp(sn_info->id_str, sn_info->name) != 0) { 1060 return -EINVAL; 1061 } 1062 1063 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 1064 return -ERANGE; 1065 } 1066 1067 r = rbd_snap_create(s->image, sn_info->name); 1068 if (r < 0) { 1069 error_report("failed to create snap: %s", strerror(-r)); 1070 return r; 1071 } 1072 1073 return 0; 1074 } 1075 1076 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1077 const char *snapshot_id, 1078 const char *snapshot_name, 1079 Error **errp) 1080 { 1081 BDRVRBDState *s = bs->opaque; 1082 int r; 1083 1084 if (!snapshot_name) { 1085 error_setg(errp, "rbd need a valid snapshot name"); 1086 return -EINVAL; 1087 } 1088 1089 /* If snapshot_id is specified, it must be equal to name, see 1090 qemu_rbd_snap_list() */ 1091 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1092 error_setg(errp, 1093 "rbd do not support snapshot id, it should be NULL or " 1094 "equal to snapshot name"); 1095 return -EINVAL; 1096 } 1097 1098 r = rbd_snap_remove(s->image, snapshot_name); 1099 if (r < 0) { 1100 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1101 } 1102 return r; 1103 } 1104 1105 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1106 const char *snapshot_name) 1107 { 1108 BDRVRBDState *s = bs->opaque; 1109 1110 return rbd_snap_rollback(s->image, snapshot_name); 1111 } 1112 1113 static int qemu_rbd_snap_list(BlockDriverState *bs, 1114 QEMUSnapshotInfo **psn_tab) 1115 { 1116 BDRVRBDState *s = bs->opaque; 1117 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1118 int i, snap_count; 1119 rbd_snap_info_t *snaps; 1120 int max_snaps = RBD_MAX_SNAPS; 1121 1122 do { 1123 snaps = g_new(rbd_snap_info_t, max_snaps); 1124 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1125 if (snap_count <= 0) { 1126 g_free(snaps); 1127 } 1128 } while (snap_count == -ERANGE); 1129 1130 if (snap_count <= 0) { 1131 goto done; 1132 } 1133 1134 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1135 1136 for (i = 0; i < snap_count; i++) { 1137 const char *snap_name = snaps[i].name; 1138 1139 sn_info = sn_tab + i; 1140 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1141 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1142 1143 sn_info->vm_state_size = snaps[i].size; 1144 sn_info->date_sec = 0; 1145 sn_info->date_nsec = 0; 1146 sn_info->vm_clock_nsec = 0; 1147 } 1148 rbd_snap_list_end(snaps); 1149 g_free(snaps); 1150 1151 done: 1152 *psn_tab = sn_tab; 1153 return snap_count; 1154 } 1155 1156 #ifdef LIBRBD_SUPPORTS_DISCARD 1157 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs, 1158 int64_t offset, 1159 int count, 1160 BlockCompletionFunc *cb, 1161 void *opaque) 1162 { 1163 return rbd_start_aio(bs, offset, NULL, count, cb, opaque, 1164 RBD_AIO_DISCARD); 1165 } 1166 #endif 1167 1168 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1169 static void qemu_rbd_invalidate_cache(BlockDriverState *bs, 1170 Error **errp) 1171 { 1172 BDRVRBDState *s = bs->opaque; 1173 int r = rbd_invalidate_cache(s->image); 1174 if (r < 0) { 1175 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1176 } 1177 } 1178 #endif 1179 1180 static QemuOptsList qemu_rbd_create_opts = { 1181 .name = "rbd-create-opts", 1182 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1183 .desc = { 1184 { 1185 .name = BLOCK_OPT_SIZE, 1186 .type = QEMU_OPT_SIZE, 1187 .help = "Virtual disk size" 1188 }, 1189 { 1190 .name = BLOCK_OPT_CLUSTER_SIZE, 1191 .type = QEMU_OPT_SIZE, 1192 .help = "RBD object size" 1193 }, 1194 { 1195 .name = "password-secret", 1196 .type = QEMU_OPT_STRING, 1197 .help = "ID of secret providing the password", 1198 }, 1199 { /* end of list */ } 1200 } 1201 }; 1202 1203 static BlockDriver bdrv_rbd = { 1204 .format_name = "rbd", 1205 .instance_size = sizeof(BDRVRBDState), 1206 .bdrv_parse_filename = qemu_rbd_parse_filename, 1207 .bdrv_file_open = qemu_rbd_open, 1208 .bdrv_close = qemu_rbd_close, 1209 .bdrv_create = qemu_rbd_create, 1210 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1211 .bdrv_get_info = qemu_rbd_getinfo, 1212 .create_opts = &qemu_rbd_create_opts, 1213 .bdrv_getlength = qemu_rbd_getlength, 1214 .bdrv_truncate = qemu_rbd_truncate, 1215 .protocol_name = "rbd", 1216 1217 .bdrv_aio_readv = qemu_rbd_aio_readv, 1218 .bdrv_aio_writev = qemu_rbd_aio_writev, 1219 1220 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH 1221 .bdrv_aio_flush = qemu_rbd_aio_flush, 1222 #else 1223 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1224 #endif 1225 1226 #ifdef LIBRBD_SUPPORTS_DISCARD 1227 .bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard, 1228 #endif 1229 1230 .bdrv_snapshot_create = qemu_rbd_snap_create, 1231 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1232 .bdrv_snapshot_list = qemu_rbd_snap_list, 1233 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1234 #ifdef LIBRBD_SUPPORTS_INVALIDATE 1235 .bdrv_invalidate_cache = qemu_rbd_invalidate_cache, 1236 #endif 1237 }; 1238 1239 static void bdrv_rbd_init(void) 1240 { 1241 bdrv_register(&bdrv_rbd); 1242 } 1243 1244 block_init(bdrv_rbd_init); 1245