1 /*
2 * QEMU Block driver for RADOS (Ceph)
3 *
4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5 * Josh Durgin <josh.durgin@dreamhost.com>
6 *
7 * This work is licensed under the terms of the GNU GPL, version 2. See
8 * the COPYING file in the top-level directory.
9 *
10 * Contributions after 2012-01-13 are licensed under the terms of the
11 * GNU GPL, version 2 or (at your option) any later version.
12 */
13
14 #include "qemu/osdep.h"
15
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "system/replay.h"
27 #include "qobject/qstring.h"
28 #include "qobject/qdict.h"
29 #include "qobject/qjson.h"
30 #include "qobject/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33
34 /*
35 * When specifying the image filename use:
36 *
37 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38 *
39 * poolname must be the name of an existing rados pool.
40 *
41 * devicename is the name of the rbd image.
42 *
43 * Each option given is used to configure rados, and may be any valid
44 * Ceph option, "id", or "conf".
45 *
46 * The "id" option indicates what user we should authenticate as to
47 * the Ceph cluster. If it is excluded we will use the Ceph default
48 * (normally 'admin').
49 *
50 * The "conf" option specifies a Ceph configuration file to read. If
51 * it is not specified, we will read from the default Ceph locations
52 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
53 * file, specify conf=/dev/null.
54 *
55 * Configuration values containing :, @, or = can be escaped with a
56 * leading "\".
57 */
58
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/* Number of leading image bytes compared against the magic values below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * Header magic values that qemu_rbd_encryption_probe() compares against the
 * first bytes of an image.  Each one is a four character tag, the bytes
 * 0xBA 0xBE, and a two byte value ending in 1 or 2.
 */
static const char rbd_luks_header_verification[
    RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S',
    0xBA, 0xBE,
    0, 1
};

static const char rbd_luks2_header_verification[
    RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S',
    0xBA, 0xBE,
    0, 2
};

static const char rbd_layered_luks_header_verification[
    RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L',
    0xBA, 0xBE,
    0, 1
};

