1 /*
2 * QEMU Block driver for RADOS (Ceph)
3 *
4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5 * Josh Durgin <josh.durgin@dreamhost.com>
6 *
7 * This work is licensed under the terms of the GNU GPL, version 2. See
8 * the COPYING file in the top-level directory.
9 *
10 * Contributions after 2012-01-13 are licensed under the terms of the
11 * GNU GPL, version 2 or (at your option) any later version.
12 */
13
14 #include "qemu/osdep.h"
15
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "system/replay.h"
27 #include "qobject/qstring.h"
28 #include "qobject/qdict.h"
29 #include "qobject/qjson.h"
30 #include "qobject/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33
34 /*
35 * When specifying the image filename use:
36 *
37 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38 *
39 * poolname must be the name of an existing rados pool.
40 *
41 * devicename is the name of the rbd image.
42 *
43 * Each option given is used to configure rados, and may be any valid
44 * Ceph option, "id", or "conf".
45 *
46 * The "id" option indicates what user we should authenticate as to
47 * the Ceph cluster. If it is excluded we will use the Ceph default
48 * (normally 'admin').
49 *
50 * The "conf" option specifies a Ceph configuration file to read. If
51 * it is not specified, we will read from the default Ceph locations
52 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
53 * file, specify conf=/dev/null.
54 *
55 * Configuration values containing :, @, or = can be escaped with a
56 * leading "\".
57 */
58
/*
 * Object size in bytes at the default object order.
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file -- confirm it is still provided before using this macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* NOTE(review): presumably a cap on snapshots per image; no user is visible here. */
#define RBD_MAX_SNAPS 100

/* Number of leading image bytes compared against the signatures below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * Header signatures matched against the first
 * RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN bytes of an image.  Each one
 * is a four-letter tag, the bytes 0xBA 0xBE, and then 0, 1 for the LUKS
 * variants or 0, 2 for the LUKS2 variants.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

/* The "layered" signatures use the tag "RBDL" instead of "LUKS". */
static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
84
/*
 * Command codes for the request kinds named by the RBD_AIO_* constants.
 * The code that dispatches on them is outside this excerpt.
 */
