1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/module.h" 20 #include "qemu/option.h" 21 #include "block/block-io.h" 22 #include "block/block_int.h" 23 #include "block/qdict.h" 24 #include "crypto/secret.h" 25 #include "qemu/cutils.h" 26 #include "sysemu/replay.h" 27 #include "qapi/qmp/qstring.h" 28 #include "qapi/qmp/qdict.h" 29 #include "qapi/qmp/qjson.h" 30 #include "qapi/qmp/qlist.h" 31 #include "qapi/qobject-input-visitor.h" 32 #include "qapi/qapi-visit-block-core.h" 33 34 /* 35 * When specifying the image filename use: 36 * 37 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 38 * 39 * poolname must be the name of an existing rados pool. 40 * 41 * devicename is the name of the rbd image. 42 * 43 * Each option given is used to configure rados, and may be any valid 44 * Ceph option, "id", or "conf". 45 * 46 * The "id" option indicates what user we should authenticate as to 47 * the Ceph cluster. If it is excluded we will use the Ceph default 48 * (normally 'admin'). 49 * 50 * The "conf" option specifies a Ceph configuration file to read. If 51 * it is not specified, we will read from the default Ceph locations 52 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 53 * file, specify conf=/dev/null. 54 * 55 * Configuration values containing :, @, or = can be escaped with a 56 * leading "\". 
57 */ 58 59 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 60 61 #define RBD_MAX_SNAPS 100 62 63 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8 64 65 static const char rbd_luks_header_verification[ 66 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 67 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1 68 }; 69 70 static const char rbd_luks2_header_verification[ 71 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 72 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2 73 }; 74 75 static const char rbd_layered_luks_header_verification[ 76 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 77 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1 78 }; 79 80 static const char rbd_layered_luks2_header_verification[ 81 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 82 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2 83 }; 84 85 typedef enum { 86 RBD_AIO_READ, 87 RBD_AIO_WRITE, 88 RBD_AIO_DISCARD, 89 RBD_AIO_FLUSH, 90 RBD_AIO_WRITE_ZEROES 91 } RBDAIOCmd; 92 93 typedef struct BDRVRBDState { 94 rados_t cluster; 95 rados_ioctx_t io_ctx; 96 rbd_image_t image; 97 char *image_name; 98 char *snap; 99 char *namespace; 100 uint64_t image_size; 101 uint64_t object_size; 102 } BDRVRBDState; 103 104 typedef struct RBDTask { 105 BlockDriverState *bs; 106 Coroutine *co; 107 bool complete; 108 int64_t ret; 109 } RBDTask; 110 111 typedef struct RBDDiffIterateReq { 112 uint64_t offs; 113 uint64_t bytes; 114 bool exists; 115 } RBDDiffIterateReq; 116 117 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 118 BlockdevOptionsRbd *opts, bool cache, 119 const char *keypairs, const char *secretid, 120 Error **errp); 121 122 static char *qemu_rbd_strchr(char *src, char delim) 123 { 124 char *p; 125 126 for (p = src; *p; ++p) { 127 if (*p == delim) { 128 return p; 129 } 130 if (*p == '\\' && p[1] != '\0') { 131 ++p; 132 } 133 } 134 135 return NULL; 136 } 137 138 139 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 140 { 141 char *end; 142 143 *p = NULL; 144 145 end = qemu_rbd_strchr(src, delim); 146 if (end) { 
147 *p = end + 1; 148 *end = '\0'; 149 } 150 return src; 151 } 152 153 static void qemu_rbd_unescape(char *src) 154 { 155 char *p; 156 157 for (p = src; *src; ++src, ++p) { 158 if (*src == '\\' && src[1] != '\0') { 159 src++; 160 } 161 *p = *src; 162 } 163 *p = '\0'; 164 } 165 166 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 167 Error **errp) 168 { 169 const char *start; 170 char *p, *buf; 171 QList *keypairs = NULL; 172 char *found_str, *image_name; 173 174 if (!strstart(filename, "rbd:", &start)) { 175 error_setg(errp, "File name must start with 'rbd:'"); 176 return; 177 } 178 179 buf = g_strdup(start); 180 p = buf; 181 182 found_str = qemu_rbd_next_tok(p, '/', &p); 183 if (!p) { 184 error_setg(errp, "Pool name is required"); 185 goto done; 186 } 187 qemu_rbd_unescape(found_str); 188 qdict_put_str(options, "pool", found_str); 189 190 if (qemu_rbd_strchr(p, '@')) { 191 image_name = qemu_rbd_next_tok(p, '@', &p); 192 193 found_str = qemu_rbd_next_tok(p, ':', &p); 194 qemu_rbd_unescape(found_str); 195 qdict_put_str(options, "snapshot", found_str); 196 } else { 197 image_name = qemu_rbd_next_tok(p, ':', &p); 198 } 199 /* Check for namespace in the image_name */ 200 if (qemu_rbd_strchr(image_name, '/')) { 201 found_str = qemu_rbd_next_tok(image_name, '/', &image_name); 202 qemu_rbd_unescape(found_str); 203 qdict_put_str(options, "namespace", found_str); 204 } else { 205 qdict_put_str(options, "namespace", ""); 206 } 207 qemu_rbd_unescape(image_name); 208 qdict_put_str(options, "image", image_name); 209 if (!p) { 210 goto done; 211 } 212 213 /* The following are essentially all key/value pairs, and we treat 214 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. 
*/ 215 while (p) { 216 char *name, *value; 217 name = qemu_rbd_next_tok(p, '=', &p); 218 if (!p) { 219 error_setg(errp, "conf option %s has no value", name); 220 break; 221 } 222 223 qemu_rbd_unescape(name); 224 225 value = qemu_rbd_next_tok(p, ':', &p); 226 qemu_rbd_unescape(value); 227 228 if (!strcmp(name, "conf")) { 229 qdict_put_str(options, "conf", value); 230 } else if (!strcmp(name, "id")) { 231 qdict_put_str(options, "user", value); 232 } else { 233 /* 234 * We pass these internally to qemu_rbd_set_keypairs(), so 235 * we can get away with the simpler list of [ "key1", 236 * "value1", "key2", "value2" ] rather than a raw dict 237 * { "key1": "value1", "key2": "value2" } where we can't 238 * guarantee order, or even a more correct but complex 239 * [ { "key1": "value1" }, { "key2": "value2" } ] 240 */ 241 if (!keypairs) { 242 keypairs = qlist_new(); 243 } 244 qlist_append_str(keypairs, name); 245 qlist_append_str(keypairs, value); 246 } 247 } 248 249 if (keypairs) { 250 qdict_put(options, "=keyvalue-pairs", 251 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs)))); 252 } 253 254 done: 255 g_free(buf); 256 qobject_unref(keypairs); 257 return; 258 } 259 260 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts, 261 Error **errp) 262 { 263 char *key, *acr; 264 int r; 265 GString *accu; 266 RbdAuthModeList *auth; 267 268 if (opts->key_secret) { 269 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp); 270 if (!key) { 271 return -EIO; 272 } 273 r = rados_conf_set(cluster, "key", key); 274 g_free(key); 275 if (r < 0) { 276 error_setg_errno(errp, -r, "Could not set 'key'"); 277 return r; 278 } 279 } 280 281 if (opts->has_auth_client_required) { 282 accu = g_string_new(""); 283 for (auth = opts->auth_client_required; auth; auth = auth->next) { 284 if (accu->str[0]) { 285 g_string_append_c(accu, ';'); 286 } 287 g_string_append(accu, RbdAuthMode_str(auth->value)); 288 } 289 acr = g_string_free(accu, FALSE); 290 r = 
rados_conf_set(cluster, "auth_client_required", acr); 291 g_free(acr); 292 if (r < 0) { 293 error_setg_errno(errp, -r, 294 "Could not set 'auth_client_required'"); 295 return r; 296 } 297 } 298 299 return 0; 300 } 301 302 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 303 Error **errp) 304 { 305 QList *keypairs; 306 QString *name; 307 QString *value; 308 const char *key; 309 size_t remaining; 310 int ret = 0; 311 312 if (!keypairs_json) { 313 return ret; 314 } 315 keypairs = qobject_to(QList, 316 qobject_from_json(keypairs_json, &error_abort)); 317 remaining = qlist_size(keypairs) / 2; 318 assert(remaining); 319 320 while (remaining--) { 321 name = qobject_to(QString, qlist_pop(keypairs)); 322 value = qobject_to(QString, qlist_pop(keypairs)); 323 assert(name && value); 324 key = qstring_get_str(name); 325 326 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 327 qobject_unref(value); 328 if (ret < 0) { 329 error_setg_errno(errp, -ret, "invalid conf option %s", key); 330 qobject_unref(name); 331 ret = -EINVAL; 332 break; 333 } 334 qobject_unref(name); 335 } 336 337 qobject_unref(keypairs); 338 return ret; 339 } 340 341 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 342 static int qemu_rbd_convert_luks_options( 343 RbdEncryptionOptionsLUKSBase *luks_opts, 344 char **passphrase, 345 size_t *passphrase_len, 346 Error **errp) 347 { 348 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase, 349 passphrase_len, errp); 350 } 351 352 static int qemu_rbd_convert_luks_create_options( 353 RbdEncryptionCreateOptionsLUKSBase *luks_opts, 354 rbd_encryption_algorithm_t *alg, 355 char **passphrase, 356 size_t *passphrase_len, 357 Error **errp) 358 { 359 int r = 0; 360 361 r = qemu_rbd_convert_luks_options( 362 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts), 363 passphrase, passphrase_len, errp); 364 if (r < 0) { 365 return r; 366 } 367 368 if (luks_opts->has_cipher_alg) { 369 switch (luks_opts->cipher_alg) { 370 case 
QCRYPTO_CIPHER_ALG_AES_128: { 371 *alg = RBD_ENCRYPTION_ALGORITHM_AES128; 372 break; 373 } 374 case QCRYPTO_CIPHER_ALG_AES_256: { 375 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 376 break; 377 } 378 default: { 379 r = -ENOTSUP; 380 error_setg_errno(errp, -r, "unknown encryption algorithm: %u", 381 luks_opts->cipher_alg); 382 return r; 383 } 384 } 385 } else { 386 /* default alg */ 387 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 388 } 389 390 return 0; 391 } 392 393 static int qemu_rbd_encryption_format(rbd_image_t image, 394 RbdEncryptionCreateOptions *encrypt, 395 Error **errp) 396 { 397 int r = 0; 398 g_autofree char *passphrase = NULL; 399 rbd_encryption_format_t format; 400 rbd_encryption_options_t opts; 401 rbd_encryption_luks1_format_options_t luks_opts; 402 rbd_encryption_luks2_format_options_t luks2_opts; 403 size_t opts_size; 404 uint64_t raw_size, effective_size; 405 406 r = rbd_get_size(image, &raw_size); 407 if (r < 0) { 408 error_setg_errno(errp, -r, "cannot get raw image size"); 409 return r; 410 } 411 412 switch (encrypt->format) { 413 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 414 memset(&luks_opts, 0, sizeof(luks_opts)); 415 format = RBD_ENCRYPTION_FORMAT_LUKS1; 416 opts = &luks_opts; 417 opts_size = sizeof(luks_opts); 418 r = qemu_rbd_convert_luks_create_options( 419 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks), 420 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size, 421 errp); 422 if (r < 0) { 423 return r; 424 } 425 luks_opts.passphrase = passphrase; 426 break; 427 } 428 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 429 memset(&luks2_opts, 0, sizeof(luks2_opts)); 430 format = RBD_ENCRYPTION_FORMAT_LUKS2; 431 opts = &luks2_opts; 432 opts_size = sizeof(luks2_opts); 433 r = qemu_rbd_convert_luks_create_options( 434 qapi_RbdEncryptionCreateOptionsLUKS2_base( 435 &encrypt->u.luks2), 436 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size, 437 errp); 438 if (r < 0) { 439 return r; 440 } 441 luks2_opts.passphrase = passphrase; 442 break; 443 
} 444 default: { 445 r = -ENOTSUP; 446 error_setg_errno( 447 errp, -r, "unknown image encryption format: %u", 448 encrypt->format); 449 return r; 450 } 451 } 452 453 r = rbd_encryption_format(image, format, opts, opts_size); 454 if (r < 0) { 455 error_setg_errno(errp, -r, "encryption format fail"); 456 return r; 457 } 458 459 r = rbd_get_size(image, &effective_size); 460 if (r < 0) { 461 error_setg_errno(errp, -r, "cannot get effective image size"); 462 return r; 463 } 464 465 r = rbd_resize(image, raw_size + (raw_size - effective_size)); 466 if (r < 0) { 467 error_setg_errno(errp, -r, "cannot resize image after format"); 468 return r; 469 } 470 471 return 0; 472 } 473 474 static int qemu_rbd_encryption_load(rbd_image_t image, 475 RbdEncryptionOptions *encrypt, 476 Error **errp) 477 { 478 int r = 0; 479 g_autofree char *passphrase = NULL; 480 rbd_encryption_luks1_format_options_t luks_opts; 481 rbd_encryption_luks2_format_options_t luks2_opts; 482 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 483 rbd_encryption_luks_format_options_t luks_any_opts; 484 #endif 485 rbd_encryption_format_t format; 486 rbd_encryption_options_t opts; 487 size_t opts_size; 488 489 switch (encrypt->format) { 490 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 491 memset(&luks_opts, 0, sizeof(luks_opts)); 492 format = RBD_ENCRYPTION_FORMAT_LUKS1; 493 opts = &luks_opts; 494 opts_size = sizeof(luks_opts); 495 r = qemu_rbd_convert_luks_options( 496 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks), 497 &passphrase, &luks_opts.passphrase_size, errp); 498 if (r < 0) { 499 return r; 500 } 501 luks_opts.passphrase = passphrase; 502 break; 503 } 504 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 505 memset(&luks2_opts, 0, sizeof(luks2_opts)); 506 format = RBD_ENCRYPTION_FORMAT_LUKS2; 507 opts = &luks2_opts; 508 opts_size = sizeof(luks2_opts); 509 r = qemu_rbd_convert_luks_options( 510 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2), 511 &passphrase, &luks2_opts.passphrase_size, errp); 512 if (r < 0) { 513 
return r; 514 } 515 luks2_opts.passphrase = passphrase; 516 break; 517 } 518 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 519 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 520 memset(&luks_any_opts, 0, sizeof(luks_any_opts)); 521 format = RBD_ENCRYPTION_FORMAT_LUKS; 522 opts = &luks_any_opts; 523 opts_size = sizeof(luks_any_opts); 524 r = qemu_rbd_convert_luks_options( 525 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any), 526 &passphrase, &luks_any_opts.passphrase_size, errp); 527 if (r < 0) { 528 return r; 529 } 530 luks_any_opts.passphrase = passphrase; 531 break; 532 } 533 #endif 534 default: { 535 r = -ENOTSUP; 536 error_setg_errno( 537 errp, -r, "unknown image encryption format: %u", 538 encrypt->format); 539 return r; 540 } 541 } 542 543 r = rbd_encryption_load(image, format, opts, opts_size); 544 if (r < 0) { 545 error_setg_errno(errp, -r, "encryption load fail"); 546 return r; 547 } 548 549 return 0; 550 } 551 552 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 553 static int qemu_rbd_encryption_load2(rbd_image_t image, 554 RbdEncryptionOptions *encrypt, 555 Error **errp) 556 { 557 int r = 0; 558 int encrypt_count = 1; 559 int i; 560 RbdEncryptionOptions *curr_encrypt; 561 rbd_encryption_spec_t *specs; 562 rbd_encryption_luks1_format_options_t *luks_opts; 563 rbd_encryption_luks2_format_options_t *luks2_opts; 564 rbd_encryption_luks_format_options_t *luks_any_opts; 565 566 /* count encryption options */ 567 for (curr_encrypt = encrypt->parent; curr_encrypt; 568 curr_encrypt = curr_encrypt->parent) { 569 ++encrypt_count; 570 } 571 572 specs = g_new0(rbd_encryption_spec_t, encrypt_count); 573 574 curr_encrypt = encrypt; 575 for (i = 0; i < encrypt_count; ++i) { 576 switch (curr_encrypt->format) { 577 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 578 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1; 579 580 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1); 581 specs[i].opts = luks_opts; 582 specs[i].opts_size = sizeof(*luks_opts); 583 584 r = 
qemu_rbd_convert_luks_options( 585 qapi_RbdEncryptionOptionsLUKS_base( 586 &curr_encrypt->u.luks), 587 (char **)&luks_opts->passphrase, 588 &luks_opts->passphrase_size, 589 errp); 590 break; 591 } 592 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 593 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2; 594 595 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1); 596 specs[i].opts = luks2_opts; 597 specs[i].opts_size = sizeof(*luks2_opts); 598 599 r = qemu_rbd_convert_luks_options( 600 qapi_RbdEncryptionOptionsLUKS2_base( 601 &curr_encrypt->u.luks2), 602 (char **)&luks2_opts->passphrase, 603 &luks2_opts->passphrase_size, 604 errp); 605 break; 606 } 607 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 608 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS; 609 610 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1); 611 specs[i].opts = luks_any_opts; 612 specs[i].opts_size = sizeof(*luks_any_opts); 613 614 r = qemu_rbd_convert_luks_options( 615 qapi_RbdEncryptionOptionsLUKSAny_base( 616 &curr_encrypt->u.luks_any), 617 (char **)&luks_any_opts->passphrase, 618 &luks_any_opts->passphrase_size, 619 errp); 620 break; 621 } 622 default: { 623 r = -ENOTSUP; 624 error_setg_errno( 625 errp, -r, "unknown image encryption format: %u", 626 curr_encrypt->format); 627 } 628 } 629 630 if (r < 0) { 631 goto exit; 632 } 633 634 curr_encrypt = curr_encrypt->parent; 635 } 636 637 r = rbd_encryption_load2(image, specs, encrypt_count); 638 if (r < 0) { 639 error_setg_errno(errp, -r, "layered encryption load fail"); 640 goto exit; 641 } 642 643 exit: 644 for (i = 0; i < encrypt_count; ++i) { 645 if (!specs[i].opts) { 646 break; 647 } 648 649 switch (specs[i].format) { 650 case RBD_ENCRYPTION_FORMAT_LUKS1: { 651 luks_opts = specs[i].opts; 652 g_free((void *)luks_opts->passphrase); 653 break; 654 } 655 case RBD_ENCRYPTION_FORMAT_LUKS2: { 656 luks2_opts = specs[i].opts; 657 g_free((void *)luks2_opts->passphrase); 658 break; 659 } 660 case RBD_ENCRYPTION_FORMAT_LUKS: { 661 luks_any_opts = 
specs[i].opts; 662 g_free((void *)luks_any_opts->passphrase); 663 break; 664 } 665 } 666 667 g_free(specs[i].opts); 668 } 669 g_free(specs); 670 return r; 671 } 672 #endif 673 #endif 674 675 /* FIXME Deprecate and remove keypairs or make it available in QMP. */ 676 static int qemu_rbd_do_create(BlockdevCreateOptions *options, 677 const char *keypairs, const char *password_secret, 678 Error **errp) 679 { 680 BlockdevCreateOptionsRbd *opts = &options->u.rbd; 681 rados_t cluster; 682 rados_ioctx_t io_ctx; 683 int obj_order = 0; 684 int ret; 685 686 assert(options->driver == BLOCKDEV_DRIVER_RBD); 687 if (opts->location->snapshot) { 688 error_setg(errp, "Can't use snapshot name for image creation"); 689 return -EINVAL; 690 } 691 692 #ifndef LIBRBD_SUPPORTS_ENCRYPTION 693 if (opts->encrypt) { 694 error_setg(errp, "RBD library does not support image encryption"); 695 return -ENOTSUP; 696 } 697 #endif 698 699 if (opts->has_cluster_size) { 700 int64_t objsize = opts->cluster_size; 701 if ((objsize - 1) & objsize) { /* not a power of 2? 
*/ 702 error_setg(errp, "obj size needs to be power of 2"); 703 return -EINVAL; 704 } 705 if (objsize < 4096) { 706 error_setg(errp, "obj size too small"); 707 return -EINVAL; 708 } 709 obj_order = ctz32(objsize); 710 } 711 712 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs, 713 password_secret, errp); 714 if (ret < 0) { 715 return ret; 716 } 717 718 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order); 719 if (ret < 0) { 720 error_setg_errno(errp, -ret, "error rbd create"); 721 goto out; 722 } 723 724 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 725 if (opts->encrypt) { 726 rbd_image_t image; 727 728 ret = rbd_open(io_ctx, opts->location->image, &image, NULL); 729 if (ret < 0) { 730 error_setg_errno(errp, -ret, 731 "error opening image '%s' for encryption format", 732 opts->location->image); 733 goto out; 734 } 735 736 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp); 737 rbd_close(image); 738 if (ret < 0) { 739 /* encryption format fail, try removing the image */ 740 rbd_remove(io_ctx, opts->location->image); 741 goto out; 742 } 743 } 744 #endif 745 746 ret = 0; 747 out: 748 rados_ioctx_destroy(io_ctx); 749 rados_shutdown(cluster); 750 return ret; 751 } 752 753 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp) 754 { 755 return qemu_rbd_do_create(options, NULL, NULL, errp); 756 } 757 758 static int qemu_rbd_extract_encryption_create_options( 759 QemuOpts *opts, 760 RbdEncryptionCreateOptions **spec, 761 Error **errp) 762 { 763 QDict *opts_qdict; 764 QDict *encrypt_qdict; 765 Visitor *v; 766 int ret = 0; 767 768 opts_qdict = qemu_opts_to_qdict(opts, NULL); 769 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt."); 770 qobject_unref(opts_qdict); 771 if (!qdict_size(encrypt_qdict)) { 772 *spec = NULL; 773 goto exit; 774 } 775 776 /* Convert options into a QAPI object */ 777 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp); 778 if (!v) { 779 ret = -EINVAL; 780 goto exit; 
781 } 782 783 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp); 784 visit_free(v); 785 if (!*spec) { 786 ret = -EINVAL; 787 goto exit; 788 } 789 790 exit: 791 qobject_unref(encrypt_qdict); 792 return ret; 793 } 794 795 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv, 796 const char *filename, 797 QemuOpts *opts, 798 Error **errp) 799 { 800 BlockdevCreateOptions *create_options; 801 BlockdevCreateOptionsRbd *rbd_opts; 802 BlockdevOptionsRbd *loc; 803 RbdEncryptionCreateOptions *encrypt = NULL; 804 Error *local_err = NULL; 805 const char *keypairs, *password_secret; 806 QDict *options = NULL; 807 int ret = 0; 808 809 create_options = g_new0(BlockdevCreateOptions, 1); 810 create_options->driver = BLOCKDEV_DRIVER_RBD; 811 rbd_opts = &create_options->u.rbd; 812 813 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1); 814 815 password_secret = qemu_opt_get(opts, "password-secret"); 816 817 /* Read out options */ 818 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 819 BDRV_SECTOR_SIZE); 820 rbd_opts->cluster_size = qemu_opt_get_size_del(opts, 821 BLOCK_OPT_CLUSTER_SIZE, 0); 822 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0); 823 824 options = qdict_new(); 825 qemu_rbd_parse_filename(filename, options, &local_err); 826 if (local_err) { 827 ret = -EINVAL; 828 error_propagate(errp, local_err); 829 goto exit; 830 } 831 832 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp); 833 if (ret < 0) { 834 goto exit; 835 } 836 rbd_opts->encrypt = encrypt; 837 838 /* 839 * Caution: while qdict_get_try_str() is fine, getting non-string 840 * types would require more care. When @options come from -blockdev 841 * or blockdev_add, its members are typed according to the QAPI 842 * schema, but when they come from -drive, they're all QString. 
843 */ 844 loc = rbd_opts->location; 845 loc->pool = g_strdup(qdict_get_try_str(options, "pool")); 846 loc->conf = g_strdup(qdict_get_try_str(options, "conf")); 847 loc->user = g_strdup(qdict_get_try_str(options, "user")); 848 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace")); 849 loc->image = g_strdup(qdict_get_try_str(options, "image")); 850 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 851 852 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp); 853 if (ret < 0) { 854 goto exit; 855 } 856 857 exit: 858 qobject_unref(options); 859 qapi_free_BlockdevCreateOptions(create_options); 860 return ret; 861 } 862 863 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) 864 { 865 const char **vals; 866 const char *host, *port; 867 char *rados_str; 868 InetSocketAddressBaseList *p; 869 int i, cnt; 870 871 if (!opts->has_server) { 872 return NULL; 873 } 874 875 for (cnt = 0, p = opts->server; p; p = p->next) { 876 cnt++; 877 } 878 879 vals = g_new(const char *, cnt + 1); 880 881 for (i = 0, p = opts->server; p; p = p->next, i++) { 882 host = p->value->host; 883 port = p->value->port; 884 885 if (strchr(host, ':')) { 886 vals[i] = g_strdup_printf("[%s]:%s", host, port); 887 } else { 888 vals[i] = g_strdup_printf("%s:%s", host, port); 889 } 890 } 891 vals[i] = NULL; 892 893 rados_str = i ? 
g_strjoinv(";", (char **)vals) : NULL; 894 g_strfreev((char **)vals); 895 return rados_str; 896 } 897 898 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 899 BlockdevOptionsRbd *opts, bool cache, 900 const char *keypairs, const char *secretid, 901 Error **errp) 902 { 903 char *mon_host = NULL; 904 Error *local_err = NULL; 905 int r; 906 907 if (secretid) { 908 if (opts->key_secret) { 909 error_setg(errp, 910 "Legacy 'password-secret' clashes with 'key-secret'"); 911 return -EINVAL; 912 } 913 opts->key_secret = g_strdup(secretid); 914 } 915 916 mon_host = qemu_rbd_mon_host(opts, &local_err); 917 if (local_err) { 918 error_propagate(errp, local_err); 919 r = -EINVAL; 920 goto out; 921 } 922 923 r = rados_create(cluster, opts->user); 924 if (r < 0) { 925 error_setg_errno(errp, -r, "error initializing"); 926 goto out; 927 } 928 929 /* try default location when conf=NULL, but ignore failure */ 930 r = rados_conf_read_file(*cluster, opts->conf); 931 if (opts->conf && r < 0) { 932 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf); 933 goto failed_shutdown; 934 } 935 936 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp); 937 if (r < 0) { 938 goto failed_shutdown; 939 } 940 941 if (mon_host) { 942 r = rados_conf_set(*cluster, "mon_host", mon_host); 943 if (r < 0) { 944 goto failed_shutdown; 945 } 946 } 947 948 r = qemu_rbd_set_auth(*cluster, opts, errp); 949 if (r < 0) { 950 goto failed_shutdown; 951 } 952 953 /* 954 * Fallback to more conservative semantics if setting cache 955 * options fails. Ignore errors from setting rbd_cache because the 956 * only possible error is that the option does not exist, and 957 * librbd defaults to no caching. If write through caching cannot 958 * be set up, fall back to no caching. 
959 */ 960 if (cache) { 961 rados_conf_set(*cluster, "rbd_cache", "true"); 962 } else { 963 rados_conf_set(*cluster, "rbd_cache", "false"); 964 } 965 966 r = rados_connect(*cluster); 967 if (r < 0) { 968 error_setg_errno(errp, -r, "error connecting"); 969 goto failed_shutdown; 970 } 971 972 r = rados_ioctx_create(*cluster, opts->pool, io_ctx); 973 if (r < 0) { 974 error_setg_errno(errp, -r, "error opening pool %s", opts->pool); 975 goto failed_shutdown; 976 } 977 978 #ifdef HAVE_RBD_NAMESPACE_EXISTS 979 if (opts->q_namespace && strlen(opts->q_namespace) > 0) { 980 bool exists; 981 982 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists); 983 if (r < 0) { 984 error_setg_errno(errp, -r, "error checking namespace"); 985 goto failed_ioctx_destroy; 986 } 987 988 if (!exists) { 989 error_setg(errp, "namespace '%s' does not exist", 990 opts->q_namespace); 991 r = -ENOENT; 992 goto failed_ioctx_destroy; 993 } 994 } 995 #endif 996 997 /* 998 * Set the namespace after opening the io context on the pool, 999 * if nspace == NULL or if nspace == "", it is just as we did nothing 1000 */ 1001 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace); 1002 1003 r = 0; 1004 goto out; 1005 1006 #ifdef HAVE_RBD_NAMESPACE_EXISTS 1007 failed_ioctx_destroy: 1008 rados_ioctx_destroy(*io_ctx); 1009 #endif 1010 failed_shutdown: 1011 rados_shutdown(*cluster); 1012 out: 1013 g_free(mon_host); 1014 return r; 1015 } 1016 1017 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts, 1018 Error **errp) 1019 { 1020 Visitor *v; 1021 1022 /* Convert the remaining options into a QAPI object */ 1023 v = qobject_input_visitor_new_flat_confused(options, errp); 1024 if (!v) { 1025 return -EINVAL; 1026 } 1027 1028 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp); 1029 visit_free(v); 1030 if (!opts) { 1031 return -EINVAL; 1032 } 1033 1034 return 0; 1035 } 1036 1037 static int qemu_rbd_attempt_legacy_options(QDict *options, 1038 BlockdevOptionsRbd **opts, 1039 char **keypairs) 
1040 { 1041 char *filename; 1042 int r; 1043 1044 filename = g_strdup(qdict_get_try_str(options, "filename")); 1045 if (!filename) { 1046 return -EINVAL; 1047 } 1048 qdict_del(options, "filename"); 1049 1050 qemu_rbd_parse_filename(filename, options, NULL); 1051 1052 /* keypairs freed by caller */ 1053 *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1054 if (*keypairs) { 1055 qdict_del(options, "=keyvalue-pairs"); 1056 } 1057 1058 r = qemu_rbd_convert_options(options, opts, NULL); 1059 1060 g_free(filename); 1061 return r; 1062 } 1063 1064 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 1065 Error **errp) 1066 { 1067 BDRVRBDState *s = bs->opaque; 1068 BlockdevOptionsRbd *opts = NULL; 1069 const QDictEntry *e; 1070 Error *local_err = NULL; 1071 char *keypairs, *secretid; 1072 rbd_image_info_t info; 1073 int r; 1074 1075 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1076 if (keypairs) { 1077 qdict_del(options, "=keyvalue-pairs"); 1078 } 1079 1080 secretid = g_strdup(qdict_get_try_str(options, "password-secret")); 1081 if (secretid) { 1082 qdict_del(options, "password-secret"); 1083 } 1084 1085 r = qemu_rbd_convert_options(options, &opts, &local_err); 1086 if (local_err) { 1087 /* If keypairs are present, that means some options are present in 1088 * the modern option format. Don't attempt to parse legacy option 1089 * formats, as we won't support mixed usage. */ 1090 if (keypairs) { 1091 error_propagate(errp, local_err); 1092 goto out; 1093 } 1094 1095 /* If the initial attempt to convert and process the options failed, 1096 * we may be attempting to open an image file that has the rbd options 1097 * specified in the older format consisting of all key/value pairs 1098 * encoded in the filename. Go ahead and attempt to parse the 1099 * filename, and see if we can pull out the required options. 
*/ 1100 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs); 1101 if (r < 0) { 1102 /* Propagate the original error, not the legacy parsing fallback 1103 * error, as the latter was just a best-effort attempt. */ 1104 error_propagate(errp, local_err); 1105 goto out; 1106 } 1107 /* Take care whenever deciding to actually deprecate; once this ability 1108 * is removed, we will not be able to open any images with legacy-styled 1109 * backing image strings. */ 1110 warn_report("RBD options encoded in the filename as keyvalue pairs " 1111 "is deprecated"); 1112 } 1113 1114 /* Remove the processed options from the QDict (the visitor processes 1115 * _all_ options in the QDict) */ 1116 while ((e = qdict_first(options))) { 1117 qdict_del(options, e->key); 1118 } 1119 1120 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts, 1121 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp); 1122 if (r < 0) { 1123 goto out; 1124 } 1125 1126 s->snap = g_strdup(opts->snapshot); 1127 s->image_name = g_strdup(opts->image); 1128 1129 /* rbd_open is always r/w */ 1130 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 1131 if (r < 0) { 1132 error_setg_errno(errp, -r, "error reading header from %s", 1133 s->image_name); 1134 goto failed_open; 1135 } 1136 1137 if (opts->encrypt) { 1138 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 1139 if (opts->encrypt->parent) { 1140 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 1141 r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp); 1142 #else 1143 r = -ENOTSUP; 1144 error_setg(errp, "RBD library does not support layered encryption"); 1145 #endif 1146 } else { 1147 r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp); 1148 } 1149 if (r < 0) { 1150 goto failed_post_open; 1151 } 1152 #else 1153 r = -ENOTSUP; 1154 error_setg(errp, "RBD library does not support image encryption"); 1155 goto failed_post_open; 1156 #endif 1157 } 1158 1159 r = rbd_stat(s->image, &info, sizeof(info)); 1160 if (r < 0) { 1161 error_setg_errno(errp, -r, "error 
getting image info from %s", 1162 s->image_name); 1163 goto failed_post_open; 1164 } 1165 s->image_size = info.size; 1166 s->object_size = info.obj_size; 1167 1168 /* If we are using an rbd snapshot, we must be r/o, otherwise 1169 * leave as-is */ 1170 if (s->snap != NULL) { 1171 bdrv_graph_rdlock_main_loop(); 1172 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp); 1173 bdrv_graph_rdunlock_main_loop(); 1174 if (r < 0) { 1175 goto failed_post_open; 1176 } 1177 } 1178 1179 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1180 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK; 1181 #endif 1182 1183 /* When extending regular files, we get zeros from the OS */ 1184 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE; 1185 1186 r = 0; 1187 goto out; 1188 1189 failed_post_open: 1190 rbd_close(s->image); 1191 failed_open: 1192 rados_ioctx_destroy(s->io_ctx); 1193 g_free(s->snap); 1194 g_free(s->image_name); 1195 rados_shutdown(s->cluster); 1196 out: 1197 qapi_free_BlockdevOptionsRbd(opts); 1198 g_free(keypairs); 1199 g_free(secretid); 1200 return r; 1201 } 1202 1203 1204 /* Since RBD is currently always opened R/W via the API, 1205 * we just need to check if we are using a snapshot or not, in 1206 * order to determine if we will allow it to be R/W */ 1207 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 1208 BlockReopenQueue *queue, Error **errp) 1209 { 1210 BDRVRBDState *s = state->bs->opaque; 1211 int ret = 0; 1212 1213 GRAPH_RDLOCK_GUARD_MAINLOOP(); 1214 1215 if (s->snap && state->flags & BDRV_O_RDWR) { 1216 error_setg(errp, 1217 "Cannot change node '%s' to r/w when using RBD snapshot", 1218 bdrv_get_device_or_node_name(state->bs)); 1219 ret = -EINVAL; 1220 } 1221 1222 return ret; 1223 } 1224 1225 static void qemu_rbd_close(BlockDriverState *bs) 1226 { 1227 BDRVRBDState *s = bs->opaque; 1228 1229 rbd_close(s->image); 1230 rados_ioctx_destroy(s->io_ctx); 1231 g_free(s->snap); 1232 g_free(s->image_name); 1233 rados_shutdown(s->cluster); 
1234 } 1235 1236 /* Resize the RBD image and update the 'image_size' with the current size */ 1237 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size) 1238 { 1239 BDRVRBDState *s = bs->opaque; 1240 int r; 1241 1242 r = rbd_resize(s->image, size); 1243 if (r < 0) { 1244 return r; 1245 } 1246 1247 s->image_size = size; 1248 1249 return 0; 1250 } 1251 1252 static void qemu_rbd_finish_bh(void *opaque) 1253 { 1254 RBDTask *task = opaque; 1255 task->complete = true; 1256 aio_co_wake(task->co); 1257 } 1258 1259 /* 1260 * This is the completion callback function for all rbd aio calls 1261 * started from qemu_rbd_start_co(). 1262 * 1263 * Note: this function is being called from a non qemu thread so 1264 * we need to be careful about what we do here. Generally we only 1265 * schedule a BH, and do the rest of the io completion handling 1266 * from qemu_rbd_finish_bh() which runs in a qemu context. 1267 */ 1268 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task) 1269 { 1270 task->ret = rbd_aio_get_return_value(c); 1271 rbd_aio_release(c); 1272 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs), 1273 qemu_rbd_finish_bh, task); 1274 } 1275 1276 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs, 1277 uint64_t offset, 1278 uint64_t bytes, 1279 QEMUIOVector *qiov, 1280 int flags, 1281 RBDAIOCmd cmd) 1282 { 1283 BDRVRBDState *s = bs->opaque; 1284 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() }; 1285 rbd_completion_t c; 1286 int r; 1287 1288 assert(!qiov || qiov->size == bytes); 1289 1290 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) { 1291 /* 1292 * RBD APIs don't allow us to write more than actual size, so in order 1293 * to support growing images, we resize the image before write 1294 * operations that exceed the current size. 
1295 */ 1296 if (offset + bytes > s->image_size) { 1297 r = qemu_rbd_resize(bs, offset + bytes); 1298 if (r < 0) { 1299 return r; 1300 } 1301 } 1302 } 1303 1304 r = rbd_aio_create_completion(&task, 1305 (rbd_callback_t) qemu_rbd_completion_cb, &c); 1306 if (r < 0) { 1307 return r; 1308 } 1309 1310 switch (cmd) { 1311 case RBD_AIO_READ: 1312 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c); 1313 break; 1314 case RBD_AIO_WRITE: 1315 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c); 1316 break; 1317 case RBD_AIO_DISCARD: 1318 r = rbd_aio_discard(s->image, offset, bytes, c); 1319 break; 1320 case RBD_AIO_FLUSH: 1321 r = rbd_aio_flush(s->image, c); 1322 break; 1323 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1324 case RBD_AIO_WRITE_ZEROES: { 1325 int zero_flags = 0; 1326 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION 1327 if (!(flags & BDRV_REQ_MAY_UNMAP)) { 1328 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION; 1329 } 1330 #endif 1331 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0); 1332 break; 1333 } 1334 #endif 1335 default: 1336 r = -EINVAL; 1337 } 1338 1339 if (r < 0) { 1340 error_report("rbd request failed early: cmd %d offset %" PRIu64 1341 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset, 1342 bytes, flags, r, strerror(-r)); 1343 rbd_aio_release(c); 1344 return r; 1345 } 1346 1347 while (!task.complete) { 1348 qemu_coroutine_yield(); 1349 } 1350 1351 if (task.ret < 0) { 1352 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %" 1353 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset, 1354 bytes, flags, task.ret, strerror(-task.ret)); 1355 return task.ret; 1356 } 1357 1358 /* zero pad short reads */ 1359 if (cmd == RBD_AIO_READ && task.ret < qiov->size) { 1360 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret); 1361 } 1362 1363 return 0; 1364 } 1365 1366 static int 1367 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset, 1368 int64_t bytes, QEMUIOVector *qiov, 1369 
BdrvRequestFlags flags) 1370 { 1371 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ); 1372 } 1373 1374 static int 1375 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset, 1376 int64_t bytes, QEMUIOVector *qiov, 1377 BdrvRequestFlags flags) 1378 { 1379 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE); 1380 } 1381 1382 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs) 1383 { 1384 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH); 1385 } 1386 1387 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs, 1388 int64_t offset, int64_t bytes) 1389 { 1390 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD); 1391 } 1392 1393 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1394 static int 1395 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, 1396 int64_t bytes, BdrvRequestFlags flags) 1397 { 1398 return qemu_rbd_start_co(bs, offset, bytes, NULL, flags, 1399 RBD_AIO_WRITE_ZEROES); 1400 } 1401 #endif 1402 1403 static int coroutine_fn 1404 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi) 1405 { 1406 BDRVRBDState *s = bs->opaque; 1407 bdi->cluster_size = s->object_size; 1408 return 0; 1409 } 1410 1411 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs, 1412 Error **errp) 1413 { 1414 BDRVRBDState *s = bs->opaque; 1415 ImageInfoSpecific *spec_info; 1416 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0}; 1417 int r; 1418 1419 if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) { 1420 r = rbd_read(s->image, 0, 1421 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf); 1422 if (r < 0) { 1423 error_setg_errno(errp, -r, "cannot read image start for probe"); 1424 return NULL; 1425 } 1426 } 1427 1428 spec_info = g_new(ImageInfoSpecific, 1); 1429 *spec_info = (ImageInfoSpecific){ 1430 .type = IMAGE_INFO_SPECIFIC_KIND_RBD, 1431 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1), 1432 }; 1433 1434 if 
(memcmp(buf, rbd_luks_header_verification, 1435 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1436 spec_info->u.rbd.data->encryption_format = 1437 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 1438 spec_info->u.rbd.data->has_encryption_format = true; 1439 } else if (memcmp(buf, rbd_luks2_header_verification, 1440 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1441 spec_info->u.rbd.data->encryption_format = 1442 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 1443 spec_info->u.rbd.data->has_encryption_format = true; 1444 } else if (memcmp(buf, rbd_layered_luks_header_verification, 1445 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1446 spec_info->u.rbd.data->encryption_format = 1447 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 1448 spec_info->u.rbd.data->has_encryption_format = true; 1449 } else if (memcmp(buf, rbd_layered_luks2_header_verification, 1450 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1451 spec_info->u.rbd.data->encryption_format = 1452 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 1453 spec_info->u.rbd.data->has_encryption_format = true; 1454 } else { 1455 spec_info->u.rbd.data->has_encryption_format = false; 1456 } 1457 1458 return spec_info; 1459 } 1460 1461 /* 1462 * rbd_diff_iterate2 allows to interrupt the exection by returning a negative 1463 * value in the callback routine. Choose a value that does not conflict with 1464 * an existing exitcode and return it if we want to prematurely stop the 1465 * execution because we detected a change in the allocation status. 1466 */ 1467 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000 1468 1469 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len, 1470 int exists, void *opaque) 1471 { 1472 RBDDiffIterateReq *req = opaque; 1473 1474 assert(req->offs + req->bytes <= offs); 1475 1476 /* treat a hole like an unallocated area and bail out */ 1477 if (!exists) { 1478 return 0; 1479 } 1480 1481 if (!req->exists && offs > req->offs) { 1482 /* 1483 * we started in an unallocated area and hit the first allocated 1484 * block. 
/*
 * Report the allocation status of the range starting at @offset.
 *
 * The fallback answer is "the whole range [@offset, @offset + @bytes) is
 * allocated data".  It is returned whenever the image lacks the fast-diff
 * feature, the fast-diff data is flagged invalid, or a librbd call fails.
 * Otherwise rbd_diff_iterate2() walks the range and
 * qemu_rbd_diff_iterate_cb() records in 'req' the length and allocation
 * state of the first run of uniform status.
 *
 * @want_zero is not consulted.  *@map is always set to @offset and *@file
 * to @bs.  *@pnum receives the number of bytes, counted from @offset, that
 * the returned status applies to.
 *
 * Returns BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID or
 * BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID.  No negative value is
 * returned: librbd failures produce the fallback answer instead.
 */
static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
                                                 bool want_zero, int64_t offset,
                                                 int64_t bytes, int64_t *pnum,
                                                 int64_t *map,
                                                 BlockDriverState **file)
{
    BDRVRBDState *s = bs->opaque;
    int status, r;
    RBDDiffIterateReq req = { .offs = offset };
    uint64_t features, flags;
    /* bytes added in front of @offset by the alignment workaround below */
    uint64_t head = 0;

    assert(offset + bytes <= s->image_size);

    /* default to all sectors allocated */
    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    *map = offset;
    *file = bs;
    *pnum = bytes;

    /* check if RBD image supports fast-diff */
    r = rbd_get_features(s->image, &features);
    if (r < 0) {
        return status;
    }
    if (!(features & RBD_FEATURE_FAST_DIFF)) {
        return status;
    }

    /* check if RBD fast-diff result is valid */
    r = rbd_get_flags(s->image, &flags);
    if (r < 0) {
        return status;
    }
    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
        return status;
    }

#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
    /*
     * librbd had a bug until early 2022 that affected all versions of ceph that
     * supported fast-diff. This bug results in reporting of incorrect offsets
     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
     * Work around this bug by rounding down the offset to object boundaries.
     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
     * However, this workaround only works for non cloned images with default
     * striping.
     *
     * See: https://tracker.ceph.com/issues/53784
     */

    /* check if RBD image has non-default striping enabled */
    if (features & RBD_FEATURE_STRIPINGV2) {
        return status;
    }

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    /*
     * check if RBD image is a clone (= has a parent).
     *
     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
     * replacement rbd_get_parent is not present in Luminous and Mimic.
     *
     * Any result other than -ENOENT takes the fallback answer.
     */
    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
        return status;
    }
#pragma GCC diagnostic pop

    /*
     * Round the start down to an object boundary and extend the length by
     * the same amount.  The mask assumes object_size is a power of two.
     * NOTE(review): confirm against how object_size is obtained.
     */
    head = req.offs & (s->object_size - 1);
    req.offs -= head;
    bytes += head;
#endif

    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
                          qemu_rbd_diff_iterate_cb, &req);
    /*
     * QEMU_RBD_EXIT_DIFF_ITERATE2 is the callback's own "stop early" code,
     * not a failure; any other negative value takes the fallback answer.
     */
    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
        return status;
    }
    assert(req.bytes <= bytes);
    if (!req.exists) {
        if (r == 0) {
            /*
             * rbd_diff_iterate2 does not invoke callbacks for unallocated
             * areas. This here catches the case where no callback was
             * invoked at all (req.bytes == 0).
             */
            assert(req.bytes == 0);
            req.bytes = bytes;
        }
        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
    }

    /* Drop the bytes that precede @offset (non-zero only with the workaround) */
    assert(req.bytes > head);
    *pnum = req.bytes - head;
    return status;
}
*snapshot_name, 1676 Error **errp) 1677 { 1678 BDRVRBDState *s = bs->opaque; 1679 int r; 1680 1681 if (!snapshot_name) { 1682 error_setg(errp, "rbd need a valid snapshot name"); 1683 return -EINVAL; 1684 } 1685 1686 /* If snapshot_id is specified, it must be equal to name, see 1687 qemu_rbd_snap_list() */ 1688 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1689 error_setg(errp, 1690 "rbd do not support snapshot id, it should be NULL or " 1691 "equal to snapshot name"); 1692 return -EINVAL; 1693 } 1694 1695 r = rbd_snap_remove(s->image, snapshot_name); 1696 if (r < 0) { 1697 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1698 } 1699 return r; 1700 } 1701 1702 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1703 const char *snapshot_name) 1704 { 1705 BDRVRBDState *s = bs->opaque; 1706 1707 return rbd_snap_rollback(s->image, snapshot_name); 1708 } 1709 1710 static int qemu_rbd_snap_list(BlockDriverState *bs, 1711 QEMUSnapshotInfo **psn_tab) 1712 { 1713 BDRVRBDState *s = bs->opaque; 1714 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1715 int i, snap_count; 1716 rbd_snap_info_t *snaps; 1717 int max_snaps = RBD_MAX_SNAPS; 1718 1719 do { 1720 snaps = g_new(rbd_snap_info_t, max_snaps); 1721 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1722 if (snap_count <= 0) { 1723 g_free(snaps); 1724 } 1725 } while (snap_count == -ERANGE); 1726 1727 if (snap_count <= 0) { 1728 goto done; 1729 } 1730 1731 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1732 1733 for (i = 0; i < snap_count; i++) { 1734 const char *snap_name = snaps[i].name; 1735 1736 sn_info = sn_tab + i; 1737 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1738 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1739 1740 sn_info->vm_state_size = snaps[i].size; 1741 sn_info->date_sec = 0; 1742 sn_info->date_nsec = 0; 1743 sn_info->vm_clock_nsec = 0; 1744 } 1745 rbd_snap_list_end(snaps); 1746 g_free(snaps); 1747 1748 done: 1749 *psn_tab = sn_tab; 1750 return 
snap_count; 1751 } 1752 1753 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs, 1754 Error **errp) 1755 { 1756 BDRVRBDState *s = bs->opaque; 1757 int r = rbd_invalidate_cache(s->image); 1758 if (r < 0) { 1759 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1760 } 1761 } 1762 1763 static QemuOptsList qemu_rbd_create_opts = { 1764 .name = "rbd-create-opts", 1765 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1766 .desc = { 1767 { 1768 .name = BLOCK_OPT_SIZE, 1769 .type = QEMU_OPT_SIZE, 1770 .help = "Virtual disk size" 1771 }, 1772 { 1773 .name = BLOCK_OPT_CLUSTER_SIZE, 1774 .type = QEMU_OPT_SIZE, 1775 .help = "RBD object size" 1776 }, 1777 { 1778 .name = "password-secret", 1779 .type = QEMU_OPT_STRING, 1780 .help = "ID of secret providing the password", 1781 }, 1782 { 1783 .name = "encrypt.format", 1784 .type = QEMU_OPT_STRING, 1785 .help = "Encrypt the image, format choices: 'luks', 'luks2'", 1786 }, 1787 { 1788 .name = "encrypt.cipher-alg", 1789 .type = QEMU_OPT_STRING, 1790 .help = "Name of encryption cipher algorithm" 1791 " (allowed values: aes-128, aes-256)", 1792 }, 1793 { 1794 .name = "encrypt.key-secret", 1795 .type = QEMU_OPT_STRING, 1796 .help = "ID of secret providing LUKS passphrase", 1797 }, 1798 { /* end of list */ } 1799 } 1800 }; 1801 1802 static const char *const qemu_rbd_strong_runtime_opts[] = { 1803 "pool", 1804 "namespace", 1805 "image", 1806 "conf", 1807 "snapshot", 1808 "user", 1809 "server.", 1810 "password-secret", 1811 1812 NULL 1813 }; 1814 1815 static BlockDriver bdrv_rbd = { 1816 .format_name = "rbd", 1817 .instance_size = sizeof(BDRVRBDState), 1818 1819 .bdrv_parse_filename = qemu_rbd_parse_filename, 1820 .bdrv_open = qemu_rbd_open, 1821 .bdrv_close = qemu_rbd_close, 1822 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1823 .bdrv_co_create = qemu_rbd_co_create, 1824 .bdrv_co_create_opts = qemu_rbd_co_create_opts, 1825 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1826 .bdrv_co_get_info = 
/*
 * Module init hook: hands the bdrv_rbd driver description to
 * bdrv_register().  It is passed to block_init() right after this
 * definition.
 */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}