static const char rbd_layered_luks2_header_verification[
    RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L',
    0xBA, 0xBE,
    0, 2
};
84
/* Kinds of asynchronous request the driver can issue. */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES,
} RBDAIOCmd;
92
93 typedef struct BDRVRBDState {
94 rados_t cluster;
95 rados_ioctx_t io_ctx;
96 rbd_image_t image;
97 char *image_name;
98 char *snap;
99 char *namespace;
100 uint64_t image_size;
101 uint64_t object_size;
102
103 /*
104 * If @bs->encrypted is true, this is the encryption format actually loaded
105 * at the librbd level. If it is false, it is the result of probing.
106 * RBD_IMAGE_ENCRYPTION_FORMAT__MAX means that encryption is not enabled and
107 * probing didn't find any known encryption header either.
108 */
109 RbdImageEncryptionFormat encryption_format;
110 } BDRVRBDState;
111
112 typedef struct RBDTask {
113 BlockDriverState *bs;
114 Coroutine *co;
115 bool complete;
116 int64_t ret;
117 } RBDTask;
118
/* An offset/length pair plus an "exists" flag for a diff-iterate request. */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
124
125 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
126 BlockdevOptionsRbd *opts, bool cache,
127 const char *keypairs, const char *secretid,
128 Error **errp);
129
/*
 * Escape-aware strchr(): return a pointer to the first unescaped occurrence
 * of @delim in @src, or NULL if there is none.
 *
 * A backslash followed by any character forms an escape pair; the escaped
 * character is never reported as a delimiter.  The delimiter test is done
 * before the escape test, so searching for '\\' itself matches the first
 * backslash that is not itself escaped.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over the escaped character as well as the backslash. */
        if (*cur == '\\' && cur[1] != '\0') {
            cur++;
        }
        cur++;
    }

    return NULL;
}
145
146
/*
 * Split @src in place at the first unescaped @delim.
 *
 * The delimiter is overwritten with NUL and *@p is set to the character
 * following it.  If no delimiter is found, @src is left untouched and *@p
 * is set to NULL.  Returns @src, i.e. the start of the token, either way.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep == NULL) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
160
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and that
 * character is kept literally.  A lone trailing backslash is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
173
qemu_rbd_parse_filename(const char * filename,QDict * options,Error ** errp)174 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
175 Error **errp)
176 {
177 const char *start;
178 char *p, *buf;
179 QList *keypairs = NULL;
180 char *found_str, *image_name;
181
182 if (!strstart(filename, "rbd:", &start)) {
183 error_setg(errp, "File name must start with 'rbd:'");
184 return;
185 }
186
187 buf = g_strdup(start);
188 p = buf;
189
190 found_str = qemu_rbd_next_tok(p, '/', &p);
191 if (!p) {
192 error_setg(errp, "Pool name is required");
193 goto done;
194 }
195 qemu_rbd_unescape(found_str);
196 qdict_put_str(options, "pool", found_str);
197
198 if (qemu_rbd_strchr(p, '@')) {
199 image_name = qemu_rbd_next_tok(p, '@', &p);
200
201 found_str = qemu_rbd_next_tok(p, ':', &p);
202 qemu_rbd_unescape(found_str);
203 qdict_put_str(options, "snapshot", found_str);
204 } else {
205 image_name = qemu_rbd_next_tok(p, ':', &p);
206 }
207 /* Check for namespace in the image_name */
208 if (qemu_rbd_strchr(image_name, '/')) {
209 found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
210 qemu_rbd_unescape(found_str);
211 qdict_put_str(options, "namespace", found_str);
212 } else {
213 qdict_put_str(options, "namespace", "");
214 }
215 qemu_rbd_unescape(image_name);
216 qdict_put_str(options, "image", image_name);
217 if (!p) {
218 goto done;
219 }
220
221 /* The following are essentially all key/value pairs, and we treat
222 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */
223 while (p) {
224 char *name, *value;
225 name = qemu_rbd_next_tok(p, '=', &p);
226 if (!p) {
227 error_setg(errp, "conf option %s has no value", name);
228 break;
229 }
230
231 qemu_rbd_unescape(name);
232
233 value = qemu_rbd_next_tok(p, ':', &p);
234 qemu_rbd_unescape(value);
235
236 if (!strcmp(name, "conf")) {
237 qdict_put_str(options, "conf", value);
238 } else if (!strcmp(name, "id")) {
239 qdict_put_str(options, "user", value);
240 } else {
241 /*
242 * We pass these internally to qemu_rbd_set_keypairs(), so
243 * we can get away with the simpler list of [ "key1",
244 * "value1", "key2", "value2" ] rather than a raw dict
245 * { "key1": "value1", "key2": "value2" } where we can't
246 * guarantee order, or even a more correct but complex
247 * [ { "key1": "value1" }, { "key2": "value2" } ]
248 */
249 if (!keypairs) {
250 keypairs = qlist_new();
251 }
252 qlist_append_str(keypairs, name);
253 qlist_append_str(keypairs, value);
254 }
255 }
256
257 if (keypairs) {
258 qdict_put(options, "=keyvalue-pairs",
259 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
260 }
261
262 done:
263 g_free(buf);
264 qobject_unref(keypairs);
265 }
266
qemu_rbd_set_auth(rados_t cluster,BlockdevOptionsRbd * opts,Error ** errp)267 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
268 Error **errp)
269 {
270 char *key, *acr;
271 int r;
272 GString *accu;
273 RbdAuthModeList *auth;
274
275 if (opts->key_secret) {
276 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
277 if (!key) {
278 return -EIO;
279 }
280 r = rados_conf_set(cluster, "key", key);
281 g_free(key);
282 if (r < 0) {
283 error_setg_errno(errp, -r, "Could not set 'key'");
284 return r;
285 }
286 }
287
288 if (opts->has_auth_client_required) {
289 accu = g_string_new("");
290 for (auth = opts->auth_client_required; auth; auth = auth->next) {
291 if (accu->str[0]) {
292 g_string_append_c(accu, ';');
293 }
294 g_string_append(accu, RbdAuthMode_str(auth->value));
295 }
296 acr = g_string_free(accu, FALSE);
297 r = rados_conf_set(cluster, "auth_client_required", acr);
298 g_free(acr);
299 if (r < 0) {
300 error_setg_errno(errp, -r,
301 "Could not set 'auth_client_required'");
302 return r;
303 }
304 }
305
306 return 0;
307 }
308
qemu_rbd_set_keypairs(rados_t cluster,const char * keypairs_json,Error ** errp)309 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
310 Error **errp)
311 {
312 QList *keypairs;
313 QString *name;
314 QString *value;
315 const char *key;
316 size_t remaining;
317 int ret = 0;
318
319 if (!keypairs_json) {
320 return ret;
321 }
322 keypairs = qobject_to(QList,
323 qobject_from_json(keypairs_json, &error_abort));
324 remaining = qlist_size(keypairs) / 2;
325 assert(remaining);
326
327 while (remaining--) {
328 name = qobject_to(QString, qlist_pop(keypairs));
329 value = qobject_to(QString, qlist_pop(keypairs));
330 assert(name && value);
331 key = qstring_get_str(name);
332
333 ret = rados_conf_set(cluster, key, qstring_get_str(value));
334 qobject_unref(value);
335 if (ret < 0) {
336 error_setg_errno(errp, -ret, "invalid conf option %s", key);
337 qobject_unref(name);
338 ret = -EINVAL;
339 break;
340 }
341 qobject_unref(name);
342 }
343
344 qobject_unref(keypairs);
345 return ret;
346 }
347
348 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
qemu_rbd_convert_luks_options(RbdEncryptionOptionsLUKSBase * luks_opts,char ** passphrase,size_t * passphrase_len,Error ** errp)349 static int qemu_rbd_convert_luks_options(
350 RbdEncryptionOptionsLUKSBase *luks_opts,
351 char **passphrase,
352 size_t *passphrase_len,
353 Error **errp)
354 {
355 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
356 passphrase_len, errp);
357 }
358
qemu_rbd_convert_luks_create_options(RbdEncryptionCreateOptionsLUKSBase * luks_opts,rbd_encryption_algorithm_t * alg,char ** passphrase,size_t * passphrase_len,Error ** errp)359 static int qemu_rbd_convert_luks_create_options(
360 RbdEncryptionCreateOptionsLUKSBase *luks_opts,
361 rbd_encryption_algorithm_t *alg,
362 char **passphrase,
363 size_t *passphrase_len,
364 Error **errp)
365 {
366 int r = 0;
367
368 r = qemu_rbd_convert_luks_options(
369 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
370 passphrase, passphrase_len, errp);
371 if (r < 0) {
372 return r;
373 }
374
375 if (luks_opts->has_cipher_alg) {
376 switch (luks_opts->cipher_alg) {
377 case QCRYPTO_CIPHER_ALGO_AES_128: {
378 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
379 break;
380 }
381 case QCRYPTO_CIPHER_ALGO_AES_256: {
382 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
383 break;
384 }
385 default: {
386 r = -ENOTSUP;
387 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
388 luks_opts->cipher_alg);
389 return r;
390 }
391 }
392 } else {
393 /* default alg */
394 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
395 }
396
397 return 0;
398 }
399
qemu_rbd_encryption_format(rbd_image_t image,RbdEncryptionCreateOptions * encrypt,Error ** errp)400 static int qemu_rbd_encryption_format(rbd_image_t image,
401 RbdEncryptionCreateOptions *encrypt,
402 Error **errp)
403 {
404 int r = 0;
405 g_autofree char *passphrase = NULL;
406 rbd_encryption_format_t format;
407 rbd_encryption_options_t opts;
408 rbd_encryption_luks1_format_options_t luks_opts;
409 rbd_encryption_luks2_format_options_t luks2_opts;
410 size_t opts_size;
411 uint64_t raw_size, effective_size;
412
413 r = rbd_get_size(image, &raw_size);
414 if (r < 0) {
415 error_setg_errno(errp, -r, "cannot get raw image size");
416 return r;
417 }
418
419 switch (encrypt->format) {
420 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
421 memset(&luks_opts, 0, sizeof(luks_opts));
422 format = RBD_ENCRYPTION_FORMAT_LUKS1;
423 opts = &luks_opts;
424 opts_size = sizeof(luks_opts);
425 r = qemu_rbd_convert_luks_create_options(
426 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
427 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
428 errp);
429 if (r < 0) {
430 return r;
431 }
432 luks_opts.passphrase = passphrase;
433 break;
434 }
435 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
436 memset(&luks2_opts, 0, sizeof(luks2_opts));
437 format = RBD_ENCRYPTION_FORMAT_LUKS2;
438 opts = &luks2_opts;
439 opts_size = sizeof(luks2_opts);
440 r = qemu_rbd_convert_luks_create_options(
441 qapi_RbdEncryptionCreateOptionsLUKS2_base(
442 &encrypt->u.luks2),
443 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
444 errp);
445 if (r < 0) {
446 return r;
447 }
448 luks2_opts.passphrase = passphrase;
449 break;
450 }
451 default: {
452 r = -ENOTSUP;
453 error_setg_errno(
454 errp, -r, "unknown image encryption format: %u",
455 encrypt->format);
456 return r;
457 }
458 }
459
460 r = rbd_encryption_format(image, format, opts, opts_size);
461 if (r < 0) {
462 error_setg_errno(errp, -r, "encryption format fail");
463 return r;
464 }
465
466 r = rbd_get_size(image, &effective_size);
467 if (r < 0) {
468 error_setg_errno(errp, -r, "cannot get effective image size");
469 return r;
470 }
471
472 r = rbd_resize(image, raw_size + (raw_size - effective_size));
473 if (r < 0) {
474 error_setg_errno(errp, -r, "cannot resize image after format");
475 return r;
476 }
477
478 return 0;
479 }
480
qemu_rbd_encryption_load(BlockDriverState * bs,rbd_image_t image,RbdEncryptionOptions * encrypt,Error ** errp)481 static int qemu_rbd_encryption_load(BlockDriverState *bs,
482 rbd_image_t image,
483 RbdEncryptionOptions *encrypt,
484 Error **errp)
485 {
486 BDRVRBDState *s = bs->opaque;
487 int r = 0;
488 g_autofree char *passphrase = NULL;
489 rbd_encryption_luks1_format_options_t luks_opts;
490 rbd_encryption_luks2_format_options_t luks2_opts;
491 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
492 rbd_encryption_luks_format_options_t luks_any_opts;
493 #endif
494 rbd_encryption_format_t format;
495 rbd_encryption_options_t opts;
496 size_t opts_size;
497
498 switch (encrypt->format) {
499 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
500 memset(&luks_opts, 0, sizeof(luks_opts));
501 format = RBD_ENCRYPTION_FORMAT_LUKS1;
502 opts = &luks_opts;
503 opts_size = sizeof(luks_opts);
504 r = qemu_rbd_convert_luks_options(
505 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
506 &passphrase, &luks_opts.passphrase_size, errp);
507 if (r < 0) {
508 return r;
509 }
510 luks_opts.passphrase = passphrase;
511 break;
512 }
513 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
514 memset(&luks2_opts, 0, sizeof(luks2_opts));
515 format = RBD_ENCRYPTION_FORMAT_LUKS2;
516 opts = &luks2_opts;
517 opts_size = sizeof(luks2_opts);
518 r = qemu_rbd_convert_luks_options(
519 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
520 &passphrase, &luks2_opts.passphrase_size, errp);
521 if (r < 0) {
522 return r;
523 }
524 luks2_opts.passphrase = passphrase;
525 break;
526 }
527 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
528 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
529 memset(&luks_any_opts, 0, sizeof(luks_any_opts));
530 format = RBD_ENCRYPTION_FORMAT_LUKS;
531 opts = &luks_any_opts;
532 opts_size = sizeof(luks_any_opts);
533 r = qemu_rbd_convert_luks_options(
534 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
535 &passphrase, &luks_any_opts.passphrase_size, errp);
536 if (r < 0) {
537 return r;
538 }
539 luks_any_opts.passphrase = passphrase;
540 break;
541 }
542 #endif
543 default: {
544 r = -ENOTSUP;
545 error_setg_errno(
546 errp, -r, "unknown image encryption format: %u",
547 encrypt->format);
548 return r;
549 }
550 }
551
552 r = rbd_encryption_load(image, format, opts, opts_size);
553 if (r < 0) {
554 error_setg_errno(errp, -r, "encryption load fail");
555 return r;
556 }
557 bs->encrypted = true;
558 s->encryption_format = encrypt->format;
559
560 return 0;
561 }
562
563 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
qemu_rbd_encryption_load2(BlockDriverState * bs,rbd_image_t image,RbdEncryptionOptions * encrypt,Error ** errp)564 static int qemu_rbd_encryption_load2(BlockDriverState *bs,
565 rbd_image_t image,
566 RbdEncryptionOptions *encrypt,
567 Error **errp)
568 {
569 BDRVRBDState *s = bs->opaque;
570 int r = 0;
571 int encrypt_count = 1;
572 int i;
573 RbdEncryptionOptions *curr_encrypt;
574 rbd_encryption_spec_t *specs;
575 rbd_encryption_luks1_format_options_t *luks_opts;
576 rbd_encryption_luks2_format_options_t *luks2_opts;
577 rbd_encryption_luks_format_options_t *luks_any_opts;
578
579 /* count encryption options */
580 for (curr_encrypt = encrypt->parent; curr_encrypt;
581 curr_encrypt = curr_encrypt->parent) {
582 ++encrypt_count;
583 }
584
585 specs = g_new0(rbd_encryption_spec_t, encrypt_count);
586
587 curr_encrypt = encrypt;
588 for (i = 0; i < encrypt_count; ++i) {
589 switch (curr_encrypt->format) {
590 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
591 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
592
593 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
594 specs[i].opts = luks_opts;
595 specs[i].opts_size = sizeof(*luks_opts);
596
597 r = qemu_rbd_convert_luks_options(
598 qapi_RbdEncryptionOptionsLUKS_base(
599 &curr_encrypt->u.luks),
600 (char **)&luks_opts->passphrase,
601 &luks_opts->passphrase_size,
602 errp);
603 break;
604 }
605 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
606 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
607
608 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
609 specs[i].opts = luks2_opts;
610 specs[i].opts_size = sizeof(*luks2_opts);
611
612 r = qemu_rbd_convert_luks_options(
613 qapi_RbdEncryptionOptionsLUKS2_base(
614 &curr_encrypt->u.luks2),
615 (char **)&luks2_opts->passphrase,
616 &luks2_opts->passphrase_size,
617 errp);
618 break;
619 }
620 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
621 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
622
623 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
624 specs[i].opts = luks_any_opts;
625 specs[i].opts_size = sizeof(*luks_any_opts);
626
627 r = qemu_rbd_convert_luks_options(
628 qapi_RbdEncryptionOptionsLUKSAny_base(
629 &curr_encrypt->u.luks_any),
630 (char **)&luks_any_opts->passphrase,
631 &luks_any_opts->passphrase_size,
632 errp);
633 break;
634 }
635 default: {
636 r = -ENOTSUP;
637 error_setg_errno(
638 errp, -r, "unknown image encryption format: %u",
639 curr_encrypt->format);
640 }
641 }
642
643 if (r < 0) {
644 goto exit;
645 }
646
647 curr_encrypt = curr_encrypt->parent;
648 }
649
650 r = rbd_encryption_load2(image, specs, encrypt_count);
651 if (r < 0) {
652 error_setg_errno(errp, -r, "layered encryption load fail");
653 goto exit;
654 }
655 bs->encrypted = true;
656 s->encryption_format = encrypt->format;
657
658 exit:
659 for (i = 0; i < encrypt_count; ++i) {
660 if (!specs[i].opts) {
661 break;
662 }
663
664 switch (specs[i].format) {
665 case RBD_ENCRYPTION_FORMAT_LUKS1: {
666 luks_opts = specs[i].opts;
667 g_free((void *)luks_opts->passphrase);
668 break;
669 }
670 case RBD_ENCRYPTION_FORMAT_LUKS2: {
671 luks2_opts = specs[i].opts;
672 g_free((void *)luks2_opts->passphrase);
673 break;
674 }
675 case RBD_ENCRYPTION_FORMAT_LUKS: {
676 luks_any_opts = specs[i].opts;
677 g_free((void *)luks_any_opts->passphrase);
678 break;
679 }
680 }
681
682 g_free(specs[i].opts);
683 }
684 g_free(specs);
685 return r;
686 }
687 #endif
688 #endif
689
690 /*
691 * For an image without encryption enabled on the rbd layer, probe the start of
692 * the image if it could be opened as an encrypted image so that we can display
693 * it when the user queries the node (most importantly in qemu-img).
694 *
695 * If the guest writes an encryption header to its disk after this probing, this
696 * won't be reflected when queried, but that's okay. There is no reason why the
697 * user should want to apply encryption at the rbd level while the image is
698 * still in use. This is just guest data.
699 */
qemu_rbd_encryption_probe(BlockDriverState * bs)700 static void qemu_rbd_encryption_probe(BlockDriverState *bs)
701 {
702 BDRVRBDState *s = bs->opaque;
703 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
704 int r;
705
706 assert(s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX);
707
708 r = rbd_read(s->image, 0,
709 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
710 if (r < RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
711 return;
712 }
713
714 if (memcmp(buf, rbd_luks_header_verification,
715 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
716 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
717 } else if (memcmp(buf, rbd_luks2_header_verification,
718 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
719 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
720 } else if (memcmp(buf, rbd_layered_luks_header_verification,
721 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
722 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
723 } else if (memcmp(buf, rbd_layered_luks2_header_verification,
724 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
725 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
726 }
727 }
728
729 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
qemu_rbd_do_create(BlockdevCreateOptions * options,const char * keypairs,const char * password_secret,Error ** errp)730 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
731 const char *keypairs, const char *password_secret,
732 Error **errp)
733 {
734 BlockdevCreateOptionsRbd *opts = &options->u.rbd;
735 rados_t cluster;
736 rados_ioctx_t io_ctx;
737 int obj_order = 0;
738 int ret;
739
740 assert(options->driver == BLOCKDEV_DRIVER_RBD);
741 if (opts->location->snapshot) {
742 error_setg(errp, "Can't use snapshot name for image creation");
743 return -EINVAL;
744 }
745
746 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
747 if (opts->encrypt) {
748 error_setg(errp, "RBD library does not support image encryption");
749 return -ENOTSUP;
750 }
751 #endif
752
753 if (opts->has_cluster_size) {
754 int64_t objsize = opts->cluster_size;
755 if ((objsize - 1) & objsize) { /* not a power of 2? */
756 error_setg(errp, "obj size needs to be power of 2");
757 return -EINVAL;
758 }
759 if (objsize < 4096) {
760 error_setg(errp, "obj size too small");
761 return -EINVAL;
762 }
763 obj_order = ctz32(objsize);
764 }
765
766 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
767 password_secret, errp);
768 if (ret < 0) {
769 return ret;
770 }
771
772 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
773 if (ret < 0) {
774 error_setg_errno(errp, -ret, "error rbd create");
775 goto out;
776 }
777
778 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
779 if (opts->encrypt) {
780 rbd_image_t image;
781
782 ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
783 if (ret < 0) {
784 error_setg_errno(errp, -ret,
785 "error opening image '%s' for encryption format",
786 opts->location->image);
787 goto out;
788 }
789
790 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
791 rbd_close(image);
792 if (ret < 0) {
793 /* encryption format fail, try removing the image */
794 rbd_remove(io_ctx, opts->location->image);
795 goto out;
796 }
797 }
798 #endif
799
800 ret = 0;
801 out:
802 rados_ioctx_destroy(io_ctx);
803 rados_shutdown(cluster);
804 return ret;
805 }
806
qemu_rbd_co_create(BlockdevCreateOptions * options,Error ** errp)807 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
808 {
809 return qemu_rbd_do_create(options, NULL, NULL, errp);
810 }
811
qemu_rbd_extract_encryption_create_options(QemuOpts * opts,RbdEncryptionCreateOptions ** spec,Error ** errp)812 static int qemu_rbd_extract_encryption_create_options(
813 QemuOpts *opts,
814 RbdEncryptionCreateOptions **spec,
815 Error **errp)
816 {
817 QDict *opts_qdict;
818 QDict *encrypt_qdict;
819 Visitor *v;
820 int ret = 0;
821
822 opts_qdict = qemu_opts_to_qdict(opts, NULL);
823 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
824 qobject_unref(opts_qdict);
825 if (!qdict_size(encrypt_qdict)) {
826 *spec = NULL;
827 goto exit;
828 }
829
830 /* Convert options into a QAPI object */
831 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
832 if (!v) {
833 ret = -EINVAL;
834 goto exit;
835 }
836
837 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
838 visit_free(v);
839 if (!*spec) {
840 ret = -EINVAL;
841 goto exit;
842 }
843
844 exit:
845 qobject_unref(encrypt_qdict);
846 return ret;
847 }
848
qemu_rbd_co_create_opts(BlockDriver * drv,const char * filename,QemuOpts * opts,Error ** errp)849 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
850 const char *filename,
851 QemuOpts *opts,
852 Error **errp)
853 {
854 BlockdevCreateOptions *create_options;
855 BlockdevCreateOptionsRbd *rbd_opts;
856 BlockdevOptionsRbd *loc;
857 RbdEncryptionCreateOptions *encrypt = NULL;
858 Error *local_err = NULL;
859 const char *keypairs, *password_secret;
860 QDict *options = NULL;
861 int ret = 0;
862
863 create_options = g_new0(BlockdevCreateOptions, 1);
864 create_options->driver = BLOCKDEV_DRIVER_RBD;
865 rbd_opts = &create_options->u.rbd;
866
867 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
868
869 password_secret = qemu_opt_get(opts, "password-secret");
870
871 /* Read out options */
872 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
873 BDRV_SECTOR_SIZE);
874 rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
875 BLOCK_OPT_CLUSTER_SIZE, 0);
876 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
877
878 options = qdict_new();
879 qemu_rbd_parse_filename(filename, options, &local_err);
880 if (local_err) {
881 ret = -EINVAL;
882 error_propagate(errp, local_err);
883 goto exit;
884 }
885
886 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
887 if (ret < 0) {
888 goto exit;
889 }
890 rbd_opts->encrypt = encrypt;
891
892 /*
893 * Caution: while qdict_get_try_str() is fine, getting non-string
894 * types would require more care. When @options come from -blockdev
895 * or blockdev_add, its members are typed according to the QAPI
896 * schema, but when they come from -drive, they're all QString.
897 */
898 loc = rbd_opts->location;
899 loc->pool = g_strdup(qdict_get_try_str(options, "pool"));
900 loc->conf = g_strdup(qdict_get_try_str(options, "conf"));
901 loc->user = g_strdup(qdict_get_try_str(options, "user"));
902 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
903 loc->image = g_strdup(qdict_get_try_str(options, "image"));
904 keypairs = qdict_get_try_str(options, "=keyvalue-pairs");
905
906 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
907 if (ret < 0) {
908 goto exit;
909 }
910
911 exit:
912 qobject_unref(options);
913 qapi_free_BlockdevCreateOptions(create_options);
914 return ret;
915 }
916
qemu_rbd_mon_host(BlockdevOptionsRbd * opts,Error ** errp)917 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
918 {
919 const char **vals;
920 const char *host, *port;
921 char *rados_str;
922 InetSocketAddressBaseList *p;
923 int i, cnt;
924
925 if (!opts->has_server) {
926 return NULL;
927 }
928
929 for (cnt = 0, p = opts->server; p; p = p->next) {
930 cnt++;
931 }
932
933 vals = g_new(const char *, cnt + 1);
934
935 for (i = 0, p = opts->server; p; p = p->next, i++) {
936 host = p->value->host;
937 port = p->value->port;
938
939 if (strchr(host, ':')) {
940 vals[i] = g_strdup_printf("[%s]:%s", host, port);
941 } else {
942 vals[i] = g_strdup_printf("%s:%s", host, port);
943 }
944 }
945 vals[i] = NULL;
946
947 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
948 g_strfreev((char **)vals);
949 return rados_str;
950 }
951
qemu_rbd_connect(rados_t * cluster,rados_ioctx_t * io_ctx,BlockdevOptionsRbd * opts,bool cache,const char * keypairs,const char * secretid,Error ** errp)952 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
953 BlockdevOptionsRbd *opts, bool cache,
954 const char *keypairs, const char *secretid,
955 Error **errp)
956 {
957 char *mon_host = NULL;
958 Error *local_err = NULL;
959 int r;
960
961 if (secretid) {
962 if (opts->key_secret) {
963 error_setg(errp,
964 "Legacy 'password-secret' clashes with 'key-secret'");
965 return -EINVAL;
966 }
967 opts->key_secret = g_strdup(secretid);
968 }
969
970 mon_host = qemu_rbd_mon_host(opts, &local_err);
971 if (local_err) {
972 error_propagate(errp, local_err);
973 r = -EINVAL;
974 goto out;
975 }
976
977 r = rados_create(cluster, opts->user);
978 if (r < 0) {
979 error_setg_errno(errp, -r, "error initializing");
980 goto out;
981 }
982
983 /* try default location when conf=NULL, but ignore failure */
984 r = rados_conf_read_file(*cluster, opts->conf);
985 if (opts->conf && r < 0) {
986 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
987 goto failed_shutdown;
988 }
989
990 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
991 if (r < 0) {
992 goto failed_shutdown;
993 }
994
995 if (mon_host) {
996 r = rados_conf_set(*cluster, "mon_host", mon_host);
997 if (r < 0) {
998 goto failed_shutdown;
999 }
1000 }
1001
1002 r = qemu_rbd_set_auth(*cluster, opts, errp);
1003 if (r < 0) {
1004 goto failed_shutdown;
1005 }
1006
1007 /*
1008 * Fallback to more conservative semantics if setting cache
1009 * options fails. Ignore errors from setting rbd_cache because the
1010 * only possible error is that the option does not exist, and
1011 * librbd defaults to no caching. If write through caching cannot
1012 * be set up, fall back to no caching.
1013 */
1014 if (cache) {
1015 rados_conf_set(*cluster, "rbd_cache", "true");
1016 } else {
1017 rados_conf_set(*cluster, "rbd_cache", "false");
1018 }
1019
1020 r = rados_connect(*cluster);
1021 if (r < 0) {
1022 error_setg_errno(errp, -r, "error connecting");
1023 goto failed_shutdown;
1024 }
1025
1026 r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
1027 if (r < 0) {
1028 error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
1029 goto failed_shutdown;
1030 }
1031
1032 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1033 if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
1034 bool exists;
1035
1036 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
1037 if (r < 0) {
1038 error_setg_errno(errp, -r, "error checking namespace");
1039 goto failed_ioctx_destroy;
1040 }
1041
1042 if (!exists) {
1043 error_setg(errp, "namespace '%s' does not exist",
1044 opts->q_namespace);
1045 r = -ENOENT;
1046 goto failed_ioctx_destroy;
1047 }
1048 }
1049 #endif
1050
1051 /*
1052 * Set the namespace after opening the io context on the pool,
1053 * if nspace == NULL or if nspace == "", it is just as we did nothing
1054 */
1055 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1056
1057 r = 0;
1058 goto out;
1059
1060 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1061 failed_ioctx_destroy:
1062 rados_ioctx_destroy(*io_ctx);
1063 #endif
1064 failed_shutdown:
1065 rados_shutdown(*cluster);
1066 out:
1067 g_free(mon_host);
1068 return r;
1069 }
1070
qemu_rbd_convert_options(QDict * options,BlockdevOptionsRbd ** opts,Error ** errp)1071 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1072 Error **errp)
1073 {
1074 Visitor *v;
1075
1076 /* Convert the remaining options into a QAPI object */
1077 v = qobject_input_visitor_new_flat_confused(options, errp);
1078 if (!v) {
1079 return -EINVAL;
1080 }
1081
1082 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1083 visit_free(v);
1084 if (!opts) {
1085 return -EINVAL;
1086 }
1087
1088 return 0;
1089 }
1090
/*
 * Fallback for options given in the legacy form, where everything is
 * encoded in a "filename" entry: parse that filename into @options and
 * retry the conversion into *@opts.
 *
 * *@keypairs receives a copy of any "=keyvalue-pairs" entry produced by the
 * parser (NULL if none); the caller frees it.
 *
 * Errors from parsing and conversion are not reported in detail.  Returns
 * -EINVAL when there is no "filename" entry, otherwise the result of
 * qemu_rbd_convert_options().
 */