typedef enum {
    RBD_AIO_READ = 0,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
92
93 typedef struct BDRVRBDState {
94 rados_t cluster;
95 rados_ioctx_t io_ctx;
96 rbd_image_t image;
97 char *image_name;
98 char *snap;
99 char *namespace;
100 uint64_t image_size;
101 uint64_t object_size;
102
103 /*
104 * If @bs->encrypted is true, this is the encryption format actually loaded
105 * at the librbd level. If it is false, it is the result of probing.
106 * RBD_IMAGE_ENCRYPTION_FORMAT__MAX means that encryption is not enabled and
107 * probing didn't find any known encryption header either.
108 */
109 RbdImageEncryptionFormat encryption_format;
110 } BDRVRBDState;
111
112 typedef struct RBDTask {
113 Coroutine *co;
114 int64_t ret;
115 } RBDTask;
116
/*
 * An offset/length pair plus an "exists" flag.
 * NOTE(review): presumably the state of a diff-iterate query over a byte
 * range; the code using it is outside this excerpt -- confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
122
123 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
124 BlockdevOptionsRbd *opts, bool cache,
125 const char *keypairs, const char *secretid,
126 Error **errp);
127
/*
 * Find the first unescaped occurrence of @delim in @src.
 *
 * A backslash followed by another character hides that character from the
 * search.  Returns a pointer to the delimiter, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over an escape sequence as a unit, otherwise one character. */
        cur += (*cur == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
143
144
/*
 * Split @src in place at the first unescaped @delim.
 *
 * The delimiter is overwritten with a NUL byte and *@p is set to the
 * character after it.  If there is no delimiter, @src is left untouched and
 * *@p is set to NULL.  Always returns @src, i.e. the token itself.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (!sep) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
158
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and that
 * character is kept literally.  A lone backslash at the end is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (*in == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
171
qemu_rbd_parse_filename(const char * filename,QDict * options,Error ** errp)172 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
173 Error **errp)
174 {
175 const char *start;
176 char *p, *buf;
177 QList *keypairs = NULL;
178 char *found_str, *image_name;
179
180 if (!strstart(filename, "rbd:", &start)) {
181 error_setg(errp, "File name must start with 'rbd:'");
182 return;
183 }
184
185 buf = g_strdup(start);
186 p = buf;
187
188 found_str = qemu_rbd_next_tok(p, '/', &p);
189 if (!p) {
190 error_setg(errp, "Pool name is required");
191 goto done;
192 }
193 qemu_rbd_unescape(found_str);
194 qdict_put_str(options, "pool", found_str);
195
196 if (qemu_rbd_strchr(p, '@')) {
197 image_name = qemu_rbd_next_tok(p, '@', &p);
198
199 found_str = qemu_rbd_next_tok(p, ':', &p);
200 qemu_rbd_unescape(found_str);
201 qdict_put_str(options, "snapshot", found_str);
202 } else {
203 image_name = qemu_rbd_next_tok(p, ':', &p);
204 }
205 /* Check for namespace in the image_name */
206 if (qemu_rbd_strchr(image_name, '/')) {
207 found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
208 qemu_rbd_unescape(found_str);
209 qdict_put_str(options, "namespace", found_str);
210 } else {
211 qdict_put_str(options, "namespace", "");
212 }
213 qemu_rbd_unescape(image_name);
214 qdict_put_str(options, "image", image_name);
215 if (!p) {
216 goto done;
217 }
218
219 /* The following are essentially all key/value pairs, and we treat
220 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */
221 while (p) {
222 char *name, *value;
223 name = qemu_rbd_next_tok(p, '=', &p);
224 if (!p) {
225 error_setg(errp, "conf option %s has no value", name);
226 break;
227 }
228
229 qemu_rbd_unescape(name);
230
231 value = qemu_rbd_next_tok(p, ':', &p);
232 qemu_rbd_unescape(value);
233
234 if (!strcmp(name, "conf")) {
235 qdict_put_str(options, "conf", value);
236 } else if (!strcmp(name, "id")) {
237 qdict_put_str(options, "user", value);
238 } else {
239 /*
240 * We pass these internally to qemu_rbd_set_keypairs(), so
241 * we can get away with the simpler list of [ "key1",
242 * "value1", "key2", "value2" ] rather than a raw dict
243 * { "key1": "value1", "key2": "value2" } where we can't
244 * guarantee order, or even a more correct but complex
245 * [ { "key1": "value1" }, { "key2": "value2" } ]
246 */
247 if (!keypairs) {
248 keypairs = qlist_new();
249 }
250 qlist_append_str(keypairs, name);
251 qlist_append_str(keypairs, value);
252 }
253 }
254
255 if (keypairs) {
256 qdict_put(options, "=keyvalue-pairs",
257 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
258 }
259
260 done:
261 g_free(buf);
262 qobject_unref(keypairs);
263 }
264
qemu_rbd_set_auth(rados_t cluster,BlockdevOptionsRbd * opts,Error ** errp)265 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
266 Error **errp)
267 {
268 char *key, *acr;
269 int r;
270 GString *accu;
271 RbdAuthModeList *auth;
272
273 if (opts->key_secret) {
274 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
275 if (!key) {
276 return -EIO;
277 }
278 r = rados_conf_set(cluster, "key", key);
279 g_free(key);
280 if (r < 0) {
281 error_setg_errno(errp, -r, "Could not set 'key'");
282 return r;
283 }
284 }
285
286 if (opts->has_auth_client_required) {
287 accu = g_string_new("");
288 for (auth = opts->auth_client_required; auth; auth = auth->next) {
289 if (accu->str[0]) {
290 g_string_append_c(accu, ';');
291 }
292 g_string_append(accu, RbdAuthMode_str(auth->value));
293 }
294 acr = g_string_free(accu, FALSE);
295 r = rados_conf_set(cluster, "auth_client_required", acr);
296 g_free(acr);
297 if (r < 0) {
298 error_setg_errno(errp, -r,
299 "Could not set 'auth_client_required'");
300 return r;
301 }
302 }
303
304 return 0;
305 }
306
qemu_rbd_set_keypairs(rados_t cluster,const char * keypairs_json,Error ** errp)307 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
308 Error **errp)
309 {
310 QList *keypairs;
311 QString *name;
312 QString *value;
313 const char *key;
314 size_t remaining;
315 int ret = 0;
316
317 if (!keypairs_json) {
318 return ret;
319 }
320 keypairs = qobject_to(QList,
321 qobject_from_json(keypairs_json, &error_abort));
322 remaining = qlist_size(keypairs) / 2;
323 assert(remaining);
324
325 while (remaining--) {
326 name = qobject_to(QString, qlist_pop(keypairs));
327 value = qobject_to(QString, qlist_pop(keypairs));
328 assert(name && value);
329 key = qstring_get_str(name);
330
331 ret = rados_conf_set(cluster, key, qstring_get_str(value));
332 qobject_unref(value);
333 if (ret < 0) {
334 error_setg_errno(errp, -ret, "invalid conf option %s", key);
335 qobject_unref(name);
336 ret = -EINVAL;
337 break;
338 }
339 qobject_unref(name);
340 }
341
342 qobject_unref(keypairs);
343 return ret;
344 }
345
346 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
qemu_rbd_convert_luks_options(RbdEncryptionOptionsLUKSBase * luks_opts,char ** passphrase,size_t * passphrase_len,Error ** errp)347 static int qemu_rbd_convert_luks_options(
348 RbdEncryptionOptionsLUKSBase *luks_opts,
349 char **passphrase,
350 size_t *passphrase_len,
351 Error **errp)
352 {
353 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
354 passphrase_len, errp);
355 }
356
qemu_rbd_convert_luks_create_options(RbdEncryptionCreateOptionsLUKSBase * luks_opts,rbd_encryption_algorithm_t * alg,char ** passphrase,size_t * passphrase_len,Error ** errp)357 static int qemu_rbd_convert_luks_create_options(
358 RbdEncryptionCreateOptionsLUKSBase *luks_opts,
359 rbd_encryption_algorithm_t *alg,
360 char **passphrase,
361 size_t *passphrase_len,
362 Error **errp)
363 {
364 int r = 0;
365
366 r = qemu_rbd_convert_luks_options(
367 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
368 passphrase, passphrase_len, errp);
369 if (r < 0) {
370 return r;
371 }
372
373 if (luks_opts->has_cipher_alg) {
374 switch (luks_opts->cipher_alg) {
375 case QCRYPTO_CIPHER_ALGO_AES_128: {
376 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
377 break;
378 }
379 case QCRYPTO_CIPHER_ALGO_AES_256: {
380 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
381 break;
382 }
383 default: {
384 r = -ENOTSUP;
385 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
386 luks_opts->cipher_alg);
387 return r;
388 }
389 }
390 } else {
391 /* default alg */
392 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
393 }
394
395 return 0;
396 }
397
qemu_rbd_encryption_format(rbd_image_t image,RbdEncryptionCreateOptions * encrypt,Error ** errp)398 static int qemu_rbd_encryption_format(rbd_image_t image,
399 RbdEncryptionCreateOptions *encrypt,
400 Error **errp)
401 {
402 int r = 0;
403 g_autofree char *passphrase = NULL;
404 rbd_encryption_format_t format;
405 rbd_encryption_options_t opts;
406 rbd_encryption_luks1_format_options_t luks_opts;
407 rbd_encryption_luks2_format_options_t luks2_opts;
408 size_t opts_size;
409 uint64_t raw_size, effective_size;
410
411 r = rbd_get_size(image, &raw_size);
412 if (r < 0) {
413 error_setg_errno(errp, -r, "cannot get raw image size");
414 return r;
415 }
416
417 switch (encrypt->format) {
418 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
419 memset(&luks_opts, 0, sizeof(luks_opts));
420 format = RBD_ENCRYPTION_FORMAT_LUKS1;
421 opts = &luks_opts;
422 opts_size = sizeof(luks_opts);
423 r = qemu_rbd_convert_luks_create_options(
424 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
425 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
426 errp);
427 if (r < 0) {
428 return r;
429 }
430 luks_opts.passphrase = passphrase;
431 break;
432 }
433 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
434 memset(&luks2_opts, 0, sizeof(luks2_opts));
435 format = RBD_ENCRYPTION_FORMAT_LUKS2;
436 opts = &luks2_opts;
437 opts_size = sizeof(luks2_opts);
438 r = qemu_rbd_convert_luks_create_options(
439 qapi_RbdEncryptionCreateOptionsLUKS2_base(
440 &encrypt->u.luks2),
441 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
442 errp);
443 if (r < 0) {
444 return r;
445 }
446 luks2_opts.passphrase = passphrase;
447 break;
448 }
449 default: {
450 r = -ENOTSUP;
451 error_setg_errno(
452 errp, -r, "unknown image encryption format: %u",
453 encrypt->format);
454 return r;
455 }
456 }
457
458 r = rbd_encryption_format(image, format, opts, opts_size);
459 if (r < 0) {
460 error_setg_errno(errp, -r, "encryption format fail");
461 return r;
462 }
463
464 r = rbd_get_size(image, &effective_size);
465 if (r < 0) {
466 error_setg_errno(errp, -r, "cannot get effective image size");
467 return r;
468 }
469
470 r = rbd_resize(image, raw_size + (raw_size - effective_size));
471 if (r < 0) {
472 error_setg_errno(errp, -r, "cannot resize image after format");
473 return r;
474 }
475
476 return 0;
477 }
478
qemu_rbd_encryption_load(BlockDriverState * bs,rbd_image_t image,RbdEncryptionOptions * encrypt,Error ** errp)479 static int qemu_rbd_encryption_load(BlockDriverState *bs,
480 rbd_image_t image,
481 RbdEncryptionOptions *encrypt,
482 Error **errp)
483 {
484 BDRVRBDState *s = bs->opaque;
485 int r = 0;
486 g_autofree char *passphrase = NULL;
487 rbd_encryption_luks1_format_options_t luks_opts;
488 rbd_encryption_luks2_format_options_t luks2_opts;
489 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
490 rbd_encryption_luks_format_options_t luks_any_opts;
491 #endif
492 rbd_encryption_format_t format;
493 rbd_encryption_options_t opts;
494 size_t opts_size;
495
496 switch (encrypt->format) {
497 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
498 memset(&luks_opts, 0, sizeof(luks_opts));
499 format = RBD_ENCRYPTION_FORMAT_LUKS1;
500 opts = &luks_opts;
501 opts_size = sizeof(luks_opts);
502 r = qemu_rbd_convert_luks_options(
503 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
504 &passphrase, &luks_opts.passphrase_size, errp);
505 if (r < 0) {
506 return r;
507 }
508 luks_opts.passphrase = passphrase;
509 break;
510 }
511 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
512 memset(&luks2_opts, 0, sizeof(luks2_opts));
513 format = RBD_ENCRYPTION_FORMAT_LUKS2;
514 opts = &luks2_opts;
515 opts_size = sizeof(luks2_opts);
516 r = qemu_rbd_convert_luks_options(
517 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
518 &passphrase, &luks2_opts.passphrase_size, errp);
519 if (r < 0) {
520 return r;
521 }
522 luks2_opts.passphrase = passphrase;
523 break;
524 }
525 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
526 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
527 memset(&luks_any_opts, 0, sizeof(luks_any_opts));
528 format = RBD_ENCRYPTION_FORMAT_LUKS;
529 opts = &luks_any_opts;
530 opts_size = sizeof(luks_any_opts);
531 r = qemu_rbd_convert_luks_options(
532 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
533 &passphrase, &luks_any_opts.passphrase_size, errp);
534 if (r < 0) {
535 return r;
536 }
537 luks_any_opts.passphrase = passphrase;
538 break;
539 }
540 #endif
541 default: {
542 r = -ENOTSUP;
543 error_setg_errno(
544 errp, -r, "unknown image encryption format: %u",
545 encrypt->format);
546 return r;
547 }
548 }
549
550 r = rbd_encryption_load(image, format, opts, opts_size);
551 if (r < 0) {
552 error_setg_errno(errp, -r, "encryption load fail");
553 return r;
554 }
555 bs->encrypted = true;
556 s->encryption_format = encrypt->format;
557
558 return 0;
559 }
560
561 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
qemu_rbd_encryption_load2(BlockDriverState * bs,rbd_image_t image,RbdEncryptionOptions * encrypt,Error ** errp)562 static int qemu_rbd_encryption_load2(BlockDriverState *bs,
563 rbd_image_t image,
564 RbdEncryptionOptions *encrypt,
565 Error **errp)
566 {
567 BDRVRBDState *s = bs->opaque;
568 int r = 0;
569 int encrypt_count = 1;
570 int i;
571 RbdEncryptionOptions *curr_encrypt;
572 rbd_encryption_spec_t *specs;
573 rbd_encryption_luks1_format_options_t *luks_opts;
574 rbd_encryption_luks2_format_options_t *luks2_opts;
575 rbd_encryption_luks_format_options_t *luks_any_opts;
576
577 /* count encryption options */
578 for (curr_encrypt = encrypt->parent; curr_encrypt;
579 curr_encrypt = curr_encrypt->parent) {
580 ++encrypt_count;
581 }
582
583 specs = g_new0(rbd_encryption_spec_t, encrypt_count);
584
585 curr_encrypt = encrypt;
586 for (i = 0; i < encrypt_count; ++i) {
587 switch (curr_encrypt->format) {
588 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
589 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
590
591 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
592 specs[i].opts = luks_opts;
593 specs[i].opts_size = sizeof(*luks_opts);
594
595 r = qemu_rbd_convert_luks_options(
596 qapi_RbdEncryptionOptionsLUKS_base(
597 &curr_encrypt->u.luks),
598 (char **)&luks_opts->passphrase,
599 &luks_opts->passphrase_size,
600 errp);
601 break;
602 }
603 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
604 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
605
606 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
607 specs[i].opts = luks2_opts;
608 specs[i].opts_size = sizeof(*luks2_opts);
609
610 r = qemu_rbd_convert_luks_options(
611 qapi_RbdEncryptionOptionsLUKS2_base(
612 &curr_encrypt->u.luks2),
613 (char **)&luks2_opts->passphrase,
614 &luks2_opts->passphrase_size,
615 errp);
616 break;
617 }
618 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
619 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
620
621 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
622 specs[i].opts = luks_any_opts;
623 specs[i].opts_size = sizeof(*luks_any_opts);
624
625 r = qemu_rbd_convert_luks_options(
626 qapi_RbdEncryptionOptionsLUKSAny_base(
627 &curr_encrypt->u.luks_any),
628 (char **)&luks_any_opts->passphrase,
629 &luks_any_opts->passphrase_size,
630 errp);
631 break;
632 }
633 default: {
634 r = -ENOTSUP;
635 error_setg_errno(
636 errp, -r, "unknown image encryption format: %u",
637 curr_encrypt->format);
638 }
639 }
640
641 if (r < 0) {
642 goto exit;
643 }
644
645 curr_encrypt = curr_encrypt->parent;
646 }
647
648 r = rbd_encryption_load2(image, specs, encrypt_count);
649 if (r < 0) {
650 error_setg_errno(errp, -r, "layered encryption load fail");
651 goto exit;
652 }
653 bs->encrypted = true;
654 s->encryption_format = encrypt->format;
655
656 exit:
657 for (i = 0; i < encrypt_count; ++i) {
658 if (!specs[i].opts) {
659 break;
660 }
661
662 switch (specs[i].format) {
663 case RBD_ENCRYPTION_FORMAT_LUKS1: {
664 luks_opts = specs[i].opts;
665 g_free((void *)luks_opts->passphrase);
666 break;
667 }
668 case RBD_ENCRYPTION_FORMAT_LUKS2: {
669 luks2_opts = specs[i].opts;
670 g_free((void *)luks2_opts->passphrase);
671 break;
672 }
673 case RBD_ENCRYPTION_FORMAT_LUKS: {
674 luks_any_opts = specs[i].opts;
675 g_free((void *)luks_any_opts->passphrase);
676 break;
677 }
678 }
679
680 g_free(specs[i].opts);
681 }
682 g_free(specs);
683 return r;
684 }
685 #endif
686 #endif
687
688 /*
689 * For an image without encryption enabled on the rbd layer, probe the start of
690 * the image if it could be opened as an encrypted image so that we can display
691 * it when the user queries the node (most importantly in qemu-img).
692 *
693 * If the guest writes an encryption header to its disk after this probing, this
694 * won't be reflected when queried, but that's okay. There is no reason why the
695 * user should want to apply encryption at the rbd level while the image is
696 * still in use. This is just guest data.
697 */
qemu_rbd_encryption_probe(BlockDriverState * bs)698 static void qemu_rbd_encryption_probe(BlockDriverState *bs)
699 {
700 BDRVRBDState *s = bs->opaque;
701 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
702 int r;
703
704 assert(s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX);
705
706 r = rbd_read(s->image, 0,
707 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
708 if (r < RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
709 return;
710 }
711
712 if (memcmp(buf, rbd_luks_header_verification,
713 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
714 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
715 } else if (memcmp(buf, rbd_luks2_header_verification,
716 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
717 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
718 } else if (memcmp(buf, rbd_layered_luks_header_verification,
719 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
720 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
721 } else if (memcmp(buf, rbd_layered_luks2_header_verification,
722 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
723 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
724 }
725 }
726
727 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
qemu_rbd_do_create(BlockdevCreateOptions * options,const char * keypairs,const char * password_secret,Error ** errp)728 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
729 const char *keypairs, const char *password_secret,
730 Error **errp)
731 {
732 BlockdevCreateOptionsRbd *opts = &options->u.rbd;
733 rados_t cluster;
734 rados_ioctx_t io_ctx;
735 int obj_order = 0;
736 int ret;
737
738 assert(options->driver == BLOCKDEV_DRIVER_RBD);
739 if (opts->location->snapshot) {
740 error_setg(errp, "Can't use snapshot name for image creation");
741 return -EINVAL;
742 }
743
744 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
745 if (opts->encrypt) {
746 error_setg(errp, "RBD library does not support image encryption");
747 return -ENOTSUP;
748 }
749 #endif
750
751 if (opts->has_cluster_size) {
752 int64_t objsize = opts->cluster_size;
753 if ((objsize - 1) & objsize) { /* not a power of 2? */
754 error_setg(errp, "obj size needs to be power of 2");
755 return -EINVAL;
756 }
757 if (objsize < 4096) {
758 error_setg(errp, "obj size too small");
759 return -EINVAL;
760 }
761 obj_order = ctz32(objsize);
762 }
763
764 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
765 password_secret, errp);
766 if (ret < 0) {
767 return ret;
768 }
769
770 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
771 if (ret < 0) {
772 error_setg_errno(errp, -ret, "error rbd create");
773 goto out;
774 }
775
776 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
777 if (opts->encrypt) {
778 rbd_image_t image;
779
780 ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
781 if (ret < 0) {
782 error_setg_errno(errp, -ret,
783 "error opening image '%s' for encryption format",
784 opts->location->image);
785 goto out;
786 }
787
788 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
789 rbd_close(image);
790 if (ret < 0) {
791 /* encryption format fail, try removing the image */
792 rbd_remove(io_ctx, opts->location->image);
793 goto out;
794 }
795 }
796 #endif
797
798 ret = 0;
799 out:
800 rados_ioctx_destroy(io_ctx);
801 rados_shutdown(cluster);
802 return ret;
803 }
804
qemu_rbd_co_create(BlockdevCreateOptions * options,Error ** errp)805 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
806 {
807 return qemu_rbd_do_create(options, NULL, NULL, errp);
808 }
809
qemu_rbd_extract_encryption_create_options(QemuOpts * opts,RbdEncryptionCreateOptions ** spec,Error ** errp)810 static int qemu_rbd_extract_encryption_create_options(
811 QemuOpts *opts,
812 RbdEncryptionCreateOptions **spec,
813 Error **errp)
814 {
815 QDict *opts_qdict;
816 QDict *encrypt_qdict;
817 Visitor *v;
818 int ret = 0;
819
820 opts_qdict = qemu_opts_to_qdict(opts, NULL);
821 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
822 qobject_unref(opts_qdict);
823 if (!qdict_size(encrypt_qdict)) {
824 *spec = NULL;
825 goto exit;
826 }
827
828 /* Convert options into a QAPI object */
829 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
830 if (!v) {
831 ret = -EINVAL;
832 goto exit;
833 }
834
835 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
836 visit_free(v);
837 if (!*spec) {
838 ret = -EINVAL;
839 goto exit;
840 }
841
842 exit:
843 qobject_unref(encrypt_qdict);
844 return ret;
845 }
846
qemu_rbd_co_create_opts(BlockDriver * drv,const char * filename,QemuOpts * opts,Error ** errp)847 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
848 const char *filename,
849 QemuOpts *opts,
850 Error **errp)
851 {
852 BlockdevCreateOptions *create_options;
853 BlockdevCreateOptionsRbd *rbd_opts;
854 BlockdevOptionsRbd *loc;
855 RbdEncryptionCreateOptions *encrypt = NULL;
856 Error *local_err = NULL;
857 const char *keypairs, *password_secret;
858 QDict *options = NULL;
859 int ret = 0;
860
861 create_options = g_new0(BlockdevCreateOptions, 1);
862 create_options->driver = BLOCKDEV_DRIVER_RBD;
863 rbd_opts = &create_options->u.rbd;
864
865 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
866
867 password_secret = qemu_opt_get(opts, "password-secret");
868
869 /* Read out options */
870 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
871 BDRV_SECTOR_SIZE);
872 rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
873 BLOCK_OPT_CLUSTER_SIZE, 0);
874 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
875
876 options = qdict_new();
877 qemu_rbd_parse_filename(filename, options, &local_err);
878 if (local_err) {
879 ret = -EINVAL;
880 error_propagate(errp, local_err);
881 goto exit;
882 }
883
884 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
885 if (ret < 0) {
886 goto exit;
887 }
888 rbd_opts->encrypt = encrypt;
889
890 /*
891 * Caution: while qdict_get_try_str() is fine, getting non-string
892 * types would require more care. When @options come from -blockdev
893 * or blockdev_add, its members are typed according to the QAPI
894 * schema, but when they come from -drive, they're all QString.
895 */
896 loc = rbd_opts->location;
897 loc->pool = g_strdup(qdict_get_try_str(options, "pool"));
898 loc->conf = g_strdup(qdict_get_try_str(options, "conf"));
899 loc->user = g_strdup(qdict_get_try_str(options, "user"));
900 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
901 loc->image = g_strdup(qdict_get_try_str(options, "image"));
902 keypairs = qdict_get_try_str(options, "=keyvalue-pairs");
903
904 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
905 if (ret < 0) {
906 goto exit;
907 }
908
909 exit:
910 qobject_unref(options);
911 qapi_free_BlockdevCreateOptions(create_options);
912 return ret;
913 }
914
qemu_rbd_mon_host(BlockdevOptionsRbd * opts,Error ** errp)915 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
916 {
917 const char **vals;
918 const char *host, *port;
919 char *rados_str;
920 InetSocketAddressBaseList *p;
921 int i, cnt;
922
923 if (!opts->has_server) {
924 return NULL;
925 }
926
927 for (cnt = 0, p = opts->server; p; p = p->next) {
928 cnt++;
929 }
930
931 vals = g_new(const char *, cnt + 1);
932
933 for (i = 0, p = opts->server; p; p = p->next, i++) {
934 host = p->value->host;
935 port = p->value->port;
936
937 if (strchr(host, ':')) {
938 vals[i] = g_strdup_printf("[%s]:%s", host, port);
939 } else {
940 vals[i] = g_strdup_printf("%s:%s", host, port);
941 }
942 }
943 vals[i] = NULL;
944
945 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
946 g_strfreev((char **)vals);
947 return rados_str;
948 }
949
qemu_rbd_connect(rados_t * cluster,rados_ioctx_t * io_ctx,BlockdevOptionsRbd * opts,bool cache,const char * keypairs,const char * secretid,Error ** errp)950 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
951 BlockdevOptionsRbd *opts, bool cache,
952 const char *keypairs, const char *secretid,
953 Error **errp)
954 {
955 char *mon_host = NULL;
956 Error *local_err = NULL;
957 int r;
958
959 if (secretid) {
960 if (opts->key_secret) {
961 error_setg(errp,
962 "Legacy 'password-secret' clashes with 'key-secret'");
963 return -EINVAL;
964 }
965 opts->key_secret = g_strdup(secretid);
966 }
967
968 mon_host = qemu_rbd_mon_host(opts, &local_err);
969 if (local_err) {
970 error_propagate(errp, local_err);
971 r = -EINVAL;
972 goto out;
973 }
974
975 r = rados_create(cluster, opts->user);
976 if (r < 0) {
977 error_setg_errno(errp, -r, "error initializing");
978 goto out;
979 }
980
981 /* try default location when conf=NULL, but ignore failure */
982 r = rados_conf_read_file(*cluster, opts->conf);
983 if (opts->conf && r < 0) {
984 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
985 goto failed_shutdown;
986 }
987
988 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
989 if (r < 0) {
990 goto failed_shutdown;
991 }
992
993 if (mon_host) {
994 r = rados_conf_set(*cluster, "mon_host", mon_host);
995 if (r < 0) {
996 goto failed_shutdown;
997 }
998 }
999
1000 r = qemu_rbd_set_auth(*cluster, opts, errp);
1001 if (r < 0) {
1002 goto failed_shutdown;
1003 }
1004
1005 /*
1006 * Fallback to more conservative semantics if setting cache
1007 * options fails. Ignore errors from setting rbd_cache because the
1008 * only possible error is that the option does not exist, and
1009 * librbd defaults to no caching. If write through caching cannot
1010 * be set up, fall back to no caching.
1011 */
1012 if (cache) {
1013 rados_conf_set(*cluster, "rbd_cache", "true");
1014 } else {
1015 rados_conf_set(*cluster, "rbd_cache", "false");
1016 }
1017
1018 r = rados_connect(*cluster);
1019 if (r < 0) {
1020 error_setg_errno(errp, -r, "error connecting");
1021 goto failed_shutdown;
1022 }
1023
1024 r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
1025 if (r < 0) {
1026 error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
1027 goto failed_shutdown;
1028 }
1029
1030 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1031 if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
1032 bool exists;
1033
1034 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
1035 if (r < 0) {
1036 error_setg_errno(errp, -r, "error checking namespace");
1037 goto failed_ioctx_destroy;
1038 }
1039
1040 if (!exists) {
1041 error_setg(errp, "namespace '%s' does not exist",
1042 opts->q_namespace);
1043 r = -ENOENT;
1044 goto failed_ioctx_destroy;
1045 }
1046 }
1047 #endif
1048
1049 /*
1050 * Set the namespace after opening the io context on the pool,
1051 * if nspace == NULL or if nspace == "", it is just as we did nothing
1052 */
1053 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1054
1055 r = 0;
1056 goto out;
1057
1058 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1059 failed_ioctx_destroy:
1060 rados_ioctx_destroy(*io_ctx);
1061 #endif
1062 failed_shutdown:
1063 rados_shutdown(*cluster);
1064 out:
1065 g_free(mon_host);
1066 return r;
1067 }
1068
qemu_rbd_convert_options(QDict * options,BlockdevOptionsRbd ** opts,Error ** errp)1069 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1070 Error **errp)
1071 {
1072 Visitor *v;
1073
1074 /* Convert the remaining options into a QAPI object */
1075 v = qobject_input_visitor_new_flat_confused(options, errp);
1076 if (!v) {
1077 return -EINVAL;
1078 }
1079
1080 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1081 visit_free(v);
1082 if (!opts) {
1083 return -EINVAL;
1084 }
1085
1086 return 0;
1087 }
1088
/*
 * Best-effort fallback for options given as a legacy "rbd:..." file name.
 *
 * Takes the "filename" entry out of @options, parses it back into @options
 * (parse errors are ignored), moves any "=keyvalue-pairs" entry into
 * *@keypairs (freed by the caller), and converts the result into *@opts.
 *
 * Returns -EINVAL when there is no "filename" entry, otherwise the result of
 * qemu_rbd_convert_options().  No error object is reported.
 */
