1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/module.h" 20 #include "qemu/option.h" 21 #include "block/block-io.h" 22 #include "block/block_int.h" 23 #include "block/qdict.h" 24 #include "crypto/secret.h" 25 #include "qemu/cutils.h" 26 #include "system/replay.h" 27 #include "qobject/qstring.h" 28 #include "qobject/qdict.h" 29 #include "qobject/qjson.h" 30 #include "qobject/qlist.h" 31 #include "qapi/qobject-input-visitor.h" 32 #include "qapi/qapi-visit-block-core.h" 33 34 /* 35 * When specifying the image filename use: 36 * 37 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 38 * 39 * poolname must be the name of an existing rados pool. 40 * 41 * devicename is the name of the rbd image. 42 * 43 * Each option given is used to configure rados, and may be any valid 44 * Ceph option, "id", or "conf". 45 * 46 * The "id" option indicates what user we should authenticate as to 47 * the Ceph cluster. If it is excluded we will use the Ceph default 48 * (normally 'admin'). 49 * 50 * The "conf" option specifies a Ceph configuration file to read. If 51 * it is not specified, we will read from the default Ceph locations 52 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 53 * file, specify conf=/dev/null. 54 * 55 * Configuration values containing :, @, or = can be escaped with a 56 * leading "\". 57 */ 58 59 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 60 61 #define RBD_MAX_SNAPS 100 62 63 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8 64 65 static const char rbd_luks_header_verification[ 66 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 67 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1 68 }; 69 70 static const char rbd_luks2_header_verification[ 71 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 72 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2 73 }; 74 75 static const char rbd_layered_luks_header_verification[ 76 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 77 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1 78 }; 79 80 static const char rbd_layered_luks2_header_verification[ 81 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 82 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2 83 }; 84 85 typedef enum { 86 RBD_AIO_READ, 87 RBD_AIO_WRITE, 88 RBD_AIO_DISCARD, 89 RBD_AIO_FLUSH, 90 RBD_AIO_WRITE_ZEROES 91 } RBDAIOCmd; 92 93 typedef struct BDRVRBDState { 94 rados_t cluster; 95 rados_ioctx_t io_ctx; 96 rbd_image_t image; 97 char *image_name; 98 char *snap; 99 char *namespace; 100 uint64_t image_size; 101 uint64_t object_size; 102 103 /* 104 * If @bs->encrypted is true, this is the encryption format actually loaded 105 * at the librbd level. If it is false, it is the result of probing. 106 * RBD_IMAGE_ENCRYPTION_FORMAT__MAX means that encryption is not enabled and 107 * probing didn't find any known encryption header either. 108 */ 109 RbdImageEncryptionFormat encryption_format; 110 } BDRVRBDState; 111 112 typedef struct RBDTask { 113 BlockDriverState *bs; 114 Coroutine *co; 115 bool complete; 116 int64_t ret; 117 } RBDTask; 118 119 typedef struct RBDDiffIterateReq { 120 uint64_t offs; 121 uint64_t bytes; 122 bool exists; 123 } RBDDiffIterateReq; 124 125 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 126 BlockdevOptionsRbd *opts, bool cache, 127 const char *keypairs, const char *secretid, 128 Error **errp); 129 130 static char *qemu_rbd_strchr(char *src, char delim) 131 { 132 char *p; 133 134 for (p = src; *p; ++p) { 135 if (*p == delim) { 136 return p; 137 } 138 if (*p == '\\' && p[1] != '\0') { 139 ++p; 140 } 141 } 142 143 return NULL; 144 } 145 146 147 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 148 { 149 char *end; 150 151 *p = NULL; 152 153 end = qemu_rbd_strchr(src, delim); 154 if (end) { 155 *p = end + 1; 156 *end = '\0'; 157 } 158 return src; 159 } 160 161 static void qemu_rbd_unescape(char *src) 162 { 163 char *p; 164 165 for (p = src; *src; ++src, ++p) { 166 if (*src == '\\' && src[1] != '\0') { 167 src++; 168 } 169 *p = *src; 170 } 171 *p = '\0'; 172 } 173 174 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 175 Error **errp) 176 { 177 const char *start; 178 char *p, *buf; 179 QList *keypairs = NULL; 180 char *found_str, *image_name; 181 182 if (!strstart(filename, "rbd:", &start)) { 183 error_setg(errp, "File name must start with 'rbd:'"); 184 return; 185 } 186 187 buf = g_strdup(start); 188 p = buf; 189 190 found_str = qemu_rbd_next_tok(p, '/', &p); 191 if (!p) { 192 error_setg(errp, "Pool name is required"); 193 goto done; 194 } 195 qemu_rbd_unescape(found_str); 196 qdict_put_str(options, "pool", found_str); 197 198 if (qemu_rbd_strchr(p, '@')) { 199 image_name = qemu_rbd_next_tok(p, '@', &p); 200 201 found_str = qemu_rbd_next_tok(p, ':', &p); 202 qemu_rbd_unescape(found_str); 203 qdict_put_str(options, "snapshot", found_str); 204 } else { 205 image_name = qemu_rbd_next_tok(p, ':', &p); 206 } 207 /* Check for namespace in the image_name */ 208 if (qemu_rbd_strchr(image_name, '/')) { 209 found_str = qemu_rbd_next_tok(image_name, '/', &image_name); 210 qemu_rbd_unescape(found_str); 211 qdict_put_str(options, "namespace", found_str); 212 } else { 213 qdict_put_str(options, "namespace", ""); 214 } 215 qemu_rbd_unescape(image_name); 216 qdict_put_str(options, "image", image_name); 217 if (!p) { 218 goto done; 219 } 220 221 /* The following are essentially all key/value pairs, and we treat 222 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 223 while (p) { 224 char *name, *value; 225 name = qemu_rbd_next_tok(p, '=', &p); 226 if (!p) { 227 error_setg(errp, "conf option %s has no value", name); 228 break; 229 } 230 231 qemu_rbd_unescape(name); 232 233 value = qemu_rbd_next_tok(p, ':', &p); 234 qemu_rbd_unescape(value); 235 236 if (!strcmp(name, "conf")) { 237 qdict_put_str(options, "conf", value); 238 } else if (!strcmp(name, "id")) { 239 qdict_put_str(options, "user", value); 240 } else { 241 /* 242 * We pass these internally to qemu_rbd_set_keypairs(), so 243 * we can get away with the simpler list of [ "key1", 244 * "value1", "key2", "value2" ] rather than a raw dict 245 * { "key1": "value1", "key2": "value2" } where we can't 246 * guarantee order, or even a more correct but complex 247 * [ { "key1": "value1" }, { "key2": "value2" } ] 248 */ 249 if (!keypairs) { 250 keypairs = qlist_new(); 251 } 252 qlist_append_str(keypairs, name); 253 qlist_append_str(keypairs, value); 254 } 255 } 256 257 if (keypairs) { 258 qdict_put(options, "=keyvalue-pairs", 259 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs)))); 260 } 261 262 done: 263 g_free(buf); 264 qobject_unref(keypairs); 265 } 266 267 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts, 268 Error **errp) 269 { 270 char *key, *acr; 271 int r; 272 GString *accu; 273 RbdAuthModeList *auth; 274 275 if (opts->key_secret) { 276 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp); 277 if (!key) { 278 return -EIO; 279 } 280 r = rados_conf_set(cluster, "key", key); 281 g_free(key); 282 if (r < 0) { 283 error_setg_errno(errp, -r, "Could not set 'key'"); 284 return r; 285 } 286 } 287 288 if (opts->has_auth_client_required) { 289 accu = g_string_new(""); 290 for (auth = opts->auth_client_required; auth; auth = auth->next) { 291 if (accu->str[0]) { 292 g_string_append_c(accu, ';'); 293 } 294 g_string_append(accu, RbdAuthMode_str(auth->value)); 295 } 296 acr = g_string_free(accu, FALSE); 297 r = rados_conf_set(cluster, "auth_client_required", acr); 298 g_free(acr); 299 if (r < 0) { 300 error_setg_errno(errp, -r, 301 "Could not set 'auth_client_required'"); 302 return r; 303 } 304 } 305 306 return 0; 307 } 308 309 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 310 Error **errp) 311 { 312 QList *keypairs; 313 QString *name; 314 QString *value; 315 const char *key; 316 size_t remaining; 317 int ret = 0; 318 319 if (!keypairs_json) { 320 return ret; 321 } 322 keypairs = qobject_to(QList, 323 qobject_from_json(keypairs_json, &error_abort)); 324 remaining = qlist_size(keypairs) / 2; 325 assert(remaining); 326 327 while (remaining--) { 328 name = qobject_to(QString, qlist_pop(keypairs)); 329 value = qobject_to(QString, qlist_pop(keypairs)); 330 assert(name && value); 331 key = qstring_get_str(name); 332 333 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 334 qobject_unref(value); 335 if (ret < 0) { 336 error_setg_errno(errp, -ret, "invalid conf option %s", key); 337 qobject_unref(name); 338 ret = -EINVAL; 339 break; 340 } 341 qobject_unref(name); 342 } 343 344 qobject_unref(keypairs); 345 return ret; 346 } 347 348 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 349 static int qemu_rbd_convert_luks_options( 350 RbdEncryptionOptionsLUKSBase *luks_opts, 351 char **passphrase, 352 size_t *passphrase_len, 353 Error **errp) 354 { 355 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase, 356 passphrase_len, errp); 357 } 358 359 static int qemu_rbd_convert_luks_create_options( 360 RbdEncryptionCreateOptionsLUKSBase *luks_opts, 361 rbd_encryption_algorithm_t *alg, 362 char **passphrase, 363 size_t *passphrase_len, 364 Error **errp) 365 { 366 int r = 0; 367 368 r = qemu_rbd_convert_luks_options( 369 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts), 370 passphrase, passphrase_len, errp); 371 if (r < 0) { 372 return r; 373 } 374 375 if (luks_opts->has_cipher_alg) { 376 switch (luks_opts->cipher_alg) { 377 case QCRYPTO_CIPHER_ALGO_AES_128: { 378 *alg = RBD_ENCRYPTION_ALGORITHM_AES128; 379 break; 380 } 381 case QCRYPTO_CIPHER_ALGO_AES_256: { 382 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 383 break; 384 } 385 default: { 386 r = -ENOTSUP; 387 error_setg_errno(errp, -r, "unknown encryption algorithm: %u", 388 luks_opts->cipher_alg); 389 return r; 390 } 391 } 392 } else { 393 /* default alg */ 394 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 395 } 396 397 return 0; 398 } 399 400 static int qemu_rbd_encryption_format(rbd_image_t image, 401 RbdEncryptionCreateOptions *encrypt, 402 Error **errp) 403 { 404 int r = 0; 405 g_autofree char *passphrase = NULL; 406 rbd_encryption_format_t format; 407 rbd_encryption_options_t opts; 408 rbd_encryption_luks1_format_options_t luks_opts; 409 rbd_encryption_luks2_format_options_t luks2_opts; 410 size_t opts_size; 411 uint64_t raw_size, effective_size; 412 413 r = rbd_get_size(image, &raw_size); 414 if (r < 0) { 415 error_setg_errno(errp, -r, "cannot get raw image size"); 416 return r; 417 } 418 419 switch (encrypt->format) { 420 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 421 memset(&luks_opts, 0, sizeof(luks_opts)); 422 format = RBD_ENCRYPTION_FORMAT_LUKS1; 423 opts = &luks_opts; 424 opts_size = sizeof(luks_opts); 425 r = qemu_rbd_convert_luks_create_options( 426 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks), 427 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size, 428 errp); 429 if (r < 0) { 430 return r; 431 } 432 luks_opts.passphrase = passphrase; 433 break; 434 } 435 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 436 memset(&luks2_opts, 0, sizeof(luks2_opts)); 437 format = RBD_ENCRYPTION_FORMAT_LUKS2; 438 opts = &luks2_opts; 439 opts_size = sizeof(luks2_opts); 440 r = qemu_rbd_convert_luks_create_options( 441 qapi_RbdEncryptionCreateOptionsLUKS2_base( 442 &encrypt->u.luks2), 443 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size, 444 errp); 445 if (r < 0) { 446 return r; 447 } 448 luks2_opts.passphrase = passphrase; 449 break; 450 } 451 default: { 452 r = -ENOTSUP; 453 error_setg_errno( 454 errp, -r, "unknown image encryption format: %u", 455 encrypt->format); 456 return r; 457 } 458 } 459 460 r = rbd_encryption_format(image, format, opts, opts_size); 461 if (r < 0) { 462 error_setg_errno(errp, -r, "encryption format fail"); 463 return r; 464 } 465 466 r = rbd_get_size(image, &effective_size); 467 if (r < 0) { 468 error_setg_errno(errp, -r, "cannot get effective image size"); 469 return r; 470 } 471 472 r = rbd_resize(image, raw_size + (raw_size - effective_size)); 473 if (r < 0) { 474 error_setg_errno(errp, -r, "cannot resize image after format"); 475 return r; 476 } 477 478 return 0; 479 } 480 481 static int qemu_rbd_encryption_load(BlockDriverState *bs, 482 rbd_image_t image, 483 RbdEncryptionOptions *encrypt, 484 Error **errp) 485 { 486 BDRVRBDState *s = bs->opaque; 487 int r = 0; 488 g_autofree char *passphrase = NULL; 489 rbd_encryption_luks1_format_options_t luks_opts; 490 rbd_encryption_luks2_format_options_t luks2_opts; 491 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 492 rbd_encryption_luks_format_options_t luks_any_opts; 493 #endif 494 rbd_encryption_format_t format; 495 rbd_encryption_options_t opts; 496 size_t opts_size; 497 498 switch (encrypt->format) { 499 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 500 memset(&luks_opts, 0, sizeof(luks_opts)); 501 format = RBD_ENCRYPTION_FORMAT_LUKS1; 502 opts = &luks_opts; 503 opts_size = sizeof(luks_opts); 504 r = qemu_rbd_convert_luks_options( 505 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks), 506 &passphrase, &luks_opts.passphrase_size, errp); 507 if (r < 0) { 508 return r; 509 } 510 luks_opts.passphrase = passphrase; 511 break; 512 } 513 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 514 memset(&luks2_opts, 0, sizeof(luks2_opts)); 515 format = RBD_ENCRYPTION_FORMAT_LUKS2; 516 opts = &luks2_opts; 517 opts_size = sizeof(luks2_opts); 518 r = qemu_rbd_convert_luks_options( 519 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2), 520 &passphrase, &luks2_opts.passphrase_size, errp); 521 if (r < 0) { 522 return r; 523 } 524 luks2_opts.passphrase = passphrase; 525 break; 526 } 527 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 528 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 529 memset(&luks_any_opts, 0, sizeof(luks_any_opts)); 530 format = RBD_ENCRYPTION_FORMAT_LUKS; 531 opts = &luks_any_opts; 532 opts_size = sizeof(luks_any_opts); 533 r = qemu_rbd_convert_luks_options( 534 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any), 535 &passphrase, &luks_any_opts.passphrase_size, errp); 536 if (r < 0) { 537 return r; 538 } 539 luks_any_opts.passphrase = passphrase; 540 break; 541 } 542 #endif 543 default: { 544 r = -ENOTSUP; 545 error_setg_errno( 546 errp, -r, "unknown image encryption format: %u", 547 encrypt->format); 548 return r; 549 } 550 } 551 552 r = rbd_encryption_load(image, format, opts, opts_size); 553 if (r < 0) { 554 error_setg_errno(errp, -r, "encryption load fail"); 555 return r; 556 } 557 bs->encrypted = true; 558 s->encryption_format = encrypt->format; 559 560 return 0; 561 } 562 563 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 564 static int qemu_rbd_encryption_load2(BlockDriverState *bs, 565 rbd_image_t image, 566 RbdEncryptionOptions *encrypt, 567 Error **errp) 568 { 569 BDRVRBDState *s = bs->opaque; 570 int r = 0; 571 int encrypt_count = 1; 572 int i; 573 RbdEncryptionOptions *curr_encrypt; 574 rbd_encryption_spec_t *specs; 575 rbd_encryption_luks1_format_options_t *luks_opts; 576 rbd_encryption_luks2_format_options_t *luks2_opts; 577 rbd_encryption_luks_format_options_t *luks_any_opts; 578 579 /* count encryption options */ 580 for (curr_encrypt = encrypt->parent; curr_encrypt; 581 curr_encrypt = curr_encrypt->parent) { 582 ++encrypt_count; 583 } 584 585 specs = g_new0(rbd_encryption_spec_t, encrypt_count); 586 587 curr_encrypt = encrypt; 588 for (i = 0; i < encrypt_count; ++i) { 589 switch (curr_encrypt->format) { 590 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 591 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1; 592 593 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1); 594 specs[i].opts = luks_opts; 595 specs[i].opts_size = sizeof(*luks_opts); 596 597 r = qemu_rbd_convert_luks_options( 598 qapi_RbdEncryptionOptionsLUKS_base( 599 &curr_encrypt->u.luks), 600 (char **)&luks_opts->passphrase, 601 &luks_opts->passphrase_size, 602 errp); 603 break; 604 } 605 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 606 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2; 607 608 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1); 609 specs[i].opts = luks2_opts; 610 specs[i].opts_size = sizeof(*luks2_opts); 611 612 r = qemu_rbd_convert_luks_options( 613 qapi_RbdEncryptionOptionsLUKS2_base( 614 &curr_encrypt->u.luks2), 615 (char **)&luks2_opts->passphrase, 616 &luks2_opts->passphrase_size, 617 errp); 618 break; 619 } 620 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 621 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS; 622 623 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1); 624 specs[i].opts = luks_any_opts; 625 specs[i].opts_size = sizeof(*luks_any_opts); 626 627 r = qemu_rbd_convert_luks_options( 628 qapi_RbdEncryptionOptionsLUKSAny_base( 629 &curr_encrypt->u.luks_any), 630 (char **)&luks_any_opts->passphrase, 631 &luks_any_opts->passphrase_size, 632 errp); 633 break; 634 } 635 default: { 636 r = -ENOTSUP; 637 error_setg_errno( 638 errp, -r, "unknown image encryption format: %u", 639 curr_encrypt->format); 640 } 641 } 642 643 if (r < 0) { 644 goto exit; 645 } 646 647 curr_encrypt = curr_encrypt->parent; 648 } 649 650 r = rbd_encryption_load2(image, specs, encrypt_count); 651 if (r < 0) { 652 error_setg_errno(errp, -r, "layered encryption load fail"); 653 goto exit; 654 } 655 bs->encrypted = true; 656 s->encryption_format = encrypt->format; 657 658 exit: 659 for (i = 0; i < encrypt_count; ++i) { 660 if (!specs[i].opts) { 661 break; 662 } 663 664 switch (specs[i].format) { 665 case RBD_ENCRYPTION_FORMAT_LUKS1: { 666 luks_opts = specs[i].opts; 667 g_free((void *)luks_opts->passphrase); 668 break; 669 } 670 case RBD_ENCRYPTION_FORMAT_LUKS2: { 671 luks2_opts = specs[i].opts; 672 g_free((void *)luks2_opts->passphrase); 673 break; 674 } 675 case RBD_ENCRYPTION_FORMAT_LUKS: { 676 luks_any_opts = specs[i].opts; 677 g_free((void *)luks_any_opts->passphrase); 678 break; 679 } 680 } 681 682 g_free(specs[i].opts); 683 } 684 g_free(specs); 685 return r; 686 } 687 #endif 688 #endif 689 690 /* 691 * For an image without encryption enabled on the rbd layer, probe the start of 692 * the image if it could be opened as an encrypted image so that we can display 693 * it when the user queries the node (most importantly in qemu-img). 694 * 695 * If the guest writes an encryption header to its disk after this probing, this 696 * won't be reflected when queried, but that's okay. There is no reason why the 697 * user should want to apply encryption at the rbd level while the image is 698 * still in use. This is just guest data. 699 */ 700 static void qemu_rbd_encryption_probe(BlockDriverState *bs) 701 { 702 BDRVRBDState *s = bs->opaque; 703 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0}; 704 int r; 705 706 assert(s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX); 707 708 r = rbd_read(s->image, 0, 709 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf); 710 if (r < RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) { 711 return; 712 } 713 714 if (memcmp(buf, rbd_luks_header_verification, 715 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 716 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 717 } else if (memcmp(buf, rbd_luks2_header_verification, 718 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 719 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 720 } else if (memcmp(buf, rbd_layered_luks_header_verification, 721 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 722 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 723 } else if (memcmp(buf, rbd_layered_luks2_header_verification, 724 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 725 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 726 } 727 } 728 729 /* FIXME Deprecate and remove keypairs or make it available in QMP. */ 730 static int qemu_rbd_do_create(BlockdevCreateOptions *options, 731 const char *keypairs, const char *password_secret, 732 Error **errp) 733 { 734 BlockdevCreateOptionsRbd *opts = &options->u.rbd; 735 rados_t cluster; 736 rados_ioctx_t io_ctx; 737 int obj_order = 0; 738 int ret; 739 740 assert(options->driver == BLOCKDEV_DRIVER_RBD); 741 if (opts->location->snapshot) { 742 error_setg(errp, "Can't use snapshot name for image creation"); 743 return -EINVAL; 744 } 745 746 #ifndef LIBRBD_SUPPORTS_ENCRYPTION 747 if (opts->encrypt) { 748 error_setg(errp, "RBD library does not support image encryption"); 749 return -ENOTSUP; 750 } 751 #endif 752 753 if (opts->has_cluster_size) { 754 int64_t objsize = opts->cluster_size; 755 if ((objsize - 1) & objsize) { /* not a power of 2? */ 756 error_setg(errp, "obj size needs to be power of 2"); 757 return -EINVAL; 758 } 759 if (objsize < 4096) { 760 error_setg(errp, "obj size too small"); 761 return -EINVAL; 762 } 763 obj_order = ctz32(objsize); 764 } 765 766 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs, 767 password_secret, errp); 768 if (ret < 0) { 769 return ret; 770 } 771 772 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order); 773 if (ret < 0) { 774 error_setg_errno(errp, -ret, "error rbd create"); 775 goto out; 776 } 777 778 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 779 if (opts->encrypt) { 780 rbd_image_t image; 781 782 ret = rbd_open(io_ctx, opts->location->image, &image, NULL); 783 if (ret < 0) { 784 error_setg_errno(errp, -ret, 785 "error opening image '%s' for encryption format", 786 opts->location->image); 787 goto out; 788 } 789 790 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp); 791 rbd_close(image); 792 if (ret < 0) { 793 /* encryption format fail, try removing the image */ 794 rbd_remove(io_ctx, opts->location->image); 795 goto out; 796 } 797 } 798 #endif 799 800 ret = 0; 801 out: 802 rados_ioctx_destroy(io_ctx); 803 rados_shutdown(cluster); 804 return ret; 805 } 806 807 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp) 808 { 809 return qemu_rbd_do_create(options, NULL, NULL, errp); 810 } 811 812 static int qemu_rbd_extract_encryption_create_options( 813 QemuOpts *opts, 814 RbdEncryptionCreateOptions **spec, 815 Error **errp) 816 { 817 QDict *opts_qdict; 818 QDict *encrypt_qdict; 819 Visitor *v; 820 int ret = 0; 821 822 opts_qdict = qemu_opts_to_qdict(opts, NULL); 823 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt."); 824 qobject_unref(opts_qdict); 825 if (!qdict_size(encrypt_qdict)) { 826 *spec = NULL; 827 goto exit; 828 } 829 830 /* Convert options into a QAPI object */ 831 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp); 832 if (!v) { 833 ret = -EINVAL; 834 goto exit; 835 } 836 837 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp); 838 visit_free(v); 839 if (!*spec) { 840 ret = -EINVAL; 841 goto exit; 842 } 843 844 exit: 845 qobject_unref(encrypt_qdict); 846 return ret; 847 } 848 849 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv, 850 const char *filename, 851 QemuOpts *opts, 852 Error **errp) 853 { 854 BlockdevCreateOptions *create_options; 855 BlockdevCreateOptionsRbd *rbd_opts; 856 BlockdevOptionsRbd *loc; 857 RbdEncryptionCreateOptions *encrypt = NULL; 858 Error *local_err = NULL; 859 const char *keypairs, *password_secret; 860 QDict *options = NULL; 861 int ret = 0; 862 863 create_options = g_new0(BlockdevCreateOptions, 1); 864 create_options->driver = BLOCKDEV_DRIVER_RBD; 865 rbd_opts = &create_options->u.rbd; 866 867 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1); 868 869 password_secret = qemu_opt_get(opts, "password-secret"); 870 871 /* Read out options */ 872 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 873 BDRV_SECTOR_SIZE); 874 rbd_opts->cluster_size = qemu_opt_get_size_del(opts, 875 BLOCK_OPT_CLUSTER_SIZE, 0); 876 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0); 877 878 options = qdict_new(); 879 qemu_rbd_parse_filename(filename, options, &local_err); 880 if (local_err) { 881 ret = -EINVAL; 882 error_propagate(errp, local_err); 883 goto exit; 884 } 885 886 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp); 887 if (ret < 0) { 888 goto exit; 889 } 890 rbd_opts->encrypt = encrypt; 891 892 /* 893 * Caution: while qdict_get_try_str() is fine, getting non-string 894 * types would require more care. When @options come from -blockdev 895 * or blockdev_add, its members are typed according to the QAPI 896 * schema, but when they come from -drive, they're all QString. 897 */ 898 loc = rbd_opts->location; 899 loc->pool = g_strdup(qdict_get_try_str(options, "pool")); 900 loc->conf = g_strdup(qdict_get_try_str(options, "conf")); 901 loc->user = g_strdup(qdict_get_try_str(options, "user")); 902 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace")); 903 loc->image = g_strdup(qdict_get_try_str(options, "image")); 904 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 905 906 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp); 907 if (ret < 0) { 908 goto exit; 909 } 910 911 exit: 912 qobject_unref(options); 913 qapi_free_BlockdevCreateOptions(create_options); 914 return ret; 915 } 916 917 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) 918 { 919 const char **vals; 920 const char *host, *port; 921 char *rados_str; 922 InetSocketAddressBaseList *p; 923 int i, cnt; 924 925 if (!opts->has_server) { 926 return NULL; 927 } 928 929 for (cnt = 0, p = opts->server; p; p = p->next) { 930 cnt++; 931 } 932 933 vals = g_new(const char *, cnt + 1); 934 935 for (i = 0, p = opts->server; p; p = p->next, i++) { 936 host = p->value->host; 937 port = p->value->port; 938 939 if (strchr(host, ':')) { 940 vals[i] = g_strdup_printf("[%s]:%s", host, port); 941 } else { 942 vals[i] = g_strdup_printf("%s:%s", host, port); 943 } 944 } 945 vals[i] = NULL; 946 947 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 948 g_strfreev((char **)vals); 949 return rados_str; 950 } 951 952 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 953 BlockdevOptionsRbd *opts, bool cache, 954 const char *keypairs, const char *secretid, 955 Error **errp) 956 { 957 char *mon_host = NULL; 958 Error *local_err = NULL; 959 int r; 960 961 if (secretid) { 962 if (opts->key_secret) { 963 error_setg(errp, 964 "Legacy 'password-secret' clashes with 'key-secret'"); 965 return -EINVAL; 966 } 967 opts->key_secret = g_strdup(secretid); 968 } 969 970 mon_host = qemu_rbd_mon_host(opts, &local_err); 971 if (local_err) { 972 error_propagate(errp, local_err); 973 r = -EINVAL; 974 goto out; 975 } 976 977 r = rados_create(cluster, opts->user); 978 if (r < 0) { 979 error_setg_errno(errp, -r, "error initializing"); 980 goto out; 981 } 982 983 /* try default location when conf=NULL, but ignore failure */ 984 r = rados_conf_read_file(*cluster, opts->conf); 985 if (opts->conf && r < 0) { 986 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf); 987 goto failed_shutdown; 988 } 989 990 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp); 991 if (r < 0) { 992 goto failed_shutdown; 993 } 994 995 if (mon_host) { 996 r = rados_conf_set(*cluster, "mon_host", mon_host); 997 if (r < 0) { 998 goto failed_shutdown; 999 } 1000 } 1001 1002 r = qemu_rbd_set_auth(*cluster, opts, errp); 1003 if (r < 0) { 1004 goto failed_shutdown; 1005 } 1006 1007 /* 1008 * Fallback to more conservative semantics if setting cache 1009 * options fails. Ignore errors from setting rbd_cache because the 1010 * only possible error is that the option does not exist, and 1011 * librbd defaults to no caching. If write through caching cannot 1012 * be set up, fall back to no caching. 1013 */ 1014 if (cache) { 1015 rados_conf_set(*cluster, "rbd_cache", "true"); 1016 } else { 1017 rados_conf_set(*cluster, "rbd_cache", "false"); 1018 } 1019 1020 r = rados_connect(*cluster); 1021 if (r < 0) { 1022 error_setg_errno(errp, -r, "error connecting"); 1023 goto failed_shutdown; 1024 } 1025 1026 r = rados_ioctx_create(*cluster, opts->pool, io_ctx); 1027 if (r < 0) { 1028 error_setg_errno(errp, -r, "error opening pool %s", opts->pool); 1029 goto failed_shutdown; 1030 } 1031 1032 #ifdef HAVE_RBD_NAMESPACE_EXISTS 1033 if (opts->q_namespace && strlen(opts->q_namespace) > 0) { 1034 bool exists; 1035 1036 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists); 1037 if (r < 0) { 1038 error_setg_errno(errp, -r, "error checking namespace"); 1039 goto failed_ioctx_destroy; 1040 } 1041 1042 if (!exists) { 1043 error_setg(errp, "namespace '%s' does not exist", 1044 opts->q_namespace); 1045 r = -ENOENT; 1046 goto failed_ioctx_destroy; 1047 } 1048 } 1049 #endif 1050 1051 /* 1052 * Set the namespace after opening the io context on the pool, 1053 * if nspace == NULL or if nspace == "", it is just as we did nothing 1054 */ 1055 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace); 1056 1057 r = 0; 1058 goto out; 1059 1060 #ifdef HAVE_RBD_NAMESPACE_EXISTS 1061 failed_ioctx_destroy: 1062 rados_ioctx_destroy(*io_ctx); 1063 #endif 1064 failed_shutdown: 1065 rados_shutdown(*cluster); 1066 out: 1067 g_free(mon_host); 1068 return r; 1069 } 1070 1071 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts, 1072 Error **errp) 1073 { 1074 Visitor *v; 1075 1076 /* Convert the remaining options into a QAPI object */ 1077 v = qobject_input_visitor_new_flat_confused(options, errp); 1078 if (!v) { 1079 return -EINVAL; 1080 } 1081 1082 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp); 1083 visit_free(v); 1084 if (!opts) { 1085 return -EINVAL; 1086 } 1087 1088 return 0; 1089 } 1090 1091 static int qemu_rbd_attempt_legacy_options(QDict *options, 1092 BlockdevOptionsRbd **opts, 1093 char **keypairs) 1094 { 1095 char *filename; 1096 int r; 1097 1098 filename = g_strdup(qdict_get_try_str(options, "filename")); 1099 if (!filename) { 1100 return -EINVAL; 1101 } 1102 qdict_del(options, "filename"); 1103 1104 qemu_rbd_parse_filename(filename, options, NULL); 1105 1106 /* keypairs freed by caller */ 1107 *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1108 if (*keypairs) { 1109 qdict_del(options, "=keyvalue-pairs"); 1110 } 1111 1112 r = qemu_rbd_convert_options(options, opts, NULL); 1113 1114 g_free(filename); 1115 return r; 1116 } 1117 1118 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 1119 Error **errp) 1120 { 1121 BDRVRBDState *s = bs->opaque; 1122 BlockdevOptionsRbd *opts = NULL; 1123 const QDictEntry *e; 1124 Error *local_err = NULL; 1125 char *keypairs, *secretid; 1126 rbd_image_info_t info; 1127 int r; 1128 1129 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1130 if (keypairs) { 1131 qdict_del(options, "=keyvalue-pairs"); 1132 } 1133 1134 secretid = g_strdup(qdict_get_try_str(options, "password-secret")); 1135 if (secretid) { 1136 qdict_del(options, "password-secret"); 1137 } 1138 1139 r = qemu_rbd_convert_options(options, &opts, &local_err); 1140 if (local_err) { 1141 /* If keypairs are present, that means some options are present in 1142 * the modern option format. Don't attempt to parse legacy option 1143 * formats, as we won't support mixed usage. */ 1144 if (keypairs) { 1145 error_propagate(errp, local_err); 1146 goto out; 1147 } 1148 1149 /* If the initial attempt to convert and process the options failed, 1150 * we may be attempting to open an image file that has the rbd options 1151 * specified in the older format consisting of all key/value pairs 1152 * encoded in the filename. Go ahead and attempt to parse the 1153 * filename, and see if we can pull out the required options. */ 1154 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs); 1155 if (r < 0) { 1156 /* Propagate the original error, not the legacy parsing fallback 1157 * error, as the latter was just a best-effort attempt. */ 1158 error_propagate(errp, local_err); 1159 goto out; 1160 } 1161 /* Take care whenever deciding to actually deprecate; once this ability 1162 * is removed, we will not be able to open any images with legacy-styled 1163 * backing image strings. */ 1164 warn_report("RBD options encoded in the filename as keyvalue pairs " 1165 "is deprecated"); 1166 } 1167 1168 /* Remove the processed options from the QDict (the visitor processes 1169 * _all_ options in the QDict) */ 1170 while ((e = qdict_first(options))) { 1171 qdict_del(options, e->key); 1172 } 1173 1174 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts, 1175 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp); 1176 if (r < 0) { 1177 goto out; 1178 } 1179 1180 s->snap = g_strdup(opts->snapshot); 1181 s->image_name = g_strdup(opts->image); 1182 1183 /* rbd_open is always r/w */ 1184 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 1185 if (r < 0) { 1186 error_setg_errno(errp, -r, "error reading header from %s", 1187 s->image_name); 1188 goto failed_open; 1189 } 1190 1191 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT__MAX; 1192 if (opts->encrypt) { 1193 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 1194 if (opts->encrypt->parent) { 1195 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 1196 r = qemu_rbd_encryption_load2(bs, s->image, opts->encrypt, errp); 1197 #else 1198 r = -ENOTSUP; 1199 error_setg(errp, "RBD library does not support layered encryption"); 1200 #endif 1201 } else { 1202 r = qemu_rbd_encryption_load(bs, s->image, opts->encrypt, errp); 1203 } 1204 if (r < 0) { 1205 goto failed_post_open; 1206 } 1207 #else 1208 r = -ENOTSUP; 1209 error_setg(errp, "RBD library does not support image encryption"); 1210 goto failed_post_open; 1211 #endif 1212 } else { 1213 qemu_rbd_encryption_probe(bs); 1214 } 1215 1216 r = rbd_stat(s->image, &info, sizeof(info)); 1217 if (r < 0) { 1218 error_setg_errno(errp, -r, "error getting image info from %s", 1219 s->image_name); 1220 goto failed_post_open; 1221 } 1222 s->image_size = info.size; 1223 s->object_size = info.obj_size; 1224 1225 /* If we are using an rbd snapshot, we must be r/o, otherwise 1226 * leave as-is */ 1227 if (s->snap != NULL) { 1228 bdrv_graph_rdlock_main_loop(); 1229 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp); 1230 bdrv_graph_rdunlock_main_loop(); 1231 if (r < 0) { 1232 goto failed_post_open; 1233 } 1234 } 1235 1236 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1237 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK; 1238 #endif 1239 1240 /* When extending regular files, we get zeros from the OS */ 1241 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE; 1242 1243 r = 0; 1244 goto out; 1245 1246 failed_post_open: 1247 rbd_close(s->image); 1248 failed_open: 1249 rados_ioctx_destroy(s->io_ctx); 1250 g_free(s->snap); 1251 g_free(s->image_name); 1252 rados_shutdown(s->cluster); 1253 out: 1254 qapi_free_BlockdevOptionsRbd(opts); 1255 g_free(keypairs); 1256 g_free(secretid); 1257 return r; 1258 } 1259 1260 1261 /* Since RBD is currently always opened R/W via the API, 1262 * we just need to check if we are using a snapshot or not, in 1263 * order to determine if we will allow it to be R/W */ 1264 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 1265 BlockReopenQueue *queue, Error **errp) 1266 { 1267 BDRVRBDState *s = state->bs->opaque; 1268 int ret = 0; 1269 1270 GRAPH_RDLOCK_GUARD_MAINLOOP(); 1271 1272 if (s->snap && state->flags & BDRV_O_RDWR) { 1273 error_setg(errp, 1274 "Cannot change node '%s' to r/w when using RBD snapshot", 1275 bdrv_get_device_or_node_name(state->bs)); 1276 ret = -EINVAL; 1277 } 1278 1279 return ret; 1280 } 1281 1282 static void qemu_rbd_close(BlockDriverState *bs) 1283 { 1284 BDRVRBDState *s = bs->opaque; 1285 1286 rbd_close(s->image); 1287 rados_ioctx_destroy(s->io_ctx); 1288 g_free(s->snap); 1289 g_free(s->image_name); 1290 rados_shutdown(s->cluster); 1291 } 1292 1293 /* Resize the RBD image and update the 'image_size' with the current size */ 1294 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size) 1295 { 1296 BDRVRBDState *s = bs->opaque; 1297 int r; 1298 1299 r = rbd_resize(s->image, size); 1300 if (r < 0) { 1301 return r; 1302 } 1303 1304 s->image_size = size; 1305 1306 return 0; 1307 } 1308 1309 static void qemu_rbd_finish_bh(void *opaque) 1310 { 1311 RBDTask *task = opaque; 1312 task->complete = true; 1313 aio_co_wake(task->co); 1314 } 1315 1316 /* 1317 * This is the completion callback function for all rbd aio calls 1318 * started from qemu_rbd_start_co(). 1319 * 1320 * Note: this function is being called from a non qemu thread so 1321 * we need to be careful about what we do here. Generally we only 1322 * schedule a BH, and do the rest of the io completion handling 1323 * from qemu_rbd_finish_bh() which runs in a qemu context. 1324 */ 1325 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task) 1326 { 1327 task->ret = rbd_aio_get_return_value(c); 1328 rbd_aio_release(c); 1329 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs), 1330 qemu_rbd_finish_bh, task); 1331 } 1332 1333 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs, 1334 uint64_t offset, 1335 uint64_t bytes, 1336 QEMUIOVector *qiov, 1337 int flags, 1338 RBDAIOCmd cmd) 1339 { 1340 BDRVRBDState *s = bs->opaque; 1341 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() }; 1342 rbd_completion_t c; 1343 int r; 1344 1345 assert(!qiov || qiov->size == bytes); 1346 1347 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) { 1348 /* 1349 * RBD APIs don't allow us to write more than actual size, so in order 1350 * to support growing images, we resize the image before write 1351 * operations that exceed the current size. 1352 */ 1353 if (offset + bytes > s->image_size) { 1354 r = qemu_rbd_resize(bs, offset + bytes); 1355 if (r < 0) { 1356 return r; 1357 } 1358 } 1359 } 1360 1361 r = rbd_aio_create_completion(&task, 1362 (rbd_callback_t) qemu_rbd_completion_cb, &c); 1363 if (r < 0) { 1364 return r; 1365 } 1366 1367 switch (cmd) { 1368 case RBD_AIO_READ: 1369 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c); 1370 break; 1371 case RBD_AIO_WRITE: 1372 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c); 1373 break; 1374 case RBD_AIO_DISCARD: 1375 r = rbd_aio_discard(s->image, offset, bytes, c); 1376 break; 1377 case RBD_AIO_FLUSH: 1378 r = rbd_aio_flush(s->image, c); 1379 break; 1380 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1381 case RBD_AIO_WRITE_ZEROES: { 1382 int zero_flags = 0; 1383 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION 1384 if (!(flags & BDRV_REQ_MAY_UNMAP)) { 1385 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION; 1386 } 1387 #endif 1388 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0); 1389 break; 1390 } 1391 #endif 1392 default: 1393 r = -EINVAL; 1394 } 1395 1396 if (r < 0) { 1397 error_report("rbd request failed early: cmd %d offset %" PRIu64 1398 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset, 1399 bytes, flags, r, strerror(-r)); 1400 rbd_aio_release(c); 1401 return r; 1402 } 1403 1404 while (!task.complete) { 1405 qemu_coroutine_yield(); 1406 } 1407 1408 if (task.ret < 0) { 1409 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %" 1410 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset, 1411 bytes, flags, task.ret, strerror(-task.ret)); 1412 return task.ret; 1413 } 1414 1415 /* zero pad short reads */ 1416 if (cmd == RBD_AIO_READ && task.ret < qiov->size) { 1417 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret); 1418 } 1419 1420 return 0; 1421 } 1422 1423 static int 1424 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset, 1425 int64_t bytes, QEMUIOVector *qiov, 1426 BdrvRequestFlags flags) 1427 { 1428 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ); 1429 } 1430 1431 static int 1432 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset, 1433 int64_t bytes, QEMUIOVector *qiov, 1434 BdrvRequestFlags flags) 1435 { 1436 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE); 1437 } 1438 1439 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs) 1440 { 1441 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH); 1442 } 1443 1444 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs, 1445 int64_t offset, int64_t bytes) 1446 { 1447 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD); 1448 } 1449 1450 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1451 static int 1452 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, 1453 int64_t bytes, BdrvRequestFlags flags) 1454 { 1455 return qemu_rbd_start_co(bs, offset, bytes, NULL, flags, 1456 RBD_AIO_WRITE_ZEROES); 1457 } 1458 #endif 1459 1460 static int coroutine_fn 1461 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi) 1462 { 1463 BDRVRBDState *s = bs->opaque; 1464 bdi->cluster_size = s->object_size; 1465 return 0; 1466 } 1467 1468 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs, 1469 Error **errp) 1470 { 1471 BDRVRBDState *s = bs->opaque; 1472 ImageInfoSpecific *spec_info; 1473 1474 spec_info = g_new(ImageInfoSpecific, 1); 1475 *spec_info = (ImageInfoSpecific){ 1476 .type = IMAGE_INFO_SPECIFIC_KIND_RBD, 1477 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1), 1478 }; 1479 1480 if (s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX) { 1481 assert(!bs->encrypted); 1482 } else { 1483 ImageInfoSpecificRbd *rbd_info = spec_info->u.rbd.data; 1484 1485 rbd_info->has_encryption_format = true; 1486 rbd_info->encryption_format = s->encryption_format; 1487 } 1488 1489 return spec_info; 1490 } 1491 1492 /* 1493 * rbd_diff_iterate2 allows to interrupt the exection by returning a negative 1494 * value in the callback routine. Choose a value that does not conflict with 1495 * an existing exitcode and return it if we want to prematurely stop the 1496 * execution because we detected a change in the allocation status. 1497 */ 1498 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000 1499 1500 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len, 1501 int exists, void *opaque) 1502 { 1503 RBDDiffIterateReq *req = opaque; 1504 1505 assert(req->offs + req->bytes <= offs); 1506 1507 /* treat a hole like an unallocated area and bail out */ 1508 if (!exists) { 1509 return 0; 1510 } 1511 1512 if (!req->exists && offs > req->offs) { 1513 /* 1514 * we started in an unallocated area and hit the first allocated 1515 * block. req->bytes must be set to the length of the unallocated area 1516 * before the allocated area. stop further processing. 1517 */ 1518 req->bytes = offs - req->offs; 1519 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1520 } 1521 1522 if (req->exists && offs > req->offs + req->bytes) { 1523 /* 1524 * we started in an allocated area and jumped over an unallocated area, 1525 * req->bytes contains the length of the allocated area before the 1526 * unallocated area. stop further processing. 1527 */ 1528 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1529 } 1530 1531 req->bytes += len; 1532 req->exists = true; 1533 1534 return 0; 1535 } 1536 1537 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs, 1538 unsigned int mode, 1539 int64_t offset, int64_t bytes, 1540 int64_t *pnum, int64_t *map, 1541 BlockDriverState **file) 1542 { 1543 BDRVRBDState *s = bs->opaque; 1544 int status, r; 1545 RBDDiffIterateReq req = { .offs = offset }; 1546 uint64_t features, flags; 1547 uint64_t head = 0; 1548 1549 assert(offset + bytes <= s->image_size); 1550 1551 /* default to all sectors allocated */ 1552 status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID; 1553 *map = offset; 1554 *file = bs; 1555 *pnum = bytes; 1556 1557 /* check if RBD image supports fast-diff */ 1558 r = rbd_get_features(s->image, &features); 1559 if (r < 0) { 1560 return status; 1561 } 1562 if (!(features & RBD_FEATURE_FAST_DIFF)) { 1563 return status; 1564 } 1565 1566 /* check if RBD fast-diff result is valid */ 1567 r = rbd_get_flags(s->image, &flags); 1568 if (r < 0) { 1569 return status; 1570 } 1571 if (flags & RBD_FLAG_FAST_DIFF_INVALID) { 1572 return status; 1573 } 1574 1575 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0) 1576 /* 1577 * librbd had a bug until early 2022 that affected all versions of ceph that 1578 * supported fast-diff. This bug results in reporting of incorrect offsets 1579 * if the offset parameter to rbd_diff_iterate2 is not object aligned. 1580 * Work around this bug by rounding down the offset to object boundaries. 1581 * This is OK because we call rbd_diff_iterate2 with whole_object = true. 1582 * However, this workaround only works for non cloned images with default 1583 * striping. 1584 * 1585 * See: https://tracker.ceph.com/issues/53784 1586 */ 1587 1588 /* check if RBD image has non-default striping enabled */ 1589 if (features & RBD_FEATURE_STRIPINGV2) { 1590 return status; 1591 } 1592 1593 #pragma GCC diagnostic push 1594 #pragma GCC diagnostic ignored "-Wdeprecated-declarations" 1595 /* 1596 * check if RBD image is a clone (= has a parent). 1597 * 1598 * rbd_get_parent_info is deprecated from Nautilus onwards, but the 1599 * replacement rbd_get_parent is not present in Luminous and Mimic. 1600 */ 1601 if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) { 1602 return status; 1603 } 1604 #pragma GCC diagnostic pop 1605 1606 head = req.offs & (s->object_size - 1); 1607 req.offs -= head; 1608 bytes += head; 1609 #endif 1610 1611 r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true, 1612 qemu_rbd_diff_iterate_cb, &req); 1613 if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) { 1614 return status; 1615 } 1616 assert(req.bytes <= bytes); 1617 if (!req.exists) { 1618 if (r == 0) { 1619 /* 1620 * rbd_diff_iterate2 does not invoke callbacks for unallocated 1621 * areas. This here catches the case where no callback was 1622 * invoked at all (req.bytes == 0). 1623 */ 1624 assert(req.bytes == 0); 1625 req.bytes = bytes; 1626 } 1627 status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID; 1628 } 1629 1630 assert(req.bytes > head); 1631 *pnum = req.bytes - head; 1632 return status; 1633 } 1634 1635 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs) 1636 { 1637 BDRVRBDState *s = bs->opaque; 1638 int r; 1639 1640 r = rbd_get_size(s->image, &s->image_size); 1641 if (r < 0) { 1642 return r; 1643 } 1644 1645 return s->image_size; 1646 } 1647 1648 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs, 1649 int64_t offset, 1650 bool exact, 1651 PreallocMode prealloc, 1652 BdrvRequestFlags flags, 1653 Error **errp) 1654 { 1655 int r; 1656 1657 if (prealloc != PREALLOC_MODE_OFF) { 1658 error_setg(errp, "Unsupported preallocation mode '%s'", 1659 PreallocMode_str(prealloc)); 1660 return -ENOTSUP; 1661 } 1662 1663 r = qemu_rbd_resize(bs, offset); 1664 if (r < 0) { 1665 error_setg_errno(errp, -r, "Failed to resize file"); 1666 return r; 1667 } 1668 1669 return 0; 1670 } 1671 1672 static int qemu_rbd_snap_create(BlockDriverState *bs, 1673 QEMUSnapshotInfo *sn_info) 1674 { 1675 BDRVRBDState *s = bs->opaque; 1676 int r; 1677 1678 if (sn_info->name[0] == '\0') { 1679 return -EINVAL; /* we need a name for rbd snapshots */ 1680 } 1681 1682 /* 1683 * rbd snapshots are using the name as the user controlled unique identifier 1684 * we can't use the rbd snapid for that purpose, as it can't be set 1685 */ 1686 if (sn_info->id_str[0] != '\0' && 1687 strcmp(sn_info->id_str, sn_info->name) != 0) { 1688 return -EINVAL; 1689 } 1690 1691 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 1692 return -ERANGE; 1693 } 1694 1695 r = rbd_snap_create(s->image, sn_info->name); 1696 if (r < 0) { 1697 error_report("failed to create snap: %s", strerror(-r)); 1698 return r; 1699 } 1700 1701 return 0; 1702 } 1703 1704 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1705 const char *snapshot_id, 1706 const char *snapshot_name, 1707 Error **errp) 1708 { 1709 BDRVRBDState *s = bs->opaque; 1710 int r; 1711 1712 if (!snapshot_name) { 1713 error_setg(errp, "rbd need a valid snapshot name"); 1714 return -EINVAL; 1715 } 1716 1717 /* If snapshot_id is specified, it must be equal to name, see 1718 qemu_rbd_snap_list() */ 1719 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1720 error_setg(errp, 1721 "rbd do not support snapshot id, it should be NULL or " 1722 "equal to snapshot name"); 1723 return -EINVAL; 1724 } 1725 1726 r = rbd_snap_remove(s->image, snapshot_name); 1727 if (r < 0) { 1728 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1729 } 1730 return r; 1731 } 1732 1733 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1734 const char *snapshot_name) 1735 { 1736 BDRVRBDState *s = bs->opaque; 1737 1738 return rbd_snap_rollback(s->image, snapshot_name); 1739 } 1740 1741 static int qemu_rbd_snap_list(BlockDriverState *bs, 1742 QEMUSnapshotInfo **psn_tab) 1743 { 1744 BDRVRBDState *s = bs->opaque; 1745 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1746 int i, snap_count; 1747 rbd_snap_info_t *snaps; 1748 int max_snaps = RBD_MAX_SNAPS; 1749 1750 do { 1751 snaps = g_new(rbd_snap_info_t, max_snaps); 1752 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1753 if (snap_count <= 0) { 1754 g_free(snaps); 1755 } 1756 } while (snap_count == -ERANGE); 1757 1758 if (snap_count <= 0) { 1759 goto done; 1760 } 1761 1762 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1763 1764 for (i = 0; i < snap_count; i++) { 1765 const char *snap_name = snaps[i].name; 1766 1767 sn_info = sn_tab + i; 1768 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1769 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1770 1771 sn_info->vm_state_size = snaps[i].size; 1772 sn_info->date_sec = 0; 1773 sn_info->date_nsec = 0; 1774 sn_info->vm_clock_nsec = 0; 1775 } 1776 rbd_snap_list_end(snaps); 1777 g_free(snaps); 1778 1779 done: 1780 *psn_tab = sn_tab; 1781 return snap_count; 1782 } 1783 1784 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs, 1785 Error **errp) 1786 { 1787 BDRVRBDState *s = bs->opaque; 1788 int r = rbd_invalidate_cache(s->image); 1789 if (r < 0) { 1790 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1791 } 1792 } 1793 1794 static QemuOptsList qemu_rbd_create_opts = { 1795 .name = "rbd-create-opts", 1796 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1797 .desc = { 1798 { 1799 .name = BLOCK_OPT_SIZE, 1800 .type = QEMU_OPT_SIZE, 1801 .help = "Virtual disk size" 1802 }, 1803 { 1804 .name = BLOCK_OPT_CLUSTER_SIZE, 1805 .type = QEMU_OPT_SIZE, 1806 .help = "RBD object size" 1807 }, 1808 { 1809 .name = "password-secret", 1810 .type = QEMU_OPT_STRING, 1811 .help = "ID of secret providing the password", 1812 }, 1813 { 1814 .name = "encrypt.format", 1815 .type = QEMU_OPT_STRING, 1816 .help = "Encrypt the image, format choices: 'luks', 'luks2'", 1817 }, 1818 { 1819 .name = "encrypt.cipher-alg", 1820 .type = QEMU_OPT_STRING, 1821 .help = "Name of encryption cipher algorithm" 1822 " (allowed values: aes-128, aes-256)", 1823 }, 1824 { 1825 .name = "encrypt.key-secret", 1826 .type = QEMU_OPT_STRING, 1827 .help = "ID of secret providing LUKS passphrase", 1828 }, 1829 { /* end of list */ } 1830 } 1831 }; 1832 1833 static const char *const qemu_rbd_strong_runtime_opts[] = { 1834 "pool", 1835 "namespace", 1836 "image", 1837 "conf", 1838 "snapshot", 1839 "user", 1840 "server.", 1841 "password-secret", 1842 1843 NULL 1844 }; 1845 1846 static BlockDriver bdrv_rbd = { 1847 .format_name = "rbd", 1848 .instance_size = sizeof(BDRVRBDState), 1849 1850 .bdrv_parse_filename = qemu_rbd_parse_filename, 1851 .bdrv_open = qemu_rbd_open, 1852 .bdrv_close = qemu_rbd_close, 1853 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1854 .bdrv_co_create = qemu_rbd_co_create, 1855 .bdrv_co_create_opts = qemu_rbd_co_create_opts, 1856 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1857 .bdrv_co_get_info = qemu_rbd_co_get_info, 1858 .bdrv_get_specific_info = qemu_rbd_get_specific_info, 1859 .create_opts = &qemu_rbd_create_opts, 1860 .bdrv_co_getlength = qemu_rbd_co_getlength, 1861 .bdrv_co_truncate = qemu_rbd_co_truncate, 1862 .protocol_name = "rbd", 1863 1864 .bdrv_co_preadv = qemu_rbd_co_preadv, 1865 .bdrv_co_pwritev = qemu_rbd_co_pwritev, 1866 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1867 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard, 1868 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1869 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes, 1870 #endif 1871 .bdrv_co_block_status = qemu_rbd_co_block_status, 1872 1873 .bdrv_snapshot_create = qemu_rbd_snap_create, 1874 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1875 .bdrv_snapshot_list = qemu_rbd_snap_list, 1876 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1877 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache, 1878 1879 .strong_runtime_opts = qemu_rbd_strong_runtime_opts, 1880 }; 1881 1882 static void bdrv_rbd_init(void) 1883 { 1884 bdrv_register(&bdrv_rbd); 1885 } 1886 1887 block_init(bdrv_rbd_init); 1888