static int qemu_rbd_attempt_legacy_options(QDict *options,
                                           BlockdevOptionsRbd **opts,
                                           char **keypairs)
{
    g_autofree char *legacy_name = NULL;

    /* Copy first: the string is owned by the entry deleted just below. */
    legacy_name = g_strdup(qdict_get_try_str(options, "filename"));
    if (legacy_name == NULL) {
        return -EINVAL;
    }
    qdict_del(options, "filename");

    qemu_rbd_parse_filename(legacy_name, options, NULL);

    /* keypairs freed by caller */
    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (*keypairs != NULL) {
        qdict_del(options, "=keyvalue-pairs");
    }

    return qemu_rbd_convert_options(options, opts, NULL);
}
1117
qemu_rbd_open(BlockDriverState * bs,QDict * options,int flags,Error ** errp)1118 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1119 Error **errp)
1120 {
1121 BDRVRBDState *s = bs->opaque;
1122 BlockdevOptionsRbd *opts = NULL;
1123 const QDictEntry *e;
1124 Error *local_err = NULL;
1125 char *keypairs, *secretid;
1126 rbd_image_info_t info;
1127 int r;
1128
1129 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1130 if (keypairs) {
1131 qdict_del(options, "=keyvalue-pairs");
1132 }
1133
1134 secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1135 if (secretid) {
1136 qdict_del(options, "password-secret");
1137 }
1138
1139 r = qemu_rbd_convert_options(options, &opts, &local_err);
1140 if (local_err) {
1141 /* If keypairs are present, that means some options are present in
1142 * the modern option format. Don't attempt to parse legacy option
1143 * formats, as we won't support mixed usage. */
1144 if (keypairs) {
1145 error_propagate(errp, local_err);
1146 goto out;
1147 }
1148
1149 /* If the initial attempt to convert and process the options failed,
1150 * we may be attempting to open an image file that has the rbd options
1151 * specified in the older format consisting of all key/value pairs
1152 * encoded in the filename. Go ahead and attempt to parse the
1153 * filename, and see if we can pull out the required options. */
1154 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1155 if (r < 0) {
1156 /* Propagate the original error, not the legacy parsing fallback
1157 * error, as the latter was just a best-effort attempt. */
1158 error_propagate(errp, local_err);
1159 goto out;
1160 }
1161 /* Take care whenever deciding to actually deprecate; once this ability
1162 * is removed, we will not be able to open any images with legacy-styled
1163 * backing image strings. */
1164 warn_report("RBD options encoded in the filename as keyvalue pairs "
1165 "is deprecated");
1166 }
1167
1168 /* Remove the processed options from the QDict (the visitor processes
1169 * _all_ options in the QDict) */
1170 while ((e = qdict_first(options))) {
1171 qdict_del(options, e->key);
1172 }
1173
1174 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1175 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1176 if (r < 0) {
1177 goto out;
1178 }
1179
1180 s->snap = g_strdup(opts->snapshot);
1181 s->image_name = g_strdup(opts->image);
1182
1183 /* rbd_open is always r/w */
1184 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1185 if (r < 0) {
1186 error_setg_errno(errp, -r, "error reading header from %s",
1187 s->image_name);
1188 goto failed_open;
1189 }
1190
1191 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT__MAX;
1192 if (opts->encrypt) {
1193 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1194 if (opts->encrypt->parent) {
1195 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1196 r = qemu_rbd_encryption_load2(bs, s->image, opts->encrypt, errp);
1197 #else
1198 r = -ENOTSUP;
1199 error_setg(errp, "RBD library does not support layered encryption");
1200 #endif
1201 } else {
1202 r = qemu_rbd_encryption_load(bs, s->image, opts->encrypt, errp);
1203 }
1204 if (r < 0) {
1205 goto failed_post_open;
1206 }
1207 #else
1208 r = -ENOTSUP;
1209 error_setg(errp, "RBD library does not support image encryption");
1210 goto failed_post_open;
1211 #endif
1212 } else {
1213 qemu_rbd_encryption_probe(bs);
1214 }
1215
1216 r = rbd_stat(s->image, &info, sizeof(info));
1217 if (r < 0) {
1218 error_setg_errno(errp, -r, "error getting image info from %s",
1219 s->image_name);
1220 goto failed_post_open;
1221 }
1222 s->image_size = info.size;
1223 s->object_size = info.obj_size;
1224
1225 /* If we are using an rbd snapshot, we must be r/o, otherwise
1226 * leave as-is */
1227 if (s->snap != NULL) {
1228 bdrv_graph_rdlock_main_loop();
1229 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1230 bdrv_graph_rdunlock_main_loop();
1231 if (r < 0) {
1232 goto failed_post_open;
1233 }
1234 }
1235
1236 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1237 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1238 #endif
1239
1240 /* When extending regular files, we get zeros from the OS */
1241 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1242
1243 r = 0;
1244 goto out;
1245
1246 failed_post_open:
1247 rbd_close(s->image);
1248 failed_open:
1249 rados_ioctx_destroy(s->io_ctx);
1250 g_free(s->snap);
1251 g_free(s->image_name);
1252 rados_shutdown(s->cluster);
1253 out:
1254 qapi_free_BlockdevOptionsRbd(opts);
1255 g_free(keypairs);
1256 g_free(secretid);
1257 return r;
1258 }
1259
1260
1261 /* Since RBD is currently always opened R/W via the API,
1262 * we just need to check if we are using a snapshot or not, in
1263 * order to determine if we will allow it to be R/W */
qemu_rbd_reopen_prepare(BDRVReopenState * state,BlockReopenQueue * queue,Error ** errp)1264 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1265 BlockReopenQueue *queue, Error **errp)
1266 {
1267 BDRVRBDState *s = state->bs->opaque;
1268 int ret = 0;
1269
1270 GRAPH_RDLOCK_GUARD_MAINLOOP();
1271
1272 if (s->snap && state->flags & BDRV_O_RDWR) {
1273 error_setg(errp,
1274 "Cannot change node '%s' to r/w when using RBD snapshot",
1275 bdrv_get_device_or_node_name(state->bs));
1276 ret = -EINVAL;
1277 }
1278
1279 return ret;
1280 }
1281
/*
 * Release what the driver state holds for an open image: the image handle,
 * the I/O context, the cluster connection, and the duplicated snapshot and
 * image names.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;

    /* librbd/librados handles, innermost first */
    rbd_close(state->image);
    rados_ioctx_destroy(state->io_ctx);

    /* strings duplicated when the image was opened */
    g_free(state->snap);
    g_free(state->image_name);

    rados_shutdown(state->cluster);
}
1292
1293 /* Resize the RBD image and update the 'image_size' with the current size */
qemu_rbd_resize(BlockDriverState * bs,uint64_t size)1294 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1295 {
1296 BDRVRBDState *s = bs->opaque;
1297 int r;
1298
1299 r = rbd_resize(s->image, size);
1300 if (r < 0) {
1301 return r;
1302 }
1303
1304 s->image_size = size;
1305
1306 return 0;
1307 }
1308
qemu_rbd_finish_bh(void * opaque)1309 static void qemu_rbd_finish_bh(void *opaque)
1310 {
1311 RBDTask *task = opaque;
1312 task->complete = true;
1313 aio_co_wake(task->co);
1314 }
1315
1316 /*
1317 * This is the completion callback function for all rbd aio calls
1318 * started from qemu_rbd_start_co().
1319 *
1320 * Note: this function is being called from a non qemu thread so
1321 * we need to be careful about what we do here. Generally we only
1322 * schedule a BH, and do the rest of the io completion handling
1323 * from qemu_rbd_finish_bh() which runs in a qemu context.
1324 */
qemu_rbd_completion_cb(rbd_completion_t c,RBDTask * task)1325 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1326 {
1327 task->ret = rbd_aio_get_return_value(c);
1328 rbd_aio_release(c);
1329 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1330 qemu_rbd_finish_bh, task);
1331 }
1332
qemu_rbd_start_co(BlockDriverState * bs,uint64_t offset,uint64_t bytes,QEMUIOVector * qiov,int flags,RBDAIOCmd cmd)1333 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1334 uint64_t offset,
1335 uint64_t bytes,
1336 QEMUIOVector *qiov,
1337 int flags,
1338 RBDAIOCmd cmd)
1339 {
1340 BDRVRBDState *s = bs->opaque;
1341 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1342 rbd_completion_t c;
1343 int r;
1344
1345 assert(!qiov || qiov->size == bytes);
1346
1347 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1348 /*
1349 * RBD APIs don't allow us to write more than actual size, so in order
1350 * to support growing images, we resize the image before write
1351 * operations that exceed the current size.
1352 */
1353 if (offset + bytes > s->image_size) {
1354 r = qemu_rbd_resize(bs, offset + bytes);
1355 if (r < 0) {
1356 return r;
1357 }
1358 }
1359 }
1360
1361 r = rbd_aio_create_completion(&task,
1362 (rbd_callback_t) qemu_rbd_completion_cb, &c);
1363 if (r < 0) {
1364 return r;
1365 }
1366
1367 switch (cmd) {
1368 case RBD_AIO_READ:
1369 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1370 break;
1371 case RBD_AIO_WRITE:
1372 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1373 break;
1374 case RBD_AIO_DISCARD:
1375 r = rbd_aio_discard(s->image, offset, bytes, c);
1376 break;
1377 case RBD_AIO_FLUSH:
1378 r = rbd_aio_flush(s->image, c);
1379 break;
1380 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1381 case RBD_AIO_WRITE_ZEROES: {
1382 int zero_flags = 0;
1383 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1384 if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1385 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1386 }
1387 #endif
1388 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1389 break;
1390 }
1391 #endif
1392 default:
1393 r = -EINVAL;
1394 }
1395
1396 if (r < 0) {
1397 error_report("rbd request failed early: cmd %d offset %" PRIu64
1398 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1399 bytes, flags, r, strerror(-r));
1400 rbd_aio_release(c);
1401 return r;
1402 }
1403
1404 while (!task.complete) {
1405 qemu_coroutine_yield();
1406 }
1407
1408 if (task.ret < 0) {
1409 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1410 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1411 bytes, flags, task.ret, strerror(-task.ret));
1412 return task.ret;
1413 }
1414
1415 /* zero pad short reads */
1416 if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1417 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1418 }
1419
1420 return 0;
1421 }
1422
1423 static int
qemu_rbd_co_preadv(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1424 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1425 int64_t bytes, QEMUIOVector *qiov,
1426 BdrvRequestFlags flags)
1427 {
1428 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1429 }
1430
1431 static int
qemu_rbd_co_pwritev(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1432 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1433 int64_t bytes, QEMUIOVector *qiov,
1434 BdrvRequestFlags flags)
1435 {
1436 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1437 }
1438
qemu_rbd_co_flush(BlockDriverState * bs)1439 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1440 {
1441 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1442 }
1443
qemu_rbd_co_pdiscard(BlockDriverState * bs,int64_t offset,int64_t bytes)1444 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1445 int64_t offset, int64_t bytes)
1446 {
1447 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1448 }
1449
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Zero @bytes bytes starting at @offset by issuing an RBD_AIO_WRITE_ZEROES
 * request; @flags is passed through to qemu_rbd_start_co(), whose result is
 * returned.  Only built when librbd supports write-zeroes.
 */