static int qemu_rbd_attempt_legacy_options(QDict *options,
                                           BlockdevOptionsRbd **opts,
                                           char **keypairs)
{
    /* Copied because the entry is deleted before the name is parsed. */
    g_autofree char *filename =
        g_strdup(qdict_get_try_str(options, "filename"));

    if (!filename) {
        return -EINVAL;
    }
    qdict_del(options, "filename");

    qemu_rbd_parse_filename(filename, options, NULL);

    /* keypairs freed by caller */
    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (*keypairs) {
        qdict_del(options, "=keyvalue-pairs");
    }

    return qemu_rbd_convert_options(options, opts, NULL);
}
1115
qemu_rbd_open(BlockDriverState * bs,QDict * options,int flags,Error ** errp)1116 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1117 Error **errp)
1118 {
1119 BDRVRBDState *s = bs->opaque;
1120 BlockdevOptionsRbd *opts = NULL;
1121 const QDictEntry *e;
1122 Error *local_err = NULL;
1123 char *keypairs, *secretid;
1124 rbd_image_info_t info;
1125 int r;
1126
1127 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1128 if (keypairs) {
1129 qdict_del(options, "=keyvalue-pairs");
1130 }
1131
1132 secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1133 if (secretid) {
1134 qdict_del(options, "password-secret");
1135 }
1136
1137 r = qemu_rbd_convert_options(options, &opts, &local_err);
1138 if (local_err) {
1139 /* If keypairs are present, that means some options are present in
1140 * the modern option format. Don't attempt to parse legacy option
1141 * formats, as we won't support mixed usage. */
1142 if (keypairs) {
1143 error_propagate(errp, local_err);
1144 goto out;
1145 }
1146
1147 /* If the initial attempt to convert and process the options failed,
1148 * we may be attempting to open an image file that has the rbd options
1149 * specified in the older format consisting of all key/value pairs
1150 * encoded in the filename. Go ahead and attempt to parse the
1151 * filename, and see if we can pull out the required options. */
1152 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1153 if (r < 0) {
1154 /* Propagate the original error, not the legacy parsing fallback
1155 * error, as the latter was just a best-effort attempt. */
1156 error_propagate(errp, local_err);
1157 goto out;
1158 }
1159 /* Take care whenever deciding to actually deprecate; once this ability
1160 * is removed, we will not be able to open any images with legacy-styled
1161 * backing image strings. */
1162 warn_report("RBD options encoded in the filename as keyvalue pairs "
1163 "is deprecated");
1164 }
1165
1166 /* Remove the processed options from the QDict (the visitor processes
1167 * _all_ options in the QDict) */
1168 while ((e = qdict_first(options))) {
1169 qdict_del(options, e->key);
1170 }
1171
1172 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1173 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1174 if (r < 0) {
1175 goto out;
1176 }
1177
1178 s->snap = g_strdup(opts->snapshot);
1179 s->image_name = g_strdup(opts->image);
1180
1181 /* rbd_open is always r/w */
1182 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1183 if (r < 0) {
1184 error_setg_errno(errp, -r, "error reading header from %s",
1185 s->image_name);
1186 goto failed_open;
1187 }
1188
1189 s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT__MAX;
1190 if (opts->encrypt) {
1191 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1192 if (opts->encrypt->parent) {
1193 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1194 r = qemu_rbd_encryption_load2(bs, s->image, opts->encrypt, errp);
1195 #else
1196 r = -ENOTSUP;
1197 error_setg(errp, "RBD library does not support layered encryption");
1198 #endif
1199 } else {
1200 r = qemu_rbd_encryption_load(bs, s->image, opts->encrypt, errp);
1201 }
1202 if (r < 0) {
1203 goto failed_post_open;
1204 }
1205 #else
1206 r = -ENOTSUP;
1207 error_setg(errp, "RBD library does not support image encryption");
1208 goto failed_post_open;
1209 #endif
1210 } else {
1211 qemu_rbd_encryption_probe(bs);
1212 }
1213
1214 r = rbd_stat(s->image, &info, sizeof(info));
1215 if (r < 0) {
1216 error_setg_errno(errp, -r, "error getting image info from %s",
1217 s->image_name);
1218 goto failed_post_open;
1219 }
1220 s->image_size = info.size;
1221 s->object_size = info.obj_size;
1222
1223 /* If we are using an rbd snapshot, we must be r/o, otherwise
1224 * leave as-is */
1225 if (s->snap != NULL) {
1226 bdrv_graph_rdlock_main_loop();
1227 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1228 bdrv_graph_rdunlock_main_loop();
1229 if (r < 0) {
1230 goto failed_post_open;
1231 }
1232 }
1233
1234 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1235 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1236 #endif
1237
1238 /* When extending regular files, we get zeros from the OS */
1239 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1240
1241 r = 0;
1242 goto out;
1243
1244 failed_post_open:
1245 rbd_close(s->image);
1246 failed_open:
1247 rados_ioctx_destroy(s->io_ctx);
1248 g_free(s->snap);
1249 g_free(s->image_name);
1250 rados_shutdown(s->cluster);
1251 out:
1252 qapi_free_BlockdevOptionsRbd(opts);
1253 g_free(keypairs);
1254 g_free(secretid);
1255 return r;
1256 }
1257
1258
1259 /* Since RBD is currently always opened R/W via the API,
1260 * we just need to check if we are using a snapshot or not, in
1261 * order to determine if we will allow it to be R/W */
qemu_rbd_reopen_prepare(BDRVReopenState * state,BlockReopenQueue * queue,Error ** errp)1262 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1263 BlockReopenQueue *queue, Error **errp)
1264 {
1265 BDRVRBDState *s = state->bs->opaque;
1266 int ret = 0;
1267
1268 GRAPH_RDLOCK_GUARD_MAINLOOP();
1269
1270 if (s->snap && state->flags & BDRV_O_RDWR) {
1271 error_setg(errp,
1272 "Cannot change node '%s' to r/w when using RBD snapshot",
1273 bdrv_get_device_or_node_name(state->bs));
1274 ret = -EINVAL;
1275 }
1276
1277 return ret;
1278 }
1279
/*
 * Release what the driver state of @bs holds: the image handle, the I/O
 * context, the stored snapshot and image names, and the cluster connection.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;

    rbd_close(state->image);
    rados_ioctx_destroy(state->io_ctx);

    g_free(state->image_name);
    g_free(state->snap);

    rados_shutdown(state->cluster);
}
1290
1291 /* Resize the RBD image and update the 'image_size' with the current size */
qemu_rbd_resize(BlockDriverState * bs,uint64_t size)1292 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1293 {
1294 BDRVRBDState *s = bs->opaque;
1295 int r;
1296
1297 r = rbd_resize(s->image, size);
1298 if (r < 0) {
1299 return r;
1300 }
1301
1302 s->image_size = size;
1303
1304 return 0;
1305 }
1306
qemu_rbd_finish_bh(void * opaque)1307 static void qemu_rbd_finish_bh(void *opaque)
1308 {
1309 RBDTask *task = opaque;
1310 aio_co_wake(task->co);
1311 }
1312
/*
 * This is the completion callback function for all rbd aio calls
 * started from qemu_rbd_start_co().
 *
 * Note: this function is being called from a non qemu thread so
 * we need to be careful about what we do here. Generally we only
 * schedule a BH, and do the rest of the io completion handling
 * from qemu_rbd_finish_bh() which runs in a qemu context.
 *
 * @c:    the finished librbd completion; it is released here
 * @task: request bookkeeping; receives the completion's return value and
 *        identifies the coroutine to wake
 */
