1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/module.h" 20 #include "qemu/option.h" 21 #include "block/block_int.h" 22 #include "block/qdict.h" 23 #include "crypto/secret.h" 24 #include "qemu/cutils.h" 25 #include "sysemu/replay.h" 26 #include "qapi/qmp/qstring.h" 27 #include "qapi/qmp/qdict.h" 28 #include "qapi/qmp/qjson.h" 29 #include "qapi/qmp/qlist.h" 30 #include "qapi/qobject-input-visitor.h" 31 #include "qapi/qapi-visit-block-core.h" 32 33 /* 34 * When specifying the image filename use: 35 * 36 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 37 * 38 * poolname must be the name of an existing rados pool. 39 * 40 * devicename is the name of the rbd image. 41 * 42 * Each option given is used to configure rados, and may be any valid 43 * Ceph option, "id", or "conf". 44 * 45 * The "id" option indicates what user we should authenticate as to 46 * the Ceph cluster. If it is excluded we will use the Ceph default 47 * (normally 'admin'). 48 * 49 * The "conf" option specifies a Ceph configuration file to read. If 50 * it is not specified, we will read from the default Ceph locations 51 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 52 * file, specify conf=/dev/null. 53 * 54 * Configuration values containing :, @, or = can be escaped with a 55 * leading "\". 56 */ 57 58 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 59 60 #define RBD_MAX_SNAPS 100 61 62 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8 63 64 static const char rbd_luks_header_verification[ 65 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 66 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1 67 }; 68 69 static const char rbd_luks2_header_verification[ 70 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 71 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2 72 }; 73 74 typedef enum { 75 RBD_AIO_READ, 76 RBD_AIO_WRITE, 77 RBD_AIO_DISCARD, 78 RBD_AIO_FLUSH, 79 RBD_AIO_WRITE_ZEROES 80 } RBDAIOCmd; 81 82 typedef struct BDRVRBDState { 83 rados_t cluster; 84 rados_ioctx_t io_ctx; 85 rbd_image_t image; 86 char *image_name; 87 char *snap; 88 char *namespace; 89 uint64_t image_size; 90 uint64_t object_size; 91 } BDRVRBDState; 92 93 typedef struct RBDTask { 94 BlockDriverState *bs; 95 Coroutine *co; 96 bool complete; 97 int64_t ret; 98 } RBDTask; 99 100 typedef struct RBDDiffIterateReq { 101 uint64_t offs; 102 uint64_t bytes; 103 bool exists; 104 } RBDDiffIterateReq; 105 106 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 107 BlockdevOptionsRbd *opts, bool cache, 108 const char *keypairs, const char *secretid, 109 Error **errp); 110 111 static char *qemu_rbd_strchr(char *src, char delim) 112 { 113 char *p; 114 115 for (p = src; *p; ++p) { 116 if (*p == delim) { 117 return p; 118 } 119 if (*p == '\\' && p[1] != '\0') { 120 ++p; 121 } 122 } 123 124 return NULL; 125 } 126 127 128 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 129 { 130 char *end; 131 132 *p = NULL; 133 134 end = qemu_rbd_strchr(src, delim); 135 if (end) { 136 *p = end + 1; 137 *end = '\0'; 138 } 139 return src; 140 } 141 142 static void qemu_rbd_unescape(char *src) 143 { 144 char *p; 145 146 for (p = src; *src; ++src, ++p) { 147 if (*src == '\\' && src[1] != '\0') { 148 src++; 149 } 150 *p = *src; 151 } 152 *p = '\0'; 153 } 154 155 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 156 Error **errp) 157 { 158 const char *start; 159 char *p, *buf; 160 QList *keypairs = NULL; 161 char *found_str, *image_name; 162 163 if (!strstart(filename, "rbd:", &start)) { 164 error_setg(errp, "File name must start with 'rbd:'"); 165 return; 166 } 167 168 buf = g_strdup(start); 169 p = buf; 170 171 found_str = qemu_rbd_next_tok(p, '/', &p); 172 if (!p) { 173 error_setg(errp, "Pool name is required"); 174 goto done; 175 } 176 qemu_rbd_unescape(found_str); 177 qdict_put_str(options, "pool", found_str); 178 179 if (qemu_rbd_strchr(p, '@')) { 180 image_name = qemu_rbd_next_tok(p, '@', &p); 181 182 found_str = qemu_rbd_next_tok(p, ':', &p); 183 qemu_rbd_unescape(found_str); 184 qdict_put_str(options, "snapshot", found_str); 185 } else { 186 image_name = qemu_rbd_next_tok(p, ':', &p); 187 } 188 /* Check for namespace in the image_name */ 189 if (qemu_rbd_strchr(image_name, '/')) { 190 found_str = qemu_rbd_next_tok(image_name, '/', &image_name); 191 qemu_rbd_unescape(found_str); 192 qdict_put_str(options, "namespace", found_str); 193 } else { 194 qdict_put_str(options, "namespace", ""); 195 } 196 qemu_rbd_unescape(image_name); 197 qdict_put_str(options, "image", image_name); 198 if (!p) { 199 goto done; 200 } 201 202 /* The following are essentially all key/value pairs, and we treat 203 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 204 while (p) { 205 char *name, *value; 206 name = qemu_rbd_next_tok(p, '=', &p); 207 if (!p) { 208 error_setg(errp, "conf option %s has no value", name); 209 break; 210 } 211 212 qemu_rbd_unescape(name); 213 214 value = qemu_rbd_next_tok(p, ':', &p); 215 qemu_rbd_unescape(value); 216 217 if (!strcmp(name, "conf")) { 218 qdict_put_str(options, "conf", value); 219 } else if (!strcmp(name, "id")) { 220 qdict_put_str(options, "user", value); 221 } else { 222 /* 223 * We pass these internally to qemu_rbd_set_keypairs(), so 224 * we can get away with the simpler list of [ "key1", 225 * "value1", "key2", "value2" ] rather than a raw dict 226 * { "key1": "value1", "key2": "value2" } where we can't 227 * guarantee order, or even a more correct but complex 228 * [ { "key1": "value1" }, { "key2": "value2" } ] 229 */ 230 if (!keypairs) { 231 keypairs = qlist_new(); 232 } 233 qlist_append_str(keypairs, name); 234 qlist_append_str(keypairs, value); 235 } 236 } 237 238 if (keypairs) { 239 qdict_put(options, "=keyvalue-pairs", 240 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs)))); 241 } 242 243 done: 244 g_free(buf); 245 qobject_unref(keypairs); 246 return; 247 } 248 249 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts, 250 Error **errp) 251 { 252 char *key, *acr; 253 int r; 254 GString *accu; 255 RbdAuthModeList *auth; 256 257 if (opts->key_secret) { 258 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp); 259 if (!key) { 260 return -EIO; 261 } 262 r = rados_conf_set(cluster, "key", key); 263 g_free(key); 264 if (r < 0) { 265 error_setg_errno(errp, -r, "Could not set 'key'"); 266 return r; 267 } 268 } 269 270 if (opts->has_auth_client_required) { 271 accu = g_string_new(""); 272 for (auth = opts->auth_client_required; auth; auth = auth->next) { 273 if (accu->str[0]) { 274 g_string_append_c(accu, ';'); 275 } 276 g_string_append(accu, RbdAuthMode_str(auth->value)); 277 } 278 acr = g_string_free(accu, FALSE); 279 r = rados_conf_set(cluster, "auth_client_required", acr); 280 g_free(acr); 281 if (r < 0) { 282 error_setg_errno(errp, -r, 283 "Could not set 'auth_client_required'"); 284 return r; 285 } 286 } 287 288 return 0; 289 } 290 291 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 292 Error **errp) 293 { 294 QList *keypairs; 295 QString *name; 296 QString *value; 297 const char *key; 298 size_t remaining; 299 int ret = 0; 300 301 if (!keypairs_json) { 302 return ret; 303 } 304 keypairs = qobject_to(QList, 305 qobject_from_json(keypairs_json, &error_abort)); 306 remaining = qlist_size(keypairs) / 2; 307 assert(remaining); 308 309 while (remaining--) { 310 name = qobject_to(QString, qlist_pop(keypairs)); 311 value = qobject_to(QString, qlist_pop(keypairs)); 312 assert(name && value); 313 key = qstring_get_str(name); 314 315 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 316 qobject_unref(value); 317 if (ret < 0) { 318 error_setg_errno(errp, -ret, "invalid conf option %s", key); 319 qobject_unref(name); 320 ret = -EINVAL; 321 break; 322 } 323 qobject_unref(name); 324 } 325 326 qobject_unref(keypairs); 327 return ret; 328 } 329 330 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 331 static int qemu_rbd_convert_luks_options( 332 RbdEncryptionOptionsLUKSBase *luks_opts, 333 char **passphrase, 334 size_t *passphrase_len, 335 Error **errp) 336 { 337 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase, 338 passphrase_len, errp); 339 } 340 341 static int qemu_rbd_convert_luks_create_options( 342 RbdEncryptionCreateOptionsLUKSBase *luks_opts, 343 rbd_encryption_algorithm_t *alg, 344 char **passphrase, 345 size_t *passphrase_len, 346 Error **errp) 347 { 348 int r = 0; 349 350 r = qemu_rbd_convert_luks_options( 351 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts), 352 passphrase, passphrase_len, errp); 353 if (r < 0) { 354 return r; 355 } 356 357 if (luks_opts->has_cipher_alg) { 358 switch (luks_opts->cipher_alg) { 359 case QCRYPTO_CIPHER_ALG_AES_128: { 360 *alg = RBD_ENCRYPTION_ALGORITHM_AES128; 361 break; 362 } 363 case QCRYPTO_CIPHER_ALG_AES_256: { 364 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 365 break; 366 } 367 default: { 368 r = -ENOTSUP; 369 error_setg_errno(errp, -r, "unknown encryption algorithm: %u", 370 luks_opts->cipher_alg); 371 return r; 372 } 373 } 374 } else { 375 /* default alg */ 376 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 377 } 378 379 return 0; 380 } 381 382 static int qemu_rbd_encryption_format(rbd_image_t image, 383 RbdEncryptionCreateOptions *encrypt, 384 Error **errp) 385 { 386 int r = 0; 387 g_autofree char *passphrase = NULL; 388 size_t passphrase_len; 389 rbd_encryption_format_t format; 390 rbd_encryption_options_t opts; 391 rbd_encryption_luks1_format_options_t luks_opts; 392 rbd_encryption_luks2_format_options_t luks2_opts; 393 size_t opts_size; 394 uint64_t raw_size, effective_size; 395 396 r = rbd_get_size(image, &raw_size); 397 if (r < 0) { 398 error_setg_errno(errp, -r, "cannot get raw image size"); 399 return r; 400 } 401 402 switch (encrypt->format) { 403 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 404 memset(&luks_opts, 0, sizeof(luks_opts)); 405 format = RBD_ENCRYPTION_FORMAT_LUKS1; 406 opts = &luks_opts; 407 opts_size = sizeof(luks_opts); 408 r = qemu_rbd_convert_luks_create_options( 409 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks), 410 &luks_opts.alg, &passphrase, &passphrase_len, errp); 411 if (r < 0) { 412 return r; 413 } 414 luks_opts.passphrase = passphrase; 415 luks_opts.passphrase_size = passphrase_len; 416 break; 417 } 418 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 419 memset(&luks2_opts, 0, sizeof(luks2_opts)); 420 format = RBD_ENCRYPTION_FORMAT_LUKS2; 421 opts = &luks2_opts; 422 opts_size = sizeof(luks2_opts); 423 r = qemu_rbd_convert_luks_create_options( 424 qapi_RbdEncryptionCreateOptionsLUKS2_base( 425 &encrypt->u.luks2), 426 &luks2_opts.alg, &passphrase, &passphrase_len, errp); 427 if (r < 0) { 428 return r; 429 } 430 luks2_opts.passphrase = passphrase; 431 luks2_opts.passphrase_size = passphrase_len; 432 break; 433 } 434 default: { 435 r = -ENOTSUP; 436 error_setg_errno( 437 errp, -r, "unknown image encryption format: %u", 438 encrypt->format); 439 return r; 440 } 441 } 442 443 r = rbd_encryption_format(image, format, opts, opts_size); 444 if (r < 0) { 445 error_setg_errno(errp, -r, "encryption format fail"); 446 return r; 447 } 448 449 r = rbd_get_size(image, &effective_size); 450 if (r < 0) { 451 error_setg_errno(errp, -r, "cannot get effective image size"); 452 return r; 453 } 454 455 r = rbd_resize(image, raw_size + (raw_size - effective_size)); 456 if (r < 0) { 457 error_setg_errno(errp, -r, "cannot resize image after format"); 458 return r; 459 } 460 461 return 0; 462 } 463 464 static int qemu_rbd_encryption_load(rbd_image_t image, 465 RbdEncryptionOptions *encrypt, 466 Error **errp) 467 { 468 int r = 0; 469 g_autofree char *passphrase = NULL; 470 size_t passphrase_len; 471 rbd_encryption_luks1_format_options_t luks_opts; 472 rbd_encryption_luks2_format_options_t luks2_opts; 473 rbd_encryption_format_t format; 474 rbd_encryption_options_t opts; 475 size_t opts_size; 476 477 switch (encrypt->format) { 478 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 479 memset(&luks_opts, 0, sizeof(luks_opts)); 480 format = RBD_ENCRYPTION_FORMAT_LUKS1; 481 opts = &luks_opts; 482 opts_size = sizeof(luks_opts); 483 r = qemu_rbd_convert_luks_options( 484 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks), 485 &passphrase, &passphrase_len, errp); 486 if (r < 0) { 487 return r; 488 } 489 luks_opts.passphrase = passphrase; 490 luks_opts.passphrase_size = passphrase_len; 491 break; 492 } 493 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 494 memset(&luks2_opts, 0, sizeof(luks2_opts)); 495 format = RBD_ENCRYPTION_FORMAT_LUKS2; 496 opts = &luks2_opts; 497 opts_size = sizeof(luks2_opts); 498 r = qemu_rbd_convert_luks_options( 499 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2), 500 &passphrase, &passphrase_len, errp); 501 if (r < 0) { 502 return r; 503 } 504 luks2_opts.passphrase = passphrase; 505 luks2_opts.passphrase_size = passphrase_len; 506 break; 507 } 508 default: { 509 r = -ENOTSUP; 510 error_setg_errno( 511 errp, -r, "unknown image encryption format: %u", 512 encrypt->format); 513 return r; 514 } 515 } 516 517 r = rbd_encryption_load(image, format, opts, opts_size); 518 if (r < 0) { 519 error_setg_errno(errp, -r, "encryption load fail"); 520 return r; 521 } 522 523 return 0; 524 } 525 #endif 526 527 /* FIXME Deprecate and remove keypairs or make it available in QMP. */ 528 static int qemu_rbd_do_create(BlockdevCreateOptions *options, 529 const char *keypairs, const char *password_secret, 530 Error **errp) 531 { 532 BlockdevCreateOptionsRbd *opts = &options->u.rbd; 533 rados_t cluster; 534 rados_ioctx_t io_ctx; 535 int obj_order = 0; 536 int ret; 537 538 assert(options->driver == BLOCKDEV_DRIVER_RBD); 539 if (opts->location->has_snapshot) { 540 error_setg(errp, "Can't use snapshot name for image creation"); 541 return -EINVAL; 542 } 543 544 #ifndef LIBRBD_SUPPORTS_ENCRYPTION 545 if (opts->has_encrypt) { 546 error_setg(errp, "RBD library does not support image encryption"); 547 return -ENOTSUP; 548 } 549 #endif 550 551 if (opts->has_cluster_size) { 552 int64_t objsize = opts->cluster_size; 553 if ((objsize - 1) & objsize) { /* not a power of 2? */ 554 error_setg(errp, "obj size needs to be power of 2"); 555 return -EINVAL; 556 } 557 if (objsize < 4096) { 558 error_setg(errp, "obj size too small"); 559 return -EINVAL; 560 } 561 obj_order = ctz32(objsize); 562 } 563 564 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs, 565 password_secret, errp); 566 if (ret < 0) { 567 return ret; 568 } 569 570 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order); 571 if (ret < 0) { 572 error_setg_errno(errp, -ret, "error rbd create"); 573 goto out; 574 } 575 576 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 577 if (opts->has_encrypt) { 578 rbd_image_t image; 579 580 ret = rbd_open(io_ctx, opts->location->image, &image, NULL); 581 if (ret < 0) { 582 error_setg_errno(errp, -ret, 583 "error opening image '%s' for encryption format", 584 opts->location->image); 585 goto out; 586 } 587 588 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp); 589 rbd_close(image); 590 if (ret < 0) { 591 /* encryption format fail, try removing the image */ 592 rbd_remove(io_ctx, opts->location->image); 593 goto out; 594 } 595 } 596 #endif 597 598 ret = 0; 599 out: 600 rados_ioctx_destroy(io_ctx); 601 rados_shutdown(cluster); 602 return ret; 603 } 604 605 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp) 606 { 607 return qemu_rbd_do_create(options, NULL, NULL, errp); 608 } 609 610 static int qemu_rbd_extract_encryption_create_options( 611 QemuOpts *opts, 612 RbdEncryptionCreateOptions **spec, 613 Error **errp) 614 { 615 QDict *opts_qdict; 616 QDict *encrypt_qdict; 617 Visitor *v; 618 int ret = 0; 619 620 opts_qdict = qemu_opts_to_qdict(opts, NULL); 621 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt."); 622 qobject_unref(opts_qdict); 623 if (!qdict_size(encrypt_qdict)) { 624 *spec = NULL; 625 goto exit; 626 } 627 628 /* Convert options into a QAPI object */ 629 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp); 630 if (!v) { 631 ret = -EINVAL; 632 goto exit; 633 } 634 635 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp); 636 visit_free(v); 637 if (!*spec) { 638 ret = -EINVAL; 639 goto exit; 640 } 641 642 exit: 643 qobject_unref(encrypt_qdict); 644 return ret; 645 } 646 647 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv, 648 const char *filename, 649 QemuOpts *opts, 650 Error **errp) 651 { 652 BlockdevCreateOptions *create_options; 653 BlockdevCreateOptionsRbd *rbd_opts; 654 BlockdevOptionsRbd *loc; 655 RbdEncryptionCreateOptions *encrypt = NULL; 656 Error *local_err = NULL; 657 const char *keypairs, *password_secret; 658 QDict *options = NULL; 659 int ret = 0; 660 661 create_options = g_new0(BlockdevCreateOptions, 1); 662 create_options->driver = BLOCKDEV_DRIVER_RBD; 663 rbd_opts = &create_options->u.rbd; 664 665 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1); 666 667 password_secret = qemu_opt_get(opts, "password-secret"); 668 669 /* Read out options */ 670 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 671 BDRV_SECTOR_SIZE); 672 rbd_opts->cluster_size = qemu_opt_get_size_del(opts, 673 BLOCK_OPT_CLUSTER_SIZE, 0); 674 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0); 675 676 options = qdict_new(); 677 qemu_rbd_parse_filename(filename, options, &local_err); 678 if (local_err) { 679 ret = -EINVAL; 680 error_propagate(errp, local_err); 681 goto exit; 682 } 683 684 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp); 685 if (ret < 0) { 686 goto exit; 687 } 688 rbd_opts->encrypt = encrypt; 689 rbd_opts->has_encrypt = !!encrypt; 690 691 /* 692 * Caution: while qdict_get_try_str() is fine, getting non-string 693 * types would require more care. When @options come from -blockdev 694 * or blockdev_add, its members are typed according to the QAPI 695 * schema, but when they come from -drive, they're all QString. 696 */ 697 loc = rbd_opts->location; 698 loc->pool = g_strdup(qdict_get_try_str(options, "pool")); 699 loc->conf = g_strdup(qdict_get_try_str(options, "conf")); 700 loc->has_conf = !!loc->conf; 701 loc->user = g_strdup(qdict_get_try_str(options, "user")); 702 loc->has_user = !!loc->user; 703 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace")); 704 loc->has_q_namespace = !!loc->q_namespace; 705 loc->image = g_strdup(qdict_get_try_str(options, "image")); 706 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 707 708 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp); 709 if (ret < 0) { 710 goto exit; 711 } 712 713 exit: 714 qobject_unref(options); 715 qapi_free_BlockdevCreateOptions(create_options); 716 return ret; 717 } 718 719 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) 720 { 721 const char **vals; 722 const char *host, *port; 723 char *rados_str; 724 InetSocketAddressBaseList *p; 725 int i, cnt; 726 727 if (!opts->has_server) { 728 return NULL; 729 } 730 731 for (cnt = 0, p = opts->server; p; p = p->next) { 732 cnt++; 733 } 734 735 vals = g_new(const char *, cnt + 1); 736 737 for (i = 0, p = opts->server; p; p = p->next, i++) { 738 host = p->value->host; 739 port = p->value->port; 740 741 if (strchr(host, ':')) { 742 vals[i] = g_strdup_printf("[%s]:%s", host, port); 743 } else { 744 vals[i] = g_strdup_printf("%s:%s", host, port); 745 } 746 } 747 vals[i] = NULL; 748 749 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 750 g_strfreev((char **)vals); 751 return rados_str; 752 } 753 754 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 755 BlockdevOptionsRbd *opts, bool cache, 756 const char *keypairs, const char *secretid, 757 Error **errp) 758 { 759 char *mon_host = NULL; 760 Error *local_err = NULL; 761 int r; 762 763 if (secretid) { 764 if (opts->key_secret) { 765 error_setg(errp, 766 "Legacy 'password-secret' clashes with 'key-secret'"); 767 return -EINVAL; 768 } 769 opts->key_secret = g_strdup(secretid); 770 opts->has_key_secret = true; 771 } 772 773 mon_host = qemu_rbd_mon_host(opts, &local_err); 774 if (local_err) { 775 error_propagate(errp, local_err); 776 r = -EINVAL; 777 goto out; 778 } 779 780 r = rados_create(cluster, opts->user); 781 if (r < 0) { 782 error_setg_errno(errp, -r, "error initializing"); 783 goto out; 784 } 785 786 /* try default location when conf=NULL, but ignore failure */ 787 r = rados_conf_read_file(*cluster, opts->conf); 788 if (opts->has_conf && r < 0) { 789 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf); 790 goto failed_shutdown; 791 } 792 793 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp); 794 if (r < 0) { 795 goto failed_shutdown; 796 } 797 798 if (mon_host) { 799 r = rados_conf_set(*cluster, "mon_host", mon_host); 800 if (r < 0) { 801 goto failed_shutdown; 802 } 803 } 804 805 r = qemu_rbd_set_auth(*cluster, opts, errp); 806 if (r < 0) { 807 goto failed_shutdown; 808 } 809 810 /* 811 * Fallback to more conservative semantics if setting cache 812 * options fails. Ignore errors from setting rbd_cache because the 813 * only possible error is that the option does not exist, and 814 * librbd defaults to no caching. If write through caching cannot 815 * be set up, fall back to no caching. 816 */ 817 if (cache) { 818 rados_conf_set(*cluster, "rbd_cache", "true"); 819 } else { 820 rados_conf_set(*cluster, "rbd_cache", "false"); 821 } 822 823 r = rados_connect(*cluster); 824 if (r < 0) { 825 error_setg_errno(errp, -r, "error connecting"); 826 goto failed_shutdown; 827 } 828 829 r = rados_ioctx_create(*cluster, opts->pool, io_ctx); 830 if (r < 0) { 831 error_setg_errno(errp, -r, "error opening pool %s", opts->pool); 832 goto failed_shutdown; 833 } 834 835 #ifdef HAVE_RBD_NAMESPACE_EXISTS 836 if (opts->has_q_namespace && strlen(opts->q_namespace) > 0) { 837 bool exists; 838 839 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists); 840 if (r < 0) { 841 error_setg_errno(errp, -r, "error checking namespace"); 842 goto failed_ioctx_destroy; 843 } 844 845 if (!exists) { 846 error_setg(errp, "namespace '%s' does not exist", 847 opts->q_namespace); 848 r = -ENOENT; 849 goto failed_ioctx_destroy; 850 } 851 } 852 #endif 853 854 /* 855 * Set the namespace after opening the io context on the pool, 856 * if nspace == NULL or if nspace == "", it is just as we did nothing 857 */ 858 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace); 859 860 r = 0; 861 goto out; 862 863 #ifdef HAVE_RBD_NAMESPACE_EXISTS 864 failed_ioctx_destroy: 865 rados_ioctx_destroy(*io_ctx); 866 #endif 867 failed_shutdown: 868 rados_shutdown(*cluster); 869 out: 870 g_free(mon_host); 871 return r; 872 } 873 874 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts, 875 Error **errp) 876 { 877 Visitor *v; 878 879 /* Convert the remaining options into a QAPI object */ 880 v = qobject_input_visitor_new_flat_confused(options, errp); 881 if (!v) { 882 return -EINVAL; 883 } 884 885 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp); 886 visit_free(v); 887 if (!opts) { 888 return -EINVAL; 889 } 890 891 return 0; 892 } 893 894 static int qemu_rbd_attempt_legacy_options(QDict *options, 895 BlockdevOptionsRbd **opts, 896 char **keypairs) 897 { 898 char *filename; 899 int r; 900 901 filename = g_strdup(qdict_get_try_str(options, "filename")); 902 if (!filename) { 903 return -EINVAL; 904 } 905 qdict_del(options, "filename"); 906 907 qemu_rbd_parse_filename(filename, options, NULL); 908 909 /* keypairs freed by caller */ 910 *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 911 if (*keypairs) { 912 qdict_del(options, "=keyvalue-pairs"); 913 } 914 915 r = qemu_rbd_convert_options(options, opts, NULL); 916 917 g_free(filename); 918 return r; 919 } 920 921 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 922 Error **errp) 923 { 924 BDRVRBDState *s = bs->opaque; 925 BlockdevOptionsRbd *opts = NULL; 926 const QDictEntry *e; 927 Error *local_err = NULL; 928 char *keypairs, *secretid; 929 rbd_image_info_t info; 930 int r; 931 932 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 933 if (keypairs) { 934 qdict_del(options, "=keyvalue-pairs"); 935 } 936 937 secretid = g_strdup(qdict_get_try_str(options, "password-secret")); 938 if (secretid) { 939 qdict_del(options, "password-secret"); 940 } 941 942 r = qemu_rbd_convert_options(options, &opts, &local_err); 943 if (local_err) { 944 /* If keypairs are present, that means some options are present in 945 * the modern option format. Don't attempt to parse legacy option 946 * formats, as we won't support mixed usage. */ 947 if (keypairs) { 948 error_propagate(errp, local_err); 949 goto out; 950 } 951 952 /* If the initial attempt to convert and process the options failed, 953 * we may be attempting to open an image file that has the rbd options 954 * specified in the older format consisting of all key/value pairs 955 * encoded in the filename. Go ahead and attempt to parse the 956 * filename, and see if we can pull out the required options. */ 957 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs); 958 if (r < 0) { 959 /* Propagate the original error, not the legacy parsing fallback 960 * error, as the latter was just a best-effort attempt. */ 961 error_propagate(errp, local_err); 962 goto out; 963 } 964 /* Take care whenever deciding to actually deprecate; once this ability 965 * is removed, we will not be able to open any images with legacy-styled 966 * backing image strings. */ 967 warn_report("RBD options encoded in the filename as keyvalue pairs " 968 "is deprecated"); 969 } 970 971 /* Remove the processed options from the QDict (the visitor processes 972 * _all_ options in the QDict) */ 973 while ((e = qdict_first(options))) { 974 qdict_del(options, e->key); 975 } 976 977 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts, 978 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp); 979 if (r < 0) { 980 goto out; 981 } 982 983 s->snap = g_strdup(opts->snapshot); 984 s->image_name = g_strdup(opts->image); 985 986 /* rbd_open is always r/w */ 987 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 988 if (r < 0) { 989 error_setg_errno(errp, -r, "error reading header from %s", 990 s->image_name); 991 goto failed_open; 992 } 993 994 if (opts->has_encrypt) { 995 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 996 r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp); 997 if (r < 0) { 998 goto failed_post_open; 999 } 1000 #else 1001 r = -ENOTSUP; 1002 error_setg(errp, "RBD library does not support image encryption"); 1003 goto failed_post_open; 1004 #endif 1005 } 1006 1007 r = rbd_stat(s->image, &info, sizeof(info)); 1008 if (r < 0) { 1009 error_setg_errno(errp, -r, "error getting image info from %s", 1010 s->image_name); 1011 goto failed_post_open; 1012 } 1013 s->image_size = info.size; 1014 s->object_size = info.obj_size; 1015 1016 /* If we are using an rbd snapshot, we must be r/o, otherwise 1017 * leave as-is */ 1018 if (s->snap != NULL) { 1019 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp); 1020 if (r < 0) { 1021 goto failed_post_open; 1022 } 1023 } 1024 1025 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1026 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK; 1027 #endif 1028 1029 /* When extending regular files, we get zeros from the OS */ 1030 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE; 1031 1032 r = 0; 1033 goto out; 1034 1035 failed_post_open: 1036 rbd_close(s->image); 1037 failed_open: 1038 rados_ioctx_destroy(s->io_ctx); 1039 g_free(s->snap); 1040 g_free(s->image_name); 1041 rados_shutdown(s->cluster); 1042 out: 1043 qapi_free_BlockdevOptionsRbd(opts); 1044 g_free(keypairs); 1045 g_free(secretid); 1046 return r; 1047 } 1048 1049 1050 /* Since RBD is currently always opened R/W via the API, 1051 * we just need to check if we are using a snapshot or not, in 1052 * order to determine if we will allow it to be R/W */ 1053 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 1054 BlockReopenQueue *queue, Error **errp) 1055 { 1056 BDRVRBDState *s = state->bs->opaque; 1057 int ret = 0; 1058 1059 if (s->snap && state->flags & BDRV_O_RDWR) { 1060 error_setg(errp, 1061 "Cannot change node '%s' to r/w when using RBD snapshot", 1062 bdrv_get_device_or_node_name(state->bs)); 1063 ret = -EINVAL; 1064 } 1065 1066 return ret; 1067 } 1068 1069 static void qemu_rbd_close(BlockDriverState *bs) 1070 { 1071 BDRVRBDState *s = bs->opaque; 1072 1073 rbd_close(s->image); 1074 rados_ioctx_destroy(s->io_ctx); 1075 g_free(s->snap); 1076 g_free(s->image_name); 1077 rados_shutdown(s->cluster); 1078 } 1079 1080 /* Resize the RBD image and update the 'image_size' with the current size */ 1081 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size) 1082 { 1083 BDRVRBDState *s = bs->opaque; 1084 int r; 1085 1086 r = rbd_resize(s->image, size); 1087 if (r < 0) { 1088 return r; 1089 } 1090 1091 s->image_size = size; 1092 1093 return 0; 1094 } 1095 1096 static void qemu_rbd_finish_bh(void *opaque) 1097 { 1098 RBDTask *task = opaque; 1099 task->complete = true; 1100 aio_co_wake(task->co); 1101 } 1102 1103 /* 1104 * This is the completion callback function for all rbd aio calls 1105 * started from qemu_rbd_start_co(). 1106 * 1107 * Note: this function is being called from a non qemu thread so 1108 * we need to be careful about what we do here. Generally we only 1109 * schedule a BH, and do the rest of the io completion handling 1110 * from qemu_rbd_finish_bh() which runs in a qemu context. 1111 */ 1112 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task) 1113 { 1114 task->ret = rbd_aio_get_return_value(c); 1115 rbd_aio_release(c); 1116 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs), 1117 qemu_rbd_finish_bh, task); 1118 } 1119 1120 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs, 1121 uint64_t offset, 1122 uint64_t bytes, 1123 QEMUIOVector *qiov, 1124 int flags, 1125 RBDAIOCmd cmd) 1126 { 1127 BDRVRBDState *s = bs->opaque; 1128 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() }; 1129 rbd_completion_t c; 1130 int r; 1131 1132 assert(!qiov || qiov->size == bytes); 1133 1134 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) { 1135 /* 1136 * RBD APIs don't allow us to write more than actual size, so in order 1137 * to support growing images, we resize the image before write 1138 * operations that exceed the current size. 1139 */ 1140 if (offset + bytes > s->image_size) { 1141 int r = qemu_rbd_resize(bs, offset + bytes); 1142 if (r < 0) { 1143 return r; 1144 } 1145 } 1146 } 1147 1148 r = rbd_aio_create_completion(&task, 1149 (rbd_callback_t) qemu_rbd_completion_cb, &c); 1150 if (r < 0) { 1151 return r; 1152 } 1153 1154 switch (cmd) { 1155 case RBD_AIO_READ: 1156 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c); 1157 break; 1158 case RBD_AIO_WRITE: 1159 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c); 1160 break; 1161 case RBD_AIO_DISCARD: 1162 r = rbd_aio_discard(s->image, offset, bytes, c); 1163 break; 1164 case RBD_AIO_FLUSH: 1165 r = rbd_aio_flush(s->image, c); 1166 break; 1167 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1168 case RBD_AIO_WRITE_ZEROES: { 1169 int zero_flags = 0; 1170 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION 1171 if (!(flags & BDRV_REQ_MAY_UNMAP)) { 1172 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION; 1173 } 1174 #endif 1175 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0); 1176 break; 1177 } 1178 #endif 1179 default: 1180 r = -EINVAL; 1181 } 1182 1183 if (r < 0) { 1184 error_report("rbd request failed early: cmd %d offset %" PRIu64 1185 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset, 1186 bytes, flags, r, strerror(-r)); 1187 rbd_aio_release(c); 1188 return r; 1189 } 1190 1191 while (!task.complete) { 1192 qemu_coroutine_yield(); 1193 } 1194 1195 if (task.ret < 0) { 1196 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %" 1197 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset, 1198 bytes, flags, task.ret, strerror(-task.ret)); 1199 return task.ret; 1200 } 1201 1202 /* zero pad short reads */ 1203 if (cmd == RBD_AIO_READ && task.ret < qiov->size) { 1204 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret); 1205 } 1206 1207 return 0; 1208 } 1209 1210 static int 1211 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset, 1212 int64_t bytes, QEMUIOVector *qiov, 1213 BdrvRequestFlags flags) 1214 { 1215 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ); 1216 } 1217 1218 static int 1219 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset, 1220 int64_t bytes, QEMUIOVector *qiov, 1221 BdrvRequestFlags flags) 1222 { 1223 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE); 1224 } 1225 1226 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs) 1227 { 1228 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH); 1229 } 1230 1231 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs, 1232 int64_t offset, int64_t bytes) 1233 { 1234 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD); 1235 } 1236 1237 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1238 static int 1239 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, 1240 int64_t bytes, BdrvRequestFlags flags) 1241 { 1242 return qemu_rbd_start_co(bs, offset, bytes, NULL, flags, 1243 RBD_AIO_WRITE_ZEROES); 1244 } 1245 #endif 1246 1247 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) 1248 { 1249 BDRVRBDState *s = bs->opaque; 1250 bdi->cluster_size = s->object_size; 1251 return 0; 1252 } 1253 1254 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs, 1255 Error **errp) 1256 { 1257 BDRVRBDState *s = bs->opaque; 1258 ImageInfoSpecific *spec_info; 1259 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0}; 1260 int r; 1261 1262 if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) { 1263 r = rbd_read(s->image, 0, 1264 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf); 1265 if (r < 0) { 1266 error_setg_errno(errp, -r, "cannot read image start for probe"); 1267 return NULL; 1268 } 1269 } 1270 1271 spec_info = g_new(ImageInfoSpecific, 1); 1272 *spec_info = (ImageInfoSpecific){ 1273 .type = IMAGE_INFO_SPECIFIC_KIND_RBD, 1274 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1), 1275 }; 1276 1277 if (memcmp(buf, rbd_luks_header_verification, 1278 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1279 spec_info->u.rbd.data->encryption_format = 1280 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 1281 spec_info->u.rbd.data->has_encryption_format = true; 1282 } else if (memcmp(buf, rbd_luks2_header_verification, 1283 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1284 spec_info->u.rbd.data->encryption_format = 1285 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 1286 spec_info->u.rbd.data->has_encryption_format = true; 1287 } else { 1288 spec_info->u.rbd.data->has_encryption_format = false; 1289 } 1290 1291 return spec_info; 1292 } 1293 1294 /* 1295 * rbd_diff_iterate2 allows to interrupt the exection by returning a negative 1296 * value in the callback routine. Choose a value that does not conflict with 1297 * an existing exitcode and return it if we want to prematurely stop the 1298 * execution because we detected a change in the allocation status. 1299 */ 1300 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000 1301 1302 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len, 1303 int exists, void *opaque) 1304 { 1305 RBDDiffIterateReq *req = opaque; 1306 1307 assert(req->offs + req->bytes <= offs); 1308 1309 /* treat a hole like an unallocated area and bail out */ 1310 if (!exists) { 1311 return 0; 1312 } 1313 1314 if (!req->exists && offs > req->offs) { 1315 /* 1316 * we started in an unallocated area and hit the first allocated 1317 * block. req->bytes must be set to the length of the unallocated area 1318 * before the allocated area. stop further processing. 1319 */ 1320 req->bytes = offs - req->offs; 1321 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1322 } 1323 1324 if (req->exists && offs > req->offs + req->bytes) { 1325 /* 1326 * we started in an allocated area and jumped over an unallocated area, 1327 * req->bytes contains the length of the allocated area before the 1328 * unallocated area. stop further processing. 1329 */ 1330 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1331 } 1332 1333 req->bytes += len; 1334 req->exists = true; 1335 1336 return 0; 1337 } 1338 1339 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs, 1340 bool want_zero, int64_t offset, 1341 int64_t bytes, int64_t *pnum, 1342 int64_t *map, 1343 BlockDriverState **file) 1344 { 1345 BDRVRBDState *s = bs->opaque; 1346 int status, r; 1347 RBDDiffIterateReq req = { .offs = offset }; 1348 uint64_t features, flags; 1349 uint64_t head = 0; 1350 1351 assert(offset + bytes <= s->image_size); 1352 1353 /* default to all sectors allocated */ 1354 status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID; 1355 *map = offset; 1356 *file = bs; 1357 *pnum = bytes; 1358 1359 /* check if RBD image supports fast-diff */ 1360 r = rbd_get_features(s->image, &features); 1361 if (r < 0) { 1362 return status; 1363 } 1364 if (!(features & RBD_FEATURE_FAST_DIFF)) { 1365 return status; 1366 } 1367 1368 /* check if RBD fast-diff result is valid */ 1369 r = rbd_get_flags(s->image, &flags); 1370 if (r < 0) { 1371 return status; 1372 } 1373 if (flags & RBD_FLAG_FAST_DIFF_INVALID) { 1374 return status; 1375 } 1376 1377 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0) 1378 /* 1379 * librbd had a bug until early 2022 that affected all versions of ceph that 1380 * supported fast-diff. This bug results in reporting of incorrect offsets 1381 * if the offset parameter to rbd_diff_iterate2 is not object aligned. 1382 * Work around this bug by rounding down the offset to object boundaries. 1383 * This is OK because we call rbd_diff_iterate2 with whole_object = true. 1384 * However, this workaround only works for non cloned images with default 1385 * striping. 1386 * 1387 * See: https://tracker.ceph.com/issues/53784 1388 */ 1389 1390 /* check if RBD image has non-default striping enabled */ 1391 if (features & RBD_FEATURE_STRIPINGV2) { 1392 return status; 1393 } 1394 1395 #pragma GCC diagnostic push 1396 #pragma GCC diagnostic ignored "-Wdeprecated-declarations" 1397 /* 1398 * check if RBD image is a clone (= has a parent). 1399 * 1400 * rbd_get_parent_info is deprecated from Nautilus onwards, but the 1401 * replacement rbd_get_parent is not present in Luminous and Mimic. 1402 */ 1403 if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) { 1404 return status; 1405 } 1406 #pragma GCC diagnostic pop 1407 1408 head = req.offs & (s->object_size - 1); 1409 req.offs -= head; 1410 bytes += head; 1411 #endif 1412 1413 r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true, 1414 qemu_rbd_diff_iterate_cb, &req); 1415 if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) { 1416 return status; 1417 } 1418 assert(req.bytes <= bytes); 1419 if (!req.exists) { 1420 if (r == 0) { 1421 /* 1422 * rbd_diff_iterate2 does not invoke callbacks for unallocated 1423 * areas. This here catches the case where no callback was 1424 * invoked at all (req.bytes == 0). 1425 */ 1426 assert(req.bytes == 0); 1427 req.bytes = bytes; 1428 } 1429 status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID; 1430 } 1431 1432 assert(req.bytes > head); 1433 *pnum = req.bytes - head; 1434 return status; 1435 } 1436 1437 static int64_t qemu_rbd_getlength(BlockDriverState *bs) 1438 { 1439 BDRVRBDState *s = bs->opaque; 1440 int r; 1441 1442 r = rbd_get_size(s->image, &s->image_size); 1443 if (r < 0) { 1444 return r; 1445 } 1446 1447 return s->image_size; 1448 } 1449 1450 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs, 1451 int64_t offset, 1452 bool exact, 1453 PreallocMode prealloc, 1454 BdrvRequestFlags flags, 1455 Error **errp) 1456 { 1457 int r; 1458 1459 if (prealloc != PREALLOC_MODE_OFF) { 1460 error_setg(errp, "Unsupported preallocation mode '%s'", 1461 PreallocMode_str(prealloc)); 1462 return -ENOTSUP; 1463 } 1464 1465 r = qemu_rbd_resize(bs, offset); 1466 if (r < 0) { 1467 error_setg_errno(errp, -r, "Failed to resize file"); 1468 return r; 1469 } 1470 1471 return 0; 1472 } 1473 1474 static int qemu_rbd_snap_create(BlockDriverState *bs, 1475 QEMUSnapshotInfo *sn_info) 1476 { 1477 BDRVRBDState *s = bs->opaque; 1478 int r; 1479 1480 if (sn_info->name[0] == '\0') { 1481 return -EINVAL; /* we need a name for rbd snapshots */ 1482 } 1483 1484 /* 1485 * rbd snapshots are using the name as the user controlled unique identifier 1486 * we can't use the rbd snapid for that purpose, as it can't be set 1487 */ 1488 if (sn_info->id_str[0] != '\0' && 1489 strcmp(sn_info->id_str, sn_info->name) != 0) { 1490 return -EINVAL; 1491 } 1492 1493 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 1494 return -ERANGE; 1495 } 1496 1497 r = rbd_snap_create(s->image, sn_info->name); 1498 if (r < 0) { 1499 error_report("failed to create snap: %s", strerror(-r)); 1500 return r; 1501 } 1502 1503 return 0; 1504 } 1505 1506 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1507 const char *snapshot_id, 1508 const char *snapshot_name, 1509 Error **errp) 1510 { 1511 BDRVRBDState *s = bs->opaque; 1512 int r; 1513 1514 if (!snapshot_name) { 1515 error_setg(errp, "rbd need a valid snapshot name"); 1516 return -EINVAL; 1517 } 1518 1519 /* If snapshot_id is specified, it must be equal to name, see 1520 qemu_rbd_snap_list() */ 1521 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1522 error_setg(errp, 1523 "rbd do not support snapshot id, it should be NULL or " 1524 "equal to snapshot name"); 1525 return -EINVAL; 1526 } 1527 1528 r = rbd_snap_remove(s->image, snapshot_name); 1529 if (r < 0) { 1530 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1531 } 1532 return r; 1533 } 1534 1535 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1536 const char *snapshot_name) 1537 { 1538 BDRVRBDState *s = bs->opaque; 1539 1540 return rbd_snap_rollback(s->image, snapshot_name); 1541 } 1542 1543 static int qemu_rbd_snap_list(BlockDriverState *bs, 1544 QEMUSnapshotInfo **psn_tab) 1545 { 1546 BDRVRBDState *s = bs->opaque; 1547 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1548 int i, snap_count; 1549 rbd_snap_info_t *snaps; 1550 int max_snaps = RBD_MAX_SNAPS; 1551 1552 do { 1553 snaps = g_new(rbd_snap_info_t, max_snaps); 1554 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1555 if (snap_count <= 0) { 1556 g_free(snaps); 1557 } 1558 } while (snap_count == -ERANGE); 1559 1560 if (snap_count <= 0) { 1561 goto done; 1562 } 1563 1564 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1565 1566 for (i = 0; i < snap_count; i++) { 1567 const char *snap_name = snaps[i].name; 1568 1569 sn_info = sn_tab + i; 1570 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1571 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1572 1573 sn_info->vm_state_size = snaps[i].size; 1574 sn_info->date_sec = 0; 1575 sn_info->date_nsec = 0; 1576 sn_info->vm_clock_nsec = 0; 1577 } 1578 rbd_snap_list_end(snaps); 1579 g_free(snaps); 1580 1581 done: 1582 *psn_tab = sn_tab; 1583 return snap_count; 1584 } 1585 1586 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs, 1587 Error **errp) 1588 { 1589 BDRVRBDState *s = bs->opaque; 1590 int r = rbd_invalidate_cache(s->image); 1591 if (r < 0) { 1592 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1593 } 1594 } 1595 1596 static QemuOptsList qemu_rbd_create_opts = { 1597 .name = "rbd-create-opts", 1598 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1599 .desc = { 1600 { 1601 .name = BLOCK_OPT_SIZE, 1602 .type = QEMU_OPT_SIZE, 1603 .help = "Virtual disk size" 1604 }, 1605 { 1606 .name = BLOCK_OPT_CLUSTER_SIZE, 1607 .type = QEMU_OPT_SIZE, 1608 .help = "RBD object size" 1609 }, 1610 { 1611 .name = "password-secret", 1612 .type = QEMU_OPT_STRING, 1613 .help = "ID of secret providing the password", 1614 }, 1615 { 1616 .name = "encrypt.format", 1617 .type = QEMU_OPT_STRING, 1618 .help = "Encrypt the image, format choices: 'luks', 'luks2'", 1619 }, 1620 { 1621 .name = "encrypt.cipher-alg", 1622 .type = QEMU_OPT_STRING, 1623 .help = "Name of encryption cipher algorithm" 1624 " (allowed values: aes-128, aes-256)", 1625 }, 1626 { 1627 .name = "encrypt.key-secret", 1628 .type = QEMU_OPT_STRING, 1629 .help = "ID of secret providing LUKS passphrase", 1630 }, 1631 { /* end of list */ } 1632 } 1633 }; 1634 1635 static const char *const qemu_rbd_strong_runtime_opts[] = { 1636 "pool", 1637 "namespace", 1638 "image", 1639 "conf", 1640 "snapshot", 1641 "user", 1642 "server.", 1643 "password-secret", 1644 1645 NULL 1646 }; 1647 1648 static BlockDriver bdrv_rbd = { 1649 .format_name = "rbd", 1650 .instance_size = sizeof(BDRVRBDState), 1651 .bdrv_parse_filename = qemu_rbd_parse_filename, 1652 .bdrv_file_open = qemu_rbd_open, 1653 .bdrv_close = qemu_rbd_close, 1654 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1655 .bdrv_co_create = qemu_rbd_co_create, 1656 .bdrv_co_create_opts = qemu_rbd_co_create_opts, 1657 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1658 .bdrv_get_info = qemu_rbd_getinfo, 1659 .bdrv_get_specific_info = qemu_rbd_get_specific_info, 1660 .create_opts = &qemu_rbd_create_opts, 1661 .bdrv_getlength = qemu_rbd_getlength, 1662 .bdrv_co_truncate = qemu_rbd_co_truncate, 1663 .protocol_name = "rbd", 1664 1665 .bdrv_co_preadv = qemu_rbd_co_preadv, 1666 .bdrv_co_pwritev = qemu_rbd_co_pwritev, 1667 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1668 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard, 1669 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1670 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes, 1671 #endif 1672 .bdrv_co_block_status = qemu_rbd_co_block_status, 1673 1674 .bdrv_snapshot_create = qemu_rbd_snap_create, 1675 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1676 .bdrv_snapshot_list = qemu_rbd_snap_list, 1677 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1678 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache, 1679 1680 .strong_runtime_opts = qemu_rbd_strong_runtime_opts, 1681 }; 1682 1683 static void bdrv_rbd_init(void) 1684 { 1685 bdrv_register(&bdrv_rbd); 1686 } 1687 1688 block_init(bdrv_rbd_init); 1689