static int coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs,
                                                  int64_t offset,
                                                  int64_t bytes,
                                                  BdrvRequestFlags flags)
{
    int ret = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                                RBD_AIO_WRITE_ZEROES);

    return ret;
}
#endif
1459
1460 static int coroutine_fn
qemu_rbd_co_get_info(BlockDriverState * bs,BlockDriverInfo * bdi)1461 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1462 {
1463 BDRVRBDState *s = bs->opaque;
1464 bdi->cluster_size = s->object_size;
1465 return 0;
1466 }
1467
qemu_rbd_get_specific_info(BlockDriverState * bs,Error ** errp)1468 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1469 Error **errp)
1470 {
1471 BDRVRBDState *s = bs->opaque;
1472 ImageInfoSpecific *spec_info;
1473
1474 spec_info = g_new(ImageInfoSpecific, 1);
1475 *spec_info = (ImageInfoSpecific){
1476 .type = IMAGE_INFO_SPECIFIC_KIND_RBD,
1477 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1478 };
1479
1480 if (s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX) {
1481 assert(!bs->encrypted);
1482 } else {
1483 ImageInfoSpecificRbd *rbd_info = spec_info->u.rbd.data;
1484
1485 rbd_info->has_encryption_format = true;
1486 rbd_info->encryption_format = s->encryption_format;
1487 }
1488
1489 return spec_info;
1490 }
1491
/*
 * rbd_diff_iterate2 allows the caller to interrupt the execution by
 * returning a negative value from the callback routine. Choose a value that
 * does not conflict with an existing exit code and return it if we want to
 * prematurely stop the execution because we detected a change in the
 * allocation status.
 *
 * The expansion is parenthesized so the negative constant stays a single
 * operand wherever the macro is used.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1499
qemu_rbd_diff_iterate_cb(uint64_t offs,size_t len,int exists,void * opaque)1500 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1501 int exists, void *opaque)
1502 {
1503 RBDDiffIterateReq *req = opaque;
1504
1505 assert(req->offs + req->bytes <= offs);
1506
1507 /* treat a hole like an unallocated area and bail out */
1508 if (!exists) {
1509 return 0;
1510 }
1511
1512 if (!req->exists && offs > req->offs) {
1513 /*
1514 * we started in an unallocated area and hit the first allocated
1515 * block. req->bytes must be set to the length of the unallocated area
1516 * before the allocated area. stop further processing.
1517 */
1518 req->bytes = offs - req->offs;
1519 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1520 }
1521
1522 if (req->exists && offs > req->offs + req->bytes) {
1523 /*
1524 * we started in an allocated area and jumped over an unallocated area,
1525 * req->bytes contains the length of the allocated area before the
1526 * unallocated area. stop further processing.
1527 */
1528 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1529 }
1530
1531 req->bytes += len;
1532 req->exists = true;
1533
1534 return 0;
1535 }
1536
qemu_rbd_co_block_status(BlockDriverState * bs,unsigned int mode,int64_t offset,int64_t bytes,int64_t * pnum,int64_t * map,BlockDriverState ** file)1537 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1538 unsigned int mode,
1539 int64_t offset, int64_t bytes,
1540 int64_t *pnum, int64_t *map,
1541 BlockDriverState **file)
1542 {
1543 BDRVRBDState *s = bs->opaque;
1544 int status, r;
1545 RBDDiffIterateReq req = { .offs = offset };
1546 uint64_t features, flags;
1547 uint64_t head = 0;
1548
1549 assert(offset + bytes <= s->image_size);
1550
1551 /* default to all sectors allocated */
1552 status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1553 *map = offset;
1554 *file = bs;
1555 *pnum = bytes;
1556
1557 /* check if RBD image supports fast-diff */
1558 r = rbd_get_features(s->image, &features);
1559 if (r < 0) {
1560 return status;
1561 }
1562 if (!(features & RBD_FEATURE_FAST_DIFF)) {
1563 return status;
1564 }
1565
1566 /* check if RBD fast-diff result is valid */
1567 r = rbd_get_flags(s->image, &flags);
1568 if (r < 0) {
1569 return status;
1570 }
1571 if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1572 return status;
1573 }
1574
1575 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1576 /*
1577 * librbd had a bug until early 2022 that affected all versions of ceph that
1578 * supported fast-diff. This bug results in reporting of incorrect offsets
1579 * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1580 * Work around this bug by rounding down the offset to object boundaries.
1581 * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1582 * However, this workaround only works for non cloned images with default
1583 * striping.
1584 *
1585 * See: https://tracker.ceph.com/issues/53784
1586 */
1587
1588 /* check if RBD image has non-default striping enabled */
1589 if (features & RBD_FEATURE_STRIPINGV2) {
1590 return status;
1591 }
1592
1593 #pragma GCC diagnostic push
1594 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1595 /*
1596 * check if RBD image is a clone (= has a parent).
1597 *
1598 * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1599 * replacement rbd_get_parent is not present in Luminous and Mimic.
1600 */
1601 if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1602 return status;
1603 }
1604 #pragma GCC diagnostic pop
1605
1606 head = req.offs & (s->object_size - 1);
1607 req.offs -= head;
1608 bytes += head;
1609 #endif
1610
1611 r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1612 qemu_rbd_diff_iterate_cb, &req);
1613 if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1614 return status;
1615 }
1616 assert(req.bytes <= bytes);
1617 if (!req.exists) {
1618 if (r == 0) {
1619 /*
1620 * rbd_diff_iterate2 does not invoke callbacks for unallocated
1621 * areas. This here catches the case where no callback was
1622 * invoked at all (req.bytes == 0).
1623 */
1624 assert(req.bytes == 0);
1625 req.bytes = bytes;
1626 }
1627 status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1628 }
1629
1630 assert(req.bytes > head);
1631 *pnum = req.bytes - head;
1632 return status;
1633 }
1634
qemu_rbd_co_getlength(BlockDriverState * bs)1635 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1636 {
1637 BDRVRBDState *s = bs->opaque;
1638 int r;
1639
1640 r = rbd_get_size(s->image, &s->image_size);
1641 if (r < 0) {
1642 return r;
1643 }
1644
1645 return s->image_size;
1646 }
1647
qemu_rbd_co_truncate(BlockDriverState * bs,int64_t offset,bool exact,PreallocMode prealloc,BdrvRequestFlags flags,Error ** errp)1648 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1649 int64_t offset,
1650 bool exact,
1651 PreallocMode prealloc,
1652 BdrvRequestFlags flags,
1653 Error **errp)
1654 {
1655 int r;
1656
1657 if (prealloc != PREALLOC_MODE_OFF) {
1658 error_setg(errp, "Unsupported preallocation mode '%s'",
1659 PreallocMode_str(prealloc));
1660 return -ENOTSUP;
1661 }
1662
1663 r = qemu_rbd_resize(bs, offset);
1664 if (r < 0) {
1665 error_setg_errno(errp, -r, "Failed to resize file");
1666 return r;
1667 }
1668
1669 return 0;
1670 }
1671
/*
 * Create an RBD snapshot named after @sn_info->name.
 *
 * Returns 0 on success, -EINVAL if the name is empty or a non-empty id
 * differs from the name, -ERANGE if the name would not fit into the id
 * field, or the negative value returned by rbd_snap_create().
 */