static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
{
    /* Read the result before the completion object is released */
    task->ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);
    /* Defer the wake-up to the AioContext of the waiting coroutine */
    aio_bh_schedule_oneshot(qemu_coroutine_get_aio_context(task->co),
                            qemu_rbd_finish_bh, task);
}
1329
/*
 * Submit one asynchronous librbd request and wait for it in coroutine
 * context.
 *
 * @bs:     node whose opaque state holds the open RBD image
 * @offset: byte offset of the request
 * @bytes:  length of the request; must equal @qiov->size whenever a vector
 *          is supplied (asserted below)
 * @qiov:   I/O vector for reads and writes; the flush, discard and
 *          write-zeroes callers in this file pass NULL
 * @flags:  request flags; only BDRV_REQ_MAY_UNMAP is inspected here, and
 *          only for write-zeroes
 * @cmd:    which librbd operation to submit
 *
 * Returns 0 on success, or a negative value: the error from growing the
 * image, from creating or submitting the request (-EINVAL for a command
 * not handled by the switch), or the result delivered by the completion.
 */
static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
                                          uint64_t offset,
                                          uint64_t bytes,
                                          QEMUIOVector *qiov,
                                          int flags,
                                          RBDAIOCmd cmd)
{
    BDRVRBDState *s = bs->opaque;
    /* The address of this stack object is handed to the completion below */
    RBDTask task = { .co = qemu_coroutine_self() };
    rbd_completion_t c;
    int r;

    assert(!qiov || qiov->size == bytes);

    if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
        /*
         * RBD APIs don't allow us to write more than actual size, so in order
         * to support growing images, we resize the image before write
         * operations that exceed the current size.
         */
        if (offset + bytes > s->image_size) {
            r = qemu_rbd_resize(bs, offset + bytes);
            if (r < 0) {
                return r;
            }
        }
    }

    r = rbd_aio_create_completion(&task,
                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
    if (r < 0) {
        return r;
    }

    switch (cmd) {
    case RBD_AIO_READ:
        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
        break;
    case RBD_AIO_WRITE:
        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard(s->image, offset, bytes, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush(s->image, c);
        break;
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    case RBD_AIO_WRITE_ZEROES: {
        int zero_flags = 0;
#ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
        /* Caller did not permit unmapping: request thick provisioning */
        if (!(flags & BDRV_REQ_MAY_UNMAP)) {
            zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
        }
#endif
        r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
        break;
    }
#endif
    default:
        r = -EINVAL;
    }

    if (r < 0) {
        error_report("rbd request failed early: cmd %d offset %" PRIu64
                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
                     bytes, flags, r, strerror(-r));
        /* The request was never submitted, so release the completion here */
        rbd_aio_release(c);
        return r;
    }

    /* Expect exactly a single wake from qemu_rbd_finish_bh() */
    qemu_coroutine_yield();

    if (task.ret < 0) {
        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
                     bytes, flags, task.ret, strerror(-task.ret));
        return task.ret;
    }

    /* zero pad short reads */
    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
    }

    return 0;
}
1418
1419 static int
qemu_rbd_co_preadv(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1420 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1421 int64_t bytes, QEMUIOVector *qiov,
1422 BdrvRequestFlags flags)
1423 {
1424 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1425 }
1426
1427 static int
qemu_rbd_co_pwritev(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1428 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1429 int64_t bytes, QEMUIOVector *qiov,
1430 BdrvRequestFlags flags)
1431 {
1432 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1433 }
1434
qemu_rbd_co_flush(BlockDriverState * bs)1435 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1436 {
1437 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1438 }
1439
qemu_rbd_co_pdiscard(BlockDriverState * bs,int64_t offset,int64_t bytes)1440 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1441 int64_t offset, int64_t bytes)
1442 {
1443 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1444 }
1445
1446 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1447 static int
qemu_rbd_co_pwrite_zeroes(BlockDriverState * bs,int64_t offset,int64_t bytes,BdrvRequestFlags flags)1448 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1449 int64_t bytes, BdrvRequestFlags flags)
1450 {
1451 return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
1452 RBD_AIO_WRITE_ZEROES);
1453 }
1454 #endif
1455
1456 static int coroutine_fn
qemu_rbd_co_get_info(BlockDriverState * bs,BlockDriverInfo * bdi)1457 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1458 {
1459 BDRVRBDState *s = bs->opaque;
1460 bdi->cluster_size = s->object_size;
1461 return 0;
1462 }
1463
qemu_rbd_get_specific_info(BlockDriverState * bs,Error ** errp)1464 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1465 Error **errp)
1466 {
1467 BDRVRBDState *s = bs->opaque;
1468 ImageInfoSpecific *spec_info;
1469
1470 spec_info = g_new(ImageInfoSpecific, 1);
1471 *spec_info = (ImageInfoSpecific){
1472 .type = IMAGE_INFO_SPECIFIC_KIND_RBD,
1473 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1474 };
1475
1476 if (s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX) {
1477 assert(!bs->encrypted);
1478 } else {
1479 ImageInfoSpecificRbd *rbd_info = spec_info->u.rbd.data;
1480
1481 rbd_info->has_encryption_format = true;
1482 rbd_info->encryption_format = s->encryption_format;
1483 }
1484
1485 return spec_info;
1486 }
1487
/*
 * rbd_diff_iterate2 allows interrupting the execution by returning a negative
 * value from the callback routine. Choose a value that does not conflict with
 * an existing exit code and return it if we want to stop the execution
 * prematurely because we detected a change in the allocation status.
 *
 * The expansion is parenthesized so that the negative constant remains a
 * single operand wherever the macro is used.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1495
qemu_rbd_diff_iterate_cb(uint64_t offs,size_t len,int exists,void * opaque)1496 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1497 int exists, void *opaque)
1498 {
1499 RBDDiffIterateReq *req = opaque;
1500
1501 assert(req->offs + req->bytes <= offs);
1502
1503 /* treat a hole like an unallocated area and bail out */
1504 if (!exists) {
1505 return 0;
1506 }
1507
1508 if (!req->exists && offs > req->offs) {
1509 /*
1510 * we started in an unallocated area and hit the first allocated
1511 * block. req->bytes must be set to the length of the unallocated area
1512 * before the allocated area. stop further processing.
1513 */
1514 req->bytes = offs - req->offs;
1515 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1516 }
1517
1518 if (req->exists && offs > req->offs + req->bytes) {
1519 /*
1520 * we started in an allocated area and jumped over an unallocated area,
1521 * req->bytes contains the length of the allocated area before the
1522 * unallocated area. stop further processing.
1523 */
1524 return QEMU_RBD_EXIT_DIFF_ITERATE2;
1525 }
1526
1527 req->bytes += len;
1528 req->exists = true;
1529
1530 return 0;
1531 }
1532
/*
 * Report the allocation status of the range starting at @offset.
 *
 * @bs:     node whose opaque state holds the open RBD image
 * @mode:   not inspected by this implementation
 * @offset: start of the queried range; @offset + @bytes must not exceed the
 *          cached image size (asserted)
 * @bytes:  length of the queried range
 * @pnum:   out: number of bytes from @offset that share the returned status
 * @map:    out: always set to @offset
 * @file:   out: always set to @bs
 *
 * Returns BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID for the whole range
 * whenever fast-diff information cannot be used or a librbd query fails;
 * otherwise DATA or ZERO (each with OFFSET_VALID) for the leading run found
 * by rbd_diff_iterate2().
 */
static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
                                                 unsigned int mode,
                                                 int64_t offset, int64_t bytes,
                                                 int64_t *pnum, int64_t *map,
                                                 BlockDriverState **file)
{
    BDRVRBDState *s = bs->opaque;
    int status, r;
    RBDDiffIterateReq req = { .offs = offset };
    uint64_t features, flags;
    /* Bytes added in front of @offset by the alignment workaround below */
    uint64_t head = 0;

    assert(offset + bytes <= s->image_size);

    /* default to all sectors allocated */
    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    *map = offset;
    *file = bs;
    *pnum = bytes;

    /* check if RBD image supports fast-diff */
    r = rbd_get_features(s->image, &features);
    if (r < 0) {
        return status;
    }
    if (!(features & RBD_FEATURE_FAST_DIFF)) {
        return status;
    }

    /* check if RBD fast-diff result is valid */
    r = rbd_get_flags(s->image, &flags);
    if (r < 0) {
        return status;
    }
    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
        return status;
    }

#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
    /*
     * librbd had a bug until early 2022 that affected all versions of ceph that
     * supported fast-diff. This bug results in reporting of incorrect offsets
     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
     * Work around this bug by rounding down the offset to object boundaries.
     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
     * However, this workaround only works for non cloned images with default
     * striping.
     *
     * See: https://tracker.ceph.com/issues/53784
     */

    /* check if RBD image has non-default striping enabled */
    if (features & RBD_FEATURE_STRIPINGV2) {
        return status;
    }

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    /*
     * check if RBD image is a clone (= has a parent).
     *
     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
     * replacement rbd_get_parent is not present in Luminous and Mimic.
     */
    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
        return status;
    }