static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    const char *snap_name = sn_info->name;
    const char *snap_id = sn_info->id_str;
    int rc;

    if (snap_name[0] == '\0') {
        return -EINVAL; /* we need a name for rbd snapshots */
    }

    /*
     * rbd snapshots are using the name as the user controlled unique identifier
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (snap_id[0] != '\0' && strcmp(snap_id, snap_name) != 0) {
        return -EINVAL;
    }

    if (strlen(snap_name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    rc = rbd_snap_create(s->image, snap_name);
    if (rc < 0) {
        error_report("failed to create snap: %s", strerror(-rc));
        return rc;
    }

    return 0;
}
1703
qemu_rbd_snap_remove(BlockDriverState * bs,const char * snapshot_id,const char * snapshot_name,Error ** errp)1704 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1705 const char *snapshot_id,
1706 const char *snapshot_name,
1707 Error **errp)
1708 {
1709 BDRVRBDState *s = bs->opaque;
1710 int r;
1711
1712 if (!snapshot_name) {
1713 error_setg(errp, "rbd need a valid snapshot name");
1714 return -EINVAL;
1715 }
1716
1717 /* If snapshot_id is specified, it must be equal to name, see
1718 qemu_rbd_snap_list() */
1719 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1720 error_setg(errp,
1721 "rbd do not support snapshot id, it should be NULL or "
1722 "equal to snapshot name");
1723 return -EINVAL;
1724 }
1725
1726 r = rbd_snap_remove(s->image, snapshot_name);
1727 if (r < 0) {
1728 error_setg_errno(errp, -r, "Failed to remove the snapshot");
1729 }
1730 return r;
1731 }
1732
/*
 * Roll the image back to the snapshot called @snapshot_name.
 * Returns the value of rbd_snap_rollback() unchanged.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *state = bs->opaque;
    int rc = rbd_snap_rollback(state->image, snapshot_name);

    return rc;
}
1740
/*
 * List the snapshots of the image.
 *
 * On success *@psn_tab receives a newly allocated array with one entry per
 * snapshot and the number of entries is returned.  If there are no
 * snapshots, or rbd_snap_list() fails, *@psn_tab is set to NULL and the
 * zero or negative value from rbd_snap_list() is returned.
 */