#pragma GCC diagnostic pop

    /*
     * Round the start down by its remainder within the object and widen
     * the range by the same amount.
     * NOTE(review): the mask assumes object_size is a power of two - confirm.
     */
    head = req.offs & (s->object_size - 1);
    req.offs -= head;
    bytes += head;
#endif

    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
                          qemu_rbd_diff_iterate_cb, &req);
    /* The callback's private stop code is not an error */
    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
        return status;
    }
    assert(req.bytes <= bytes);
    if (!req.exists) {
        if (r == 0) {
            /*
             * rbd_diff_iterate2 does not invoke callbacks for unallocated
             * areas. This here catches the case where no callback was
             * invoked at all (req.bytes == 0).
             */
            assert(req.bytes == 0);
            req.bytes = bytes;
        }
        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
    }

    /* Drop the bytes that were only added for alignment */
    assert(req.bytes > head);
    *pnum = req.bytes - head;
    return status;
}
1630
qemu_rbd_co_getlength(BlockDriverState * bs)1631 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1632 {
1633 BDRVRBDState *s = bs->opaque;
1634 int r;
1635
1636 r = rbd_get_size(s->image, &s->image_size);
1637 if (r < 0) {
1638 return r;
1639 }
1640
1641 return s->image_size;
1642 }
1643
qemu_rbd_co_truncate(BlockDriverState * bs,int64_t offset,bool exact,PreallocMode prealloc,BdrvRequestFlags flags,Error ** errp)1644 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1645 int64_t offset,
1646 bool exact,
1647 PreallocMode prealloc,
1648 BdrvRequestFlags flags,
1649 Error **errp)
1650 {
1651 int r;
1652
1653 if (prealloc != PREALLOC_MODE_OFF) {
1654 error_setg(errp, "Unsupported preallocation mode '%s'",
1655 PreallocMode_str(prealloc));
1656 return -ENOTSUP;
1657 }
1658
1659 r = qemu_rbd_resize(bs, offset);
1660 if (r < 0) {
1661 error_setg_errno(errp, -r, "Failed to resize file");
1662 return r;
1663 }
1664
1665 return 0;
1666 }
1667
/*
 * Create an RBD snapshot named after @sn_info->name.
 *
 * Returns 0 on success, -EINVAL if the name is empty or a non-empty id
 * differs from the name, -ERANGE if the name does not fit into the id
 * field, or the negative value from rbd_snap_create().
 */
static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *state = bs->opaque;
    const char *snap_name = sn_info->name;
    const char *snap_id = sn_info->id_str;
    int rc;

    /* we need a name for rbd snapshots */
    if (snap_name[0] == '\0') {
        return -EINVAL;
    }

    /*
     * rbd snapshots are using the name as the user controlled unique
     * identifier; the rbd snapid cannot serve that purpose because it
     * can't be set.  An id, when given, therefore has to match the name.
     */
    if (snap_id[0] != '\0' && strcmp(snap_id, snap_name) != 0) {
        return -EINVAL;
    }

    if (strlen(snap_name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    rc = rbd_snap_create(state->image, snap_name);
    if (rc >= 0) {
        return 0;
    }

    error_report("failed to create snap: %s", strerror(-rc));
    return rc;
}
1699
qemu_rbd_snap_remove(BlockDriverState * bs,const char * snapshot_id,const char * snapshot_name,Error ** errp)1700 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1701 const char *snapshot_id,
1702 const char *snapshot_name,
1703 Error **errp)
1704 {
1705 BDRVRBDState *s = bs->opaque;
1706 int r;
1707
1708 if (!snapshot_name) {
1709 error_setg(errp, "rbd need a valid snapshot name");
1710 return -EINVAL;
1711 }
1712
1713 /* If snapshot_id is specified, it must be equal to name, see
1714 qemu_rbd_snap_list() */
1715 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1716 error_setg(errp,
1717 "rbd do not support snapshot id, it should be NULL or "
1718 "equal to snapshot name");
1719 return -EINVAL;
1720 }
1721
1722 r = rbd_snap_remove(s->image, snapshot_name);
1723 if (r < 0) {
1724 error_setg_errno(errp, -r, "Failed to remove the snapshot");
1725 }
1726 return r;
1727 }
1728
/*
 * Roll the image back to the snapshot @snapshot_name; the result of
 * rbd_snap_rollback() is returned unchanged.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *state = bs->opaque;
    int rc = rbd_snap_rollback(state->image, snapshot_name);

    return rc;
}
1736
/*
 * List the snapshots of the image.
 *
 * @psn_tab receives a newly allocated array with one entry per snapshot,
 * or NULL when rbd_snap_list() reports no snapshots or an error.  Both the
 * id and the name of an entry are filled from the RBD snapshot name.
 *
 * Returns the number of snapshots, or rbd_snap_list()'s non-positive result.
 */