static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    QEMUSnapshotInfo *sn_tab;
    rbd_snap_info_t *snaps;
    int max_snaps = RBD_MAX_SNAPS;
    int snap_count;
    int idx;

    /*
     * Retry for as long as librbd answers -ERANGE; the buffer for the next
     * attempt is sized from max_snaps, which is passed by reference.
     */
    for (;;) {
        snaps = g_new(rbd_snap_info_t, max_snaps);
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
        if (snap_count > 0) {
            break;
        }

        g_free(snaps);
        if (snap_count != -ERANGE) {
            /* Nothing to report: no snapshots, or a real error */
            *psn_tab = NULL;
            return snap_count;
        }
    }

    sn_tab = g_new0(QEMUSnapshotInfo, snap_count);

    for (idx = 0; idx < snap_count; idx++) {
        QEMUSnapshotInfo *entry = &sn_tab[idx];
        const char *snap_name = snaps[idx].name;

        /* The snapshot name doubles as its id */
        pstrcpy(entry->id_str, sizeof(entry->id_str), snap_name);
        pstrcpy(entry->name, sizeof(entry->name), snap_name);

        entry->vm_state_size = snaps[idx].size;
        entry->date_sec = 0;
        entry->date_nsec = 0;
        entry->vm_clock_nsec = 0;
    }

    rbd_snap_list_end(snaps);
    g_free(snaps);

    *psn_tab = sn_tab;
    return snap_count;
}
1783
qemu_rbd_co_invalidate_cache(BlockDriverState * bs,Error ** errp)1784 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1785 Error **errp)
1786 {
1787 BDRVRBDState *s = bs->opaque;
1788 int r = rbd_invalidate_cache(s->image);
1789 if (r < 0) {
1790 error_setg_errno(errp, -r, "Failed to invalidate the cache");
1791 }
1792 }
1793
1794 static QemuOptsList qemu_rbd_create_opts = {
1795 .name = "rbd-create-opts",
1796 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1797 .desc = {
1798 {
1799 .name = BLOCK_OPT_SIZE,
1800 .type = QEMU_OPT_SIZE,
1801 .help = "Virtual disk size"
1802 },
1803 {
1804 .name = BLOCK_OPT_CLUSTER_SIZE,
1805 .type = QEMU_OPT_SIZE,
1806 .help = "RBD object size"
1807 },
1808 {
1809 .name = "password-secret",
1810 .type = QEMU_OPT_STRING,
1811 .help = "ID of secret providing the password",
1812 },
1813 {
1814 .name = "encrypt.format",
1815 .type = QEMU_OPT_STRING,
1816 .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1817 },
1818 {
1819 .name = "encrypt.cipher-alg",
1820 .type = QEMU_OPT_STRING,
1821 .help = "Name of encryption cipher algorithm"
1822 " (allowed values: aes-128, aes-256)",
1823 },
1824 {
1825 .name = "encrypt.key-secret",
1826 .type = QEMU_OPT_STRING,
1827 .help = "ID of secret providing LUKS passphrase",
1828 },
1829 { /* end of list */ }
1830 }
1831 };
1832
/*
 * NULL-terminated list of runtime option names, installed as the driver's
 * strong_runtime_opts.  "server." ends in a dot; presumably it covers every
 * key under that prefix (confirm against the block layer's matching rules).
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL  /* terminator */
};
1845
1846 static BlockDriver bdrv_rbd = {
1847 .format_name = "rbd",
1848 .instance_size = sizeof(BDRVRBDState),
1849
1850 .bdrv_parse_filename = qemu_rbd_parse_filename,
1851 .bdrv_open = qemu_rbd_open,
1852 .bdrv_close = qemu_rbd_close,
1853 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare,
1854 .bdrv_co_create = qemu_rbd_co_create,
1855 .bdrv_co_create_opts = qemu_rbd_co_create_opts,
1856 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1857 .bdrv_co_get_info = qemu_rbd_co_get_info,
1858 .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1859 .create_opts = &qemu_rbd_create_opts,
1860 .bdrv_co_getlength = qemu_rbd_co_getlength,
1861 .bdrv_co_truncate = qemu_rbd_co_truncate,
1862 .protocol_name = "rbd",
1863
1864 .bdrv_co_preadv = qemu_rbd_co_preadv,
1865 .bdrv_co_pwritev = qemu_rbd_co_pwritev,
1866 .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
1867 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard,
1868 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1869 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes,
1870 #endif
1871 .bdrv_co_block_status = qemu_rbd_co_block_status,
1872
1873 .bdrv_snapshot_create = qemu_rbd_snap_create,
1874 .bdrv_snapshot_delete = qemu_rbd_snap_remove,
1875 .bdrv_snapshot_list = qemu_rbd_snap_list,
1876 .bdrv_snapshot_goto = qemu_rbd_snap_rollback,
1877 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1878
1879 .strong_runtime_opts = qemu_rbd_strong_runtime_opts,
1880 };
1881
bdrv_rbd_init(void)1882 static void bdrv_rbd_init(void)
1883 {
1884 bdrv_register(&bdrv_rbd);
1885 }
1886
1887 block_init(bdrv_rbd_init);
1888