static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *state = bs->opaque;
    QEMUSnapshotInfo *table = NULL;
    rbd_snap_info_t *snaps;
    int capacity = RBD_MAX_SNAPS;
    int count;
    int idx;

    /* rbd_snap_list() is given &capacity; retry for as long as it says
     * -ERANGE, allocating a fresh buffer each round */
    for (;;) {
        snaps = g_new(rbd_snap_info_t, capacity);
        count = rbd_snap_list(state->image, snaps, &capacity);
        if (count > 0) {
            break;
        }
        g_free(snaps);
        if (count != -ERANGE) {
            break;
        }
    }

    if (count > 0) {
        table = g_new0(QEMUSnapshotInfo, count);

        for (idx = 0; idx < count; idx++) {
            QEMUSnapshotInfo *entry = &table[idx];
            const char *snap_name = snaps[idx].name;

            pstrcpy(entry->id_str, sizeof(entry->id_str), snap_name);
            pstrcpy(entry->name, sizeof(entry->name), snap_name);

            entry->vm_state_size = snaps[idx].size;
            entry->date_sec = 0;
            entry->date_nsec = 0;
            entry->vm_clock_nsec = 0;
        }

        rbd_snap_list_end(snaps);
        g_free(snaps);
    }

    *psn_tab = table;
    return count;
}
1779
qemu_rbd_co_invalidate_cache(BlockDriverState * bs,Error ** errp)1780 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1781 Error **errp)
1782 {
1783 BDRVRBDState *s = bs->opaque;
1784 int r = rbd_invalidate_cache(s->image);
1785 if (r < 0) {
1786 error_setg_errno(errp, -r, "Failed to invalidate the cache");
1787 }
1788 }
1789
/*
 * Option list "rbd-create-opts"; it is referenced as .create_opts by the
 * rbd BlockDriver definition in this file.
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            /* The generic cluster-size option carries the RBD object size */
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        {
            .name = "encrypt.format",
            .type = QEMU_OPT_STRING,
            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
        },
        {
            .name = "encrypt.cipher-alg",
            .type = QEMU_OPT_STRING,
            .help = "Name of encryption cipher algorithm"
                    " (allowed values: aes-128, aes-256)",
        },
        {
            .name = "encrypt.key-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing LUKS passphrase",
        },
        { /* end of list */ }
    }
};
1828
/*
 * NULL-terminated list of runtime option names, installed as
 * .strong_runtime_opts of the rbd BlockDriver in this file.
 * NOTE(review): "server." ends in a dot, presumably so it is matched as a
 * prefix for all server.* keys - confirm against the code consuming this.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1841
1842 static BlockDriver bdrv_rbd = {
1843 .format_name = "rbd",
1844 .instance_size = sizeof(BDRVRBDState),
1845
1846 .bdrv_parse_filename = qemu_rbd_parse_filename,
1847 .bdrv_open = qemu_rbd_open,
1848 .bdrv_close = qemu_rbd_close,
1849 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare,
1850 .bdrv_co_create = qemu_rbd_co_create,
1851 .bdrv_co_create_opts = qemu_rbd_co_create_opts,
1852 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1853 .bdrv_co_get_info = qemu_rbd_co_get_info,
1854 .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1855 .create_opts = &qemu_rbd_create_opts,
1856 .bdrv_co_getlength = qemu_rbd_co_getlength,
1857 .bdrv_co_truncate = qemu_rbd_co_truncate,
1858 .protocol_name = "rbd",
1859
1860 .bdrv_co_preadv = qemu_rbd_co_preadv,
1861 .bdrv_co_pwritev = qemu_rbd_co_pwritev,
1862 .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
1863 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard,
1864 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1865 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes,
1866 #endif
1867 .bdrv_co_block_status = qemu_rbd_co_block_status,
1868
1869 .bdrv_snapshot_create = qemu_rbd_snap_create,
1870 .bdrv_snapshot_delete = qemu_rbd_snap_remove,
1871 .bdrv_snapshot_list = qemu_rbd_snap_list,
1872 .bdrv_snapshot_goto = qemu_rbd_snap_rollback,
1873 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1874
1875 .strong_runtime_opts = qemu_rbd_strong_runtime_opts,
1876 };
1877
bdrv_rbd_init(void)1878 static void bdrv_rbd_init(void)
1879 {
1880 bdrv_register(&bdrv_rbd);
1881 }
1882
1883 block_init(bdrv_rbd_init);
1884