xref: /openbmc/qemu/block/rbd.c (revision d30b5bc95a9406b4125a35defba3a953358215cb)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "sysemu/replay.h"
27 #include "qapi/qmp/qstring.h"
28 #include "qapi/qmp/qdict.h"
29 #include "qapi/qmp/qjson.h"
30 #include "qapi/qmp/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
34 /*
35  * When specifying the image filename use:
36  *
37  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38  *
39  * poolname must be the name of an existing rados pool.
40  *
41  * devicename is the name of the rbd image.
42  *
43  * Each option given is used to configure rados, and may be any valid
44  * Ceph option, "id", or "conf".
45  *
46  * The "id" option indicates what user we should authenticate as to
47  * the Ceph cluster.  If it is excluded we will use the Ceph default
48  * (normally 'admin').
49  *
50  * The "conf" option specifies a Ceph configuration file to read.  If
51  * it is not specified, we will read from the default Ceph locations
52  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53  * file, specify conf=/dev/null.
54  *
55  * Configuration values containing :, @, or = can be escaped with a
56  * leading "\".
57  */
58 
/*
 * Object size in bytes corresponding to OBJ_DEFAULT_OBJ_ORDER (a power of
 * two; the order constant itself is not defined in this part of the file).
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * NOTE(review): presumably an upper bound on the number of snapshots
 * handled per image; no user is visible in this part of the file - confirm.
 */
#define RBD_MAX_SNAPS 100

/* Length in bytes of each *_header_verification signature array. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
64 
/*
 * 8-byte header signatures.  Each one is a 4-character ASCII tag followed by
 * the bytes 0xBA 0xBE and a two-byte value (0,1 or 0,2).
 * NOTE(review): presumably compared against the first bytes of an image to
 * tell the encryption format apart; the comparison code is not visible in
 * this part of the file - confirm.
 */

/* "LUKS" tag, trailing bytes 0,1. */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/* "LUKS" tag, trailing bytes 0,2. */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

/* "RBDL" tag (layered variant, going by the name), trailing bytes 0,1. */
static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

/* "RBDL" tag (layered variant, going by the name), trailing bytes 0,2. */
static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
84 
/*
 * Kinds of request distinguished by this driver.
 * NOTE(review): the code that dispatches on these values is not visible in
 * this part of the file; meanings are taken from the enumerator names.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
92 
/*
 * Per-device driver state.
 * NOTE(review): no user of this struct is visible in this part of the file;
 * the field notes below are inferred from types and names - confirm.
 */
typedef struct BDRVRBDState {
    rados_t cluster;        /* librados cluster handle */
    rados_ioctx_t io_ctx;   /* librados I/O context */
    rbd_image_t image;      /* librbd image handle */
    char *image_name;
    char *snap;
    char *namespace;
    uint64_t image_size;
    uint64_t object_size;
} BDRVRBDState;
103 
/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): users are not visible in this part of the file; presumably
 * @co is the coroutine waiting for the request, @complete is set once it
 * finishes and @ret carries its result - confirm.
 */
typedef struct RBDTask {
    BlockDriverState *bs;
    Coroutine *co;
    bool complete;
    int64_t ret;
} RBDTask;
110 
/*
 * State for a diff-iterate style query.
 * NOTE(review): users are not visible in this part of the file; presumably
 * @offs/@bytes describe a byte range and @exists whether it is allocated -
 * confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
116 
/*
 * Forward declaration: qemu_rbd_do_create() calls qemu_rbd_connect() before
 * its definition further down in the file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
121 
/*
 * Escape-aware strchr(): return a pointer to the first occurrence of @delim
 * in @src that is not protected by a preceding backslash, or NULL if there
 * is none.  A backslash followed by another character hides that character
 * from the search; a backslash at the very end of the string is an ordinary
 * character.  The delimiter test happens before the escape test, so
 * searching for '\\' itself finds the first backslash.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over an escape pair as a unit, otherwise over one char. */
        cur += (cur[0] == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
137 
138 
/*
 * Cut the token that starts at @src off at the first unescaped @delim.
 *
 * When a delimiter is found it is overwritten with NUL and *@p is set to the
 * character following it.  When none is found the string is left untouched
 * and *@p is set to NULL.  In both cases @src (the start of the token) is
 * returned.  @p may point at the caller's own copy of @src.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep == NULL) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
152 
/*
 * Remove backslash escapes from @src in place: every backslash that is
 * followed by another character is dropped and that character is kept
 * literally.  A lone backslash at the end of the string is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    char *out = src;

    while (*src != '\0') {
        if (src[0] == '\\' && src[1] != '\0') {
            src++;              /* drop the escape, keep what follows */
        }
        *out++ = *src++;
    }
    *out = '\0';
}
165 
/*
 * Parse a legacy "rbd:..." pseudo-filename and store the pieces in @options.
 *
 * Keys written to @options:
 *   "pool"             text before the first unescaped '/'
 *   "snapshot"         text between '@' and the next ':' (only if an
 *                      unescaped '@' is present)
 *   "namespace"        text before an unescaped '/' inside the image part,
 *                      or "" when there is none
 *   "image"            the image name
 *   "conf"             value of a "conf=" option
 *   "user"             value of an "id=" option
 *   "=keyvalue-pairs"  JSON text of a flat list [ key1, value1, ... ] with
 *                      all remaining options, in the order given
 *
 * All stored strings have their backslash escapes removed.
 *
 * Errors (reported through @errp): the "rbd:" prefix is missing, there is no
 * '/' after the pool name, or an option has no '='.  In the last case the
 * entries stored up to that point, including any key/value pairs collected
 * so far, remain in @options.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *start;
    char *p, *buf;
    QList *keypairs = NULL;
    char *found_str, *image_name;

    if (!strstart(filename, "rbd:", &start)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* Work on a private copy: the tokenizer writes NULs into the string. */
    buf = g_strdup(start);
    p = buf;

    /* After each qemu_rbd_next_tok() call p is the unparsed rest, or NULL. */
    found_str = qemu_rbd_next_tok(p, '/', &p);
    if (!p) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(found_str);
    qdict_put_str(options, "pool", found_str);

    /*
     * NOTE(review): the '@' search covers everything after the pool name,
     * including the option list, so an unescaped '@' inside an option value
     * is taken as the snapshot separator.
     */
    if (qemu_rbd_strchr(p, '@')) {
        image_name = qemu_rbd_next_tok(p, '@', &p);

        found_str = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "snapshot", found_str);
    } else {
        image_name = qemu_rbd_next_tok(p, ':', &p);
    }
    /* Check for namespace in the image_name */
    if (qemu_rbd_strchr(image_name, '/')) {
        /* image_name is advanced past the namespace by the call itself */
        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "namespace", found_str);
    } else {
        qdict_put_str(options, "namespace", "");
    }
    qemu_rbd_unescape(image_name);
    qdict_put_str(options, "image", image_name);
    if (!p) {
        /* No ':' after the image/snapshot: there are no options. */
        goto done;
    }

    /* The following are essentially all key/value pairs, and we treat
     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
    while (p) {
        char *name, *value;
        name = qemu_rbd_next_tok(p, '=', &p);
        if (!p) {
            error_setg(errp, "conf option %s has no value", name);
            break;
        }

        qemu_rbd_unescape(name);

        value = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(value);

        if (!strcmp(name, "conf")) {
            qdict_put_str(options, "conf", value);
        } else if (!strcmp(name, "id")) {
            qdict_put_str(options, "user", value);
        } else {
            /*
             * We pass these internally to qemu_rbd_set_keypairs(), so
             * we can get away with the simpler list of [ "key1",
             * "value1", "key2", "value2" ] rather than a raw dict
             * { "key1": "value1", "key2": "value2" } where we can't
             * guarantee order, or even a more correct but complex
             * [ { "key1": "value1" }, { "key2": "value2" } ]
             */
            if (!keypairs) {
                keypairs = qlist_new();
            }
            qlist_append_str(keypairs, name);
            qlist_append_str(keypairs, value);
        }
    }

    /* Reached on normal loop exit and after the "has no value" break. */
    if (keypairs) {
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
    }

done:
    g_free(buf);
    qobject_unref(keypairs);
    return;
}
259 
260 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
261                              Error **errp)
262 {
263     char *key, *acr;
264     int r;
265     GString *accu;
266     RbdAuthModeList *auth;
267 
268     if (opts->key_secret) {
269         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
270         if (!key) {
271             return -EIO;
272         }
273         r = rados_conf_set(cluster, "key", key);
274         g_free(key);
275         if (r < 0) {
276             error_setg_errno(errp, -r, "Could not set 'key'");
277             return r;
278         }
279     }
280 
281     if (opts->has_auth_client_required) {
282         accu = g_string_new("");
283         for (auth = opts->auth_client_required; auth; auth = auth->next) {
284             if (accu->str[0]) {
285                 g_string_append_c(accu, ';');
286             }
287             g_string_append(accu, RbdAuthMode_str(auth->value));
288         }
289         acr = g_string_free(accu, FALSE);
290         r = rados_conf_set(cluster, "auth_client_required", acr);
291         g_free(acr);
292         if (r < 0) {
293             error_setg_errno(errp, -r,
294                              "Could not set 'auth_client_required'");
295             return r;
296         }
297     }
298 
299     return 0;
300 }
301 
302 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
303                                  Error **errp)
304 {
305     QList *keypairs;
306     QString *name;
307     QString *value;
308     const char *key;
309     size_t remaining;
310     int ret = 0;
311 
312     if (!keypairs_json) {
313         return ret;
314     }
315     keypairs = qobject_to(QList,
316                           qobject_from_json(keypairs_json, &error_abort));
317     remaining = qlist_size(keypairs) / 2;
318     assert(remaining);
319 
320     while (remaining--) {
321         name = qobject_to(QString, qlist_pop(keypairs));
322         value = qobject_to(QString, qlist_pop(keypairs));
323         assert(name && value);
324         key = qstring_get_str(name);
325 
326         ret = rados_conf_set(cluster, key, qstring_get_str(value));
327         qobject_unref(value);
328         if (ret < 0) {
329             error_setg_errno(errp, -ret, "invalid conf option %s", key);
330             qobject_unref(name);
331             ret = -EINVAL;
332             break;
333         }
334         qobject_unref(name);
335     }
336 
337     qobject_unref(keypairs);
338     return ret;
339 }
340 
341 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
342 static int qemu_rbd_convert_luks_options(
343         RbdEncryptionOptionsLUKSBase *luks_opts,
344         char **passphrase,
345         size_t *passphrase_len,
346         Error **errp)
347 {
348     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
349                                  passphrase_len, errp);
350 }
351 
352 static int qemu_rbd_convert_luks_create_options(
353         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
354         rbd_encryption_algorithm_t *alg,
355         char **passphrase,
356         size_t *passphrase_len,
357         Error **errp)
358 {
359     int r = 0;
360 
361     r = qemu_rbd_convert_luks_options(
362             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
363             passphrase, passphrase_len, errp);
364     if (r < 0) {
365         return r;
366     }
367 
368     if (luks_opts->has_cipher_alg) {
369         switch (luks_opts->cipher_alg) {
370             case QCRYPTO_CIPHER_ALG_AES_128: {
371                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
372                 break;
373             }
374             case QCRYPTO_CIPHER_ALG_AES_256: {
375                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
376                 break;
377             }
378             default: {
379                 r = -ENOTSUP;
380                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
381                                  luks_opts->cipher_alg);
382                 return r;
383             }
384         }
385     } else {
386         /* default alg */
387         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
388     }
389 
390     return 0;
391 }
392 
/*
 * Apply an encryption format (LUKS or LUKS2) to the open @image and then
 * resize the image to make up for the change in reported size.
 *
 * @image:   open librbd image handle
 * @encrypt: requested format plus its passphrase/cipher options
 * @errp:    set on failure
 *
 * Returns 0 on success, a negative errno-style value on failure (-ENOTSUP
 * for a format other than LUKS/LUKS2).
 */
static int qemu_rbd_encryption_format(rbd_image_t image,
                                      RbdEncryptionCreateOptions *encrypt,
                                      Error **errp)
{
    int r = 0;
    /* Freed automatically on every return path. */
    g_autofree char *passphrase = NULL;
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    rbd_encryption_luks1_format_options_t luks_opts;
    rbd_encryption_luks2_format_options_t luks2_opts;
    size_t opts_size;
    uint64_t raw_size, effective_size;

    /* Size as reported before the format is applied. */
    r = rbd_get_size(image, &raw_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get raw image size");
        return r;
    }

    /*
     * Fill the format specific option struct; opts/opts_size refer to
     * whichever of the two local structs is in use.
     */
    switch (encrypt->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            memset(&luks_opts, 0, sizeof(luks_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS1;
            opts = &luks_opts;
            opts_size = sizeof(luks_opts);
            r = qemu_rbd_convert_luks_create_options(
                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
                    &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
                    errp);
            if (r < 0) {
                return r;
            }
            luks_opts.passphrase = passphrase;
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            memset(&luks2_opts, 0, sizeof(luks2_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS2;
            opts = &luks2_opts;
            opts_size = sizeof(luks2_opts);
            r = qemu_rbd_convert_luks_create_options(
                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
                            &encrypt->u.luks2),
                    &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
                    errp);
            if (r < 0) {
                return r;
            }
            luks2_opts.passphrase = passphrase;
            break;
        }
        default: {
            r = -ENOTSUP;
            error_setg_errno(
                    errp, -r, "unknown image encryption format: %u",
                    encrypt->format);
            return r;
        }
    }

    r = rbd_encryption_format(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption format fail");
        return r;
    }

    /* Size as reported after the format has been applied. */
    r = rbd_get_size(image, &effective_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get effective image size");
        return r;
    }

    /*
     * Grow the image by the difference between the two readings.
     * NOTE(review): presumably the encryption header reduces the reported
     * size, so this brings the usable size back to the requested one -
     * confirm against librbd.
     */
    r = rbd_resize(image, raw_size + (raw_size - effective_size));
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot resize image after format");
        return r;
    }

    return 0;
}
473 
/*
 * Load a single encryption spec on the open @image.
 *
 * @image:   open librbd image handle
 * @encrypt: format and passphrase secret; its ->parent link is not looked at
 *           here
 * @errp:    set on failure
 *
 * Handles LUKS and LUKS2, plus LUKS-any when LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
 * is defined.  Returns 0 on success, a negative errno-style value on failure
 * (-ENOTSUP for any other format).
 */
static int qemu_rbd_encryption_load(rbd_image_t image,
                                    RbdEncryptionOptions *encrypt,
                                    Error **errp)
{
    int r = 0;
    /* Freed automatically on every return path. */
    g_autofree char *passphrase = NULL;
    rbd_encryption_luks1_format_options_t luks_opts;
    rbd_encryption_luks2_format_options_t luks2_opts;
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    rbd_encryption_luks_format_options_t luks_any_opts;
#endif
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    size_t opts_size;

    /*
     * Fill the format specific option struct; opts/opts_size refer to
     * whichever local struct is in use.
     */
    switch (encrypt->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            memset(&luks_opts, 0, sizeof(luks_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS1;
            opts = &luks_opts;
            opts_size = sizeof(luks_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
                    &passphrase, &luks_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks_opts.passphrase = passphrase;
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            memset(&luks2_opts, 0, sizeof(luks2_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS2;
            opts = &luks2_opts;
            opts_size = sizeof(luks2_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
                    &passphrase, &luks2_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks2_opts.passphrase = passphrase;
            break;
        }
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
            memset(&luks_any_opts, 0, sizeof(luks_any_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS;
            opts = &luks_any_opts;
            opts_size = sizeof(luks_any_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
                    &passphrase, &luks_any_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks_any_opts.passphrase = passphrase;
            break;
        }
#endif
        default: {
            r = -ENOTSUP;
            error_setg_errno(
                    errp, -r, "unknown image encryption format: %u",
                    encrypt->format);
            return r;
        }
    }

    r = rbd_encryption_load(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption load fail");
        return r;
    }

    return 0;
}
551 
552 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
/*
 * Load a chain of encryption specs on the open @image in one call.
 *
 * @image:   open librbd image handle
 * @encrypt: head of a list linked through ->parent; one librbd spec is built
 *           per element, with specs[0] describing @encrypt itself
 * @errp:    set on failure
 *
 * Each spec gets its own heap-allocated option struct and passphrase; all of
 * them are released before returning, on success and on failure alike.
 *
 * Returns the result of rbd_encryption_load2() on success paths, or a
 * negative errno-style value on failure (-ENOTSUP for an unknown format).
 */
static int qemu_rbd_encryption_load2(rbd_image_t image,
                                     RbdEncryptionOptions *encrypt,
                                     Error **errp)
{
    int r = 0;
    int encrypt_count = 1;
    int i;
    RbdEncryptionOptions *curr_encrypt;
    rbd_encryption_spec_t *specs;
    rbd_encryption_luks1_format_options_t *luks_opts;
    rbd_encryption_luks2_format_options_t *luks2_opts;
    rbd_encryption_luks_format_options_t *luks_any_opts;

    /* count encryption options */
    for (curr_encrypt = encrypt->parent; curr_encrypt;
         curr_encrypt = curr_encrypt->parent) {
        ++encrypt_count;
    }

    /* Zero-filled, so entries not reached below keep opts == NULL. */
    specs = g_new0(rbd_encryption_spec_t, encrypt_count);

    curr_encrypt = encrypt;
    for (i = 0; i < encrypt_count; ++i) {
        switch (curr_encrypt->format) {
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;

                luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
                specs[i].opts = luks_opts;
                specs[i].opts_size = sizeof(*luks_opts);

                /* The passphrase is stored directly in the option struct. */
                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKS_base(
                                &curr_encrypt->u.luks),
                        (char **)&luks_opts->passphrase,
                        &luks_opts->passphrase_size,
                        errp);
                break;
            }
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;

                luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
                specs[i].opts = luks2_opts;
                specs[i].opts_size = sizeof(*luks2_opts);

                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKS2_base(
                                &curr_encrypt->u.luks2),
                        (char **)&luks2_opts->passphrase,
                        &luks2_opts->passphrase_size,
                        errp);
                break;
            }
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;

                luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
                specs[i].opts = luks_any_opts;
                specs[i].opts_size = sizeof(*luks_any_opts);

                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKSAny_base(
                                &curr_encrypt->u.luks_any),
                        (char **)&luks_any_opts->passphrase,
                        &luks_any_opts->passphrase_size,
                        errp);
                break;
            }
            default: {
                /* specs[i].opts stays NULL; the check below bails out. */
                r = -ENOTSUP;
                error_setg_errno(
                        errp, -r, "unknown image encryption format: %u",
                        curr_encrypt->format);
            }
        }

        if (r < 0) {
            goto exit;
        }

        curr_encrypt = curr_encrypt->parent;
    }

    r = rbd_encryption_load2(image, specs, encrypt_count);
    if (r < 0) {
        error_setg_errno(errp, -r, "layered encryption load fail");
        goto exit;
    }

exit:
    /*
     * Specs are filled in order, so the first entry without an option
     * struct marks the end of what was allocated.
     */
    for (i = 0; i < encrypt_count; ++i) {
        if (!specs[i].opts) {
            break;
        }

        /* Free the passphrase through the matching struct type. */
        switch (specs[i].format) {
            case RBD_ENCRYPTION_FORMAT_LUKS1: {
                luks_opts = specs[i].opts;
                g_free((void *)luks_opts->passphrase);
                break;
            }
            case RBD_ENCRYPTION_FORMAT_LUKS2: {
                luks2_opts = specs[i].opts;
                g_free((void *)luks2_opts->passphrase);
                break;
            }
            case RBD_ENCRYPTION_FORMAT_LUKS: {
                luks_any_opts = specs[i].opts;
                g_free((void *)luks_any_opts->passphrase);
                break;
            }
        }

        g_free(specs[i].opts);
    }
    g_free(specs);
    return r;
}
672 #endif
673 #endif
674 
/* FIXME Deprecate and remove keypairs or make it available in QMP. */
/*
 * Create an RBD image as described by @options and, if requested and
 * supported by the library, format it for encryption.
 *
 * @options:         creation options; the driver must be RBD (asserted)
 * @keypairs:        JSON list of extra configuration key/value pairs, or
 *                   NULL; passed on to qemu_rbd_connect()
 * @password_secret: legacy secret id, or NULL; passed on to
 *                   qemu_rbd_connect()
 * @errp:            set on failure
 *
 * Returns 0 on success, a negative errno-style value on failure.
 */
static int qemu_rbd_do_create(BlockdevCreateOptions *options,
                              const char *keypairs, const char *password_secret,
                              Error **errp)
{
    BlockdevCreateOptionsRbd *opts = &options->u.rbd;
    rados_t cluster;
    rados_ioctx_t io_ctx;
    /* Stays 0 unless a cluster size was given. */
    int obj_order = 0;
    int ret;

    assert(options->driver == BLOCKDEV_DRIVER_RBD);
    if (opts->location->snapshot) {
        error_setg(errp, "Can't use snapshot name for image creation");
        return -EINVAL;
    }

#ifndef LIBRBD_SUPPORTS_ENCRYPTION
    if (opts->encrypt) {
        error_setg(errp, "RBD library does not support image encryption");
        return -ENOTSUP;
    }
#endif

    if (opts->has_cluster_size) {
        int64_t objsize = opts->cluster_size;
        if ((objsize - 1) & objsize) {    /* not a power of 2? */
            error_setg(errp, "obj size needs to be power of 2");
            return -EINVAL;
        }
        if (objsize < 4096) {
            error_setg(errp, "obj size too small");
            return -EINVAL;
        }
        /*
         * NOTE(review): objsize is 64 bits wide but goes through ctz32();
         * no upper bound is checked here - confirm how very large values
         * behave.
         */
        obj_order = ctz32(objsize);
    }

    ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
                           password_secret, errp);
    if (ret < 0) {
        return ret;
    }

    /* From here on, failures must go through "out" to release the handles. */
    ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "error rbd create");
        goto out;
    }

#ifdef LIBRBD_SUPPORTS_ENCRYPTION
    if (opts->encrypt) {
        rbd_image_t image;

        /*
         * NOTE(review): if this open fails the freshly created image is
         * left in place, unlike the format failure case below.
         */
        ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
        if (ret < 0) {
            error_setg_errno(errp, -ret,
                             "error opening image '%s' for encryption format",
                             opts->location->image);
            goto out;
        }

        ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
        rbd_close(image);
        if (ret < 0) {
            /* encryption format fail, try removing the image */
            rbd_remove(io_ctx, opts->location->image);
            goto out;
        }
    }
#endif

    ret = 0;
out:
    rados_ioctx_destroy(io_ctx);
    rados_shutdown(cluster);
    return ret;
}
752 
753 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
754 {
755     return qemu_rbd_do_create(options, NULL, NULL, errp);
756 }
757 
758 static int qemu_rbd_extract_encryption_create_options(
759         QemuOpts *opts,
760         RbdEncryptionCreateOptions **spec,
761         Error **errp)
762 {
763     QDict *opts_qdict;
764     QDict *encrypt_qdict;
765     Visitor *v;
766     int ret = 0;
767 
768     opts_qdict = qemu_opts_to_qdict(opts, NULL);
769     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
770     qobject_unref(opts_qdict);
771     if (!qdict_size(encrypt_qdict)) {
772         *spec = NULL;
773         goto exit;
774     }
775 
776     /* Convert options into a QAPI object */
777     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
778     if (!v) {
779         ret = -EINVAL;
780         goto exit;
781     }
782 
783     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
784     visit_free(v);
785     if (!*spec) {
786         ret = -EINVAL;
787         goto exit;
788     }
789 
790 exit:
791     qobject_unref(encrypt_qdict);
792     return ret;
793 }
794 
795 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
796                                                 const char *filename,
797                                                 QemuOpts *opts,
798                                                 Error **errp)
799 {
800     BlockdevCreateOptions *create_options;
801     BlockdevCreateOptionsRbd *rbd_opts;
802     BlockdevOptionsRbd *loc;
803     RbdEncryptionCreateOptions *encrypt = NULL;
804     Error *local_err = NULL;
805     const char *keypairs, *password_secret;
806     QDict *options = NULL;
807     int ret = 0;
808 
809     create_options = g_new0(BlockdevCreateOptions, 1);
810     create_options->driver = BLOCKDEV_DRIVER_RBD;
811     rbd_opts = &create_options->u.rbd;
812 
813     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
814 
815     password_secret = qemu_opt_get(opts, "password-secret");
816 
817     /* Read out options */
818     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
819                               BDRV_SECTOR_SIZE);
820     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
821                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
822     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
823 
824     options = qdict_new();
825     qemu_rbd_parse_filename(filename, options, &local_err);
826     if (local_err) {
827         ret = -EINVAL;
828         error_propagate(errp, local_err);
829         goto exit;
830     }
831 
832     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
833     if (ret < 0) {
834         goto exit;
835     }
836     rbd_opts->encrypt     = encrypt;
837 
838     /*
839      * Caution: while qdict_get_try_str() is fine, getting non-string
840      * types would require more care.  When @options come from -blockdev
841      * or blockdev_add, its members are typed according to the QAPI
842      * schema, but when they come from -drive, they're all QString.
843      */
844     loc = rbd_opts->location;
845     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
846     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
847     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
848     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
849     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
850     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
851 
852     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
853     if (ret < 0) {
854         goto exit;
855     }
856 
857 exit:
858     qobject_unref(options);
859     qapi_free_BlockdevCreateOptions(create_options);
860     return ret;
861 }
862 
863 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
864 {
865     const char **vals;
866     const char *host, *port;
867     char *rados_str;
868     InetSocketAddressBaseList *p;
869     int i, cnt;
870 
871     if (!opts->has_server) {
872         return NULL;
873     }
874 
875     for (cnt = 0, p = opts->server; p; p = p->next) {
876         cnt++;
877     }
878 
879     vals = g_new(const char *, cnt + 1);
880 
881     for (i = 0, p = opts->server; p; p = p->next, i++) {
882         host = p->value->host;
883         port = p->value->port;
884 
885         if (strchr(host, ':')) {
886             vals[i] = g_strdup_printf("[%s]:%s", host, port);
887         } else {
888             vals[i] = g_strdup_printf("%s:%s", host, port);
889         }
890     }
891     vals[i] = NULL;
892 
893     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
894     g_strfreev((char **)vals);
895     return rados_str;
896 }
897 
898 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
899                             BlockdevOptionsRbd *opts, bool cache,
900                             const char *keypairs, const char *secretid,
901                             Error **errp)
902 {
903     char *mon_host = NULL;
904     Error *local_err = NULL;
905     int r;
906 
907     if (secretid) {
908         if (opts->key_secret) {
909             error_setg(errp,
910                        "Legacy 'password-secret' clashes with 'key-secret'");
911             return -EINVAL;
912         }
913         opts->key_secret = g_strdup(secretid);
914     }
915 
916     mon_host = qemu_rbd_mon_host(opts, &local_err);
917     if (local_err) {
918         error_propagate(errp, local_err);
919         r = -EINVAL;
920         goto out;
921     }
922 
923     r = rados_create(cluster, opts->user);
924     if (r < 0) {
925         error_setg_errno(errp, -r, "error initializing");
926         goto out;
927     }
928 
929     /* try default location when conf=NULL, but ignore failure */
930     r = rados_conf_read_file(*cluster, opts->conf);
931     if (opts->conf && r < 0) {
932         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
933         goto failed_shutdown;
934     }
935 
936     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
937     if (r < 0) {
938         goto failed_shutdown;
939     }
940 
941     if (mon_host) {
942         r = rados_conf_set(*cluster, "mon_host", mon_host);
943         if (r < 0) {
944             goto failed_shutdown;
945         }
946     }
947 
948     r = qemu_rbd_set_auth(*cluster, opts, errp);
949     if (r < 0) {
950         goto failed_shutdown;
951     }
952 
953     /*
954      * Fallback to more conservative semantics if setting cache
955      * options fails. Ignore errors from setting rbd_cache because the
956      * only possible error is that the option does not exist, and
957      * librbd defaults to no caching. If write through caching cannot
958      * be set up, fall back to no caching.
959      */
960     if (cache) {
961         rados_conf_set(*cluster, "rbd_cache", "true");
962     } else {
963         rados_conf_set(*cluster, "rbd_cache", "false");
964     }
965 
966     r = rados_connect(*cluster);
967     if (r < 0) {
968         error_setg_errno(errp, -r, "error connecting");
969         goto failed_shutdown;
970     }
971 
972     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
973     if (r < 0) {
974         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
975         goto failed_shutdown;
976     }
977 
978 #ifdef HAVE_RBD_NAMESPACE_EXISTS
979     if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
980         bool exists;
981 
982         r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
983         if (r < 0) {
984             error_setg_errno(errp, -r, "error checking namespace");
985             goto failed_ioctx_destroy;
986         }
987 
988         if (!exists) {
989             error_setg(errp, "namespace '%s' does not exist",
990                        opts->q_namespace);
991             r = -ENOENT;
992             goto failed_ioctx_destroy;
993         }
994     }
995 #endif
996 
997     /*
998      * Set the namespace after opening the io context on the pool,
999      * if nspace == NULL or if nspace == "", it is just as we did nothing
1000      */
1001     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1002 
1003     r = 0;
1004     goto out;
1005 
1006 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1007 failed_ioctx_destroy:
1008     rados_ioctx_destroy(*io_ctx);
1009 #endif
1010 failed_shutdown:
1011     rados_shutdown(*cluster);
1012 out:
1013     g_free(mon_host);
1014     return r;
1015 }
1016 
1017 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1018                                     Error **errp)
1019 {
1020     Visitor *v;
1021 
1022     /* Convert the remaining options into a QAPI object */
1023     v = qobject_input_visitor_new_flat_confused(options, errp);
1024     if (!v) {
1025         return -EINVAL;
1026     }
1027 
1028     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1029     visit_free(v);
1030     if (!opts) {
1031         return -EINVAL;
1032     }
1033 
1034     return 0;
1035 }
1036 
1037 static int qemu_rbd_attempt_legacy_options(QDict *options,
1038                                            BlockdevOptionsRbd **opts,
1039                                            char **keypairs)
1040 {
1041     char *filename;
1042     int r;
1043 
1044     filename = g_strdup(qdict_get_try_str(options, "filename"));
1045     if (!filename) {
1046         return -EINVAL;
1047     }
1048     qdict_del(options, "filename");
1049 
1050     qemu_rbd_parse_filename(filename, options, NULL);
1051 
1052     /* keypairs freed by caller */
1053     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1054     if (*keypairs) {
1055         qdict_del(options, "=keyvalue-pairs");
1056     }
1057 
1058     r = qemu_rbd_convert_options(options, opts, NULL);
1059 
1060     g_free(filename);
1061     return r;
1062 }
1063 
1064 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1065                          Error **errp)
1066 {
1067     BDRVRBDState *s = bs->opaque;
1068     BlockdevOptionsRbd *opts = NULL;
1069     const QDictEntry *e;
1070     Error *local_err = NULL;
1071     char *keypairs, *secretid;
1072     rbd_image_info_t info;
1073     int r;
1074 
1075     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1076     if (keypairs) {
1077         qdict_del(options, "=keyvalue-pairs");
1078     }
1079 
1080     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1081     if (secretid) {
1082         qdict_del(options, "password-secret");
1083     }
1084 
1085     r = qemu_rbd_convert_options(options, &opts, &local_err);
1086     if (local_err) {
1087         /* If keypairs are present, that means some options are present in
1088          * the modern option format.  Don't attempt to parse legacy option
1089          * formats, as we won't support mixed usage. */
1090         if (keypairs) {
1091             error_propagate(errp, local_err);
1092             goto out;
1093         }
1094 
1095         /* If the initial attempt to convert and process the options failed,
1096          * we may be attempting to open an image file that has the rbd options
1097          * specified in the older format consisting of all key/value pairs
1098          * encoded in the filename.  Go ahead and attempt to parse the
1099          * filename, and see if we can pull out the required options. */
1100         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1101         if (r < 0) {
1102             /* Propagate the original error, not the legacy parsing fallback
1103              * error, as the latter was just a best-effort attempt. */
1104             error_propagate(errp, local_err);
1105             goto out;
1106         }
1107         /* Take care whenever deciding to actually deprecate; once this ability
1108          * is removed, we will not be able to open any images with legacy-styled
1109          * backing image strings. */
1110         warn_report("RBD options encoded in the filename as keyvalue pairs "
1111                     "is deprecated");
1112     }
1113 
1114     /* Remove the processed options from the QDict (the visitor processes
1115      * _all_ options in the QDict) */
1116     while ((e = qdict_first(options))) {
1117         qdict_del(options, e->key);
1118     }
1119 
1120     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1121                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1122     if (r < 0) {
1123         goto out;
1124     }
1125 
1126     s->snap = g_strdup(opts->snapshot);
1127     s->image_name = g_strdup(opts->image);
1128 
1129     /* rbd_open is always r/w */
1130     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1131     if (r < 0) {
1132         error_setg_errno(errp, -r, "error reading header from %s",
1133                          s->image_name);
1134         goto failed_open;
1135     }
1136 
1137     if (opts->encrypt) {
1138 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1139         if (opts->encrypt->parent) {
1140 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1141             r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
1142 #else
1143             r = -ENOTSUP;
1144             error_setg(errp, "RBD library does not support layered encryption");
1145 #endif
1146         } else {
1147             r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
1148         }
1149         if (r < 0) {
1150             goto failed_post_open;
1151         }
1152 #else
1153         r = -ENOTSUP;
1154         error_setg(errp, "RBD library does not support image encryption");
1155         goto failed_post_open;
1156 #endif
1157     }
1158 
1159     r = rbd_stat(s->image, &info, sizeof(info));
1160     if (r < 0) {
1161         error_setg_errno(errp, -r, "error getting image info from %s",
1162                          s->image_name);
1163         goto failed_post_open;
1164     }
1165     s->image_size = info.size;
1166     s->object_size = info.obj_size;
1167 
1168     /* If we are using an rbd snapshot, we must be r/o, otherwise
1169      * leave as-is */
1170     if (s->snap != NULL) {
1171         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1172         if (r < 0) {
1173             goto failed_post_open;
1174         }
1175     }
1176 
1177 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1178     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1179 #endif
1180 
1181     /* When extending regular files, we get zeros from the OS */
1182     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1183 
1184     r = 0;
1185     goto out;
1186 
1187 failed_post_open:
1188     rbd_close(s->image);
1189 failed_open:
1190     rados_ioctx_destroy(s->io_ctx);
1191     g_free(s->snap);
1192     g_free(s->image_name);
1193     rados_shutdown(s->cluster);
1194 out:
1195     qapi_free_BlockdevOptionsRbd(opts);
1196     g_free(keypairs);
1197     g_free(secretid);
1198     return r;
1199 }
1200 
1201 
1202 /* Since RBD is currently always opened R/W via the API,
1203  * we just need to check if we are using a snapshot or not, in
1204  * order to determine if we will allow it to be R/W */
1205 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1206                                    BlockReopenQueue *queue, Error **errp)
1207 {
1208     BDRVRBDState *s = state->bs->opaque;
1209     int ret = 0;
1210 
1211     if (s->snap && state->flags & BDRV_O_RDWR) {
1212         error_setg(errp,
1213                    "Cannot change node '%s' to r/w when using RBD snapshot",
1214                    bdrv_get_device_or_node_name(state->bs));
1215         ret = -EINVAL;
1216     }
1217 
1218     return ret;
1219 }
1220 
1221 static void qemu_rbd_close(BlockDriverState *bs)
1222 {
1223     BDRVRBDState *s = bs->opaque;
1224 
1225     rbd_close(s->image);
1226     rados_ioctx_destroy(s->io_ctx);
1227     g_free(s->snap);
1228     g_free(s->image_name);
1229     rados_shutdown(s->cluster);
1230 }
1231 
1232 /* Resize the RBD image and update the 'image_size' with the current size */
1233 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1234 {
1235     BDRVRBDState *s = bs->opaque;
1236     int r;
1237 
1238     r = rbd_resize(s->image, size);
1239     if (r < 0) {
1240         return r;
1241     }
1242 
1243     s->image_size = size;
1244 
1245     return 0;
1246 }
1247 
1248 static void qemu_rbd_finish_bh(void *opaque)
1249 {
1250     RBDTask *task = opaque;
1251     task->complete = true;
1252     aio_co_wake(task->co);
1253 }
1254 
/*
 * This is the completion callback function for all rbd aio calls
 * started from qemu_rbd_start_co().
 *
 * Note: this function is being called from a non qemu thread so
 * we need to be careful about what we do here. Generally we only
 * schedule a BH, and do the rest of the io completion handling
 * from qemu_rbd_finish_bh() which runs in a qemu context.
 *
 * @c:    the completion object; it is released here
 * @task: the request descriptor; receives the return value in task->ret
 */
static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
{
    /* Fetch the result before the completion object is released */
    task->ret = rbd_aio_get_return_value(c);
    rbd_aio_release(c);
    /* Hand the rest of the completion over to the node's AioContext */
    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
                            qemu_rbd_finish_bh, task);
}
1271 
1272 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1273                                           uint64_t offset,
1274                                           uint64_t bytes,
1275                                           QEMUIOVector *qiov,
1276                                           int flags,
1277                                           RBDAIOCmd cmd)
1278 {
1279     BDRVRBDState *s = bs->opaque;
1280     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1281     rbd_completion_t c;
1282     int r;
1283 
1284     assert(!qiov || qiov->size == bytes);
1285 
1286     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1287         /*
1288          * RBD APIs don't allow us to write more than actual size, so in order
1289          * to support growing images, we resize the image before write
1290          * operations that exceed the current size.
1291          */
1292         if (offset + bytes > s->image_size) {
1293             int r = qemu_rbd_resize(bs, offset + bytes);
1294             if (r < 0) {
1295                 return r;
1296             }
1297         }
1298     }
1299 
1300     r = rbd_aio_create_completion(&task,
1301                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1302     if (r < 0) {
1303         return r;
1304     }
1305 
1306     switch (cmd) {
1307     case RBD_AIO_READ:
1308         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1309         break;
1310     case RBD_AIO_WRITE:
1311         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1312         break;
1313     case RBD_AIO_DISCARD:
1314         r = rbd_aio_discard(s->image, offset, bytes, c);
1315         break;
1316     case RBD_AIO_FLUSH:
1317         r = rbd_aio_flush(s->image, c);
1318         break;
1319 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1320     case RBD_AIO_WRITE_ZEROES: {
1321         int zero_flags = 0;
1322 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1323         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1324             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1325         }
1326 #endif
1327         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1328         break;
1329     }
1330 #endif
1331     default:
1332         r = -EINVAL;
1333     }
1334 
1335     if (r < 0) {
1336         error_report("rbd request failed early: cmd %d offset %" PRIu64
1337                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1338                      bytes, flags, r, strerror(-r));
1339         rbd_aio_release(c);
1340         return r;
1341     }
1342 
1343     while (!task.complete) {
1344         qemu_coroutine_yield();
1345     }
1346 
1347     if (task.ret < 0) {
1348         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1349                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1350                      bytes, flags, task.ret, strerror(-task.ret));
1351         return task.ret;
1352     }
1353 
1354     /* zero pad short reads */
1355     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1356         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1357     }
1358 
1359     return 0;
1360 }
1361 
1362 static int
1363 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1364                                 int64_t bytes, QEMUIOVector *qiov,
1365                                 BdrvRequestFlags flags)
1366 {
1367     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1368 }
1369 
1370 static int
1371 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1372                                  int64_t bytes, QEMUIOVector *qiov,
1373                                  BdrvRequestFlags flags)
1374 {
1375     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1376 }
1377 
1378 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1379 {
1380     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1381 }
1382 
1383 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1384                                              int64_t offset, int64_t bytes)
1385 {
1386     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1387 }
1388 
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes: an RBD_AIO_WRITE_ZEROES request; @flags is passed through
 * to qemu_rbd_start_co().  Only built when librbd offers write-zeroes.
 */
static int coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs,
                                                  int64_t offset,
                                                  int64_t bytes,
                                                  BdrvRequestFlags flags)
{
    int ret = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                                RBD_AIO_WRITE_ZEROES);

    return ret;
}
#endif
1398 
1399 static int coroutine_fn
1400 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1401 {
1402     BDRVRBDState *s = bs->opaque;
1403     bdi->cluster_size = s->object_size;
1404     return 0;
1405 }
1406 
1407 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1408                                                      Error **errp)
1409 {
1410     BDRVRBDState *s = bs->opaque;
1411     ImageInfoSpecific *spec_info;
1412     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1413     int r;
1414 
1415     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1416         r = rbd_read(s->image, 0,
1417                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1418         if (r < 0) {
1419             error_setg_errno(errp, -r, "cannot read image start for probe");
1420             return NULL;
1421         }
1422     }
1423 
1424     spec_info = g_new(ImageInfoSpecific, 1);
1425     *spec_info = (ImageInfoSpecific){
1426         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1427         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1428     };
1429 
1430     if (memcmp(buf, rbd_luks_header_verification,
1431                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1432         spec_info->u.rbd.data->encryption_format =
1433                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1434         spec_info->u.rbd.data->has_encryption_format = true;
1435     } else if (memcmp(buf, rbd_luks2_header_verification,
1436                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1437         spec_info->u.rbd.data->encryption_format =
1438                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1439         spec_info->u.rbd.data->has_encryption_format = true;
1440     } else if (memcmp(buf, rbd_layered_luks_header_verification,
1441                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1442         spec_info->u.rbd.data->encryption_format =
1443                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1444         spec_info->u.rbd.data->has_encryption_format = true;
1445     } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1446                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1447         spec_info->u.rbd.data->encryption_format =
1448                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1449         spec_info->u.rbd.data->has_encryption_format = true;
1450     } else {
1451         spec_info->u.rbd.data->has_encryption_format = false;
1452     }
1453 
1454     return spec_info;
1455 }
1456 
/*
 * rbd_diff_iterate2 allows the callback routine to interrupt the execution
 * by returning a negative value. Choose a value that does not conflict with
 * an existing exit code and return it if we want to prematurely stop the
 * execution because we detected a change in the allocation status.
 *
 * The value is parenthesized so that the macro expands safely inside any
 * expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1464 
1465 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1466                                     int exists, void *opaque)
1467 {
1468     RBDDiffIterateReq *req = opaque;
1469 
1470     assert(req->offs + req->bytes <= offs);
1471 
1472     /* treat a hole like an unallocated area and bail out */
1473     if (!exists) {
1474         return 0;
1475     }
1476 
1477     if (!req->exists && offs > req->offs) {
1478         /*
1479          * we started in an unallocated area and hit the first allocated
1480          * block. req->bytes must be set to the length of the unallocated area
1481          * before the allocated area. stop further processing.
1482          */
1483         req->bytes = offs - req->offs;
1484         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1485     }
1486 
1487     if (req->exists && offs > req->offs + req->bytes) {
1488         /*
1489          * we started in an allocated area and jumped over an unallocated area,
1490          * req->bytes contains the length of the allocated area before the
1491          * unallocated area. stop further processing.
1492          */
1493         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1494     }
1495 
1496     req->bytes += len;
1497     req->exists = true;
1498 
1499     return 0;
1500 }
1501 
/*
 * Report the allocation status of the range starting at @offset.
 *
 * The default answer is "allocated data" for the whole requested range
 * (BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID, *pnum = @bytes). That default
 * is returned unchanged whenever fast-diff cannot be used: the feature or
 * flag query fails, the feature is absent, the fast-diff data is flagged
 * invalid, or rbd_diff_iterate2() itself fails. With librbd older than
 * 1.17.0 it is also returned for images with striping v2 and for clones.
 *
 * Otherwise *pnum is set to the length of the leading run with uniform
 * status, and the status is BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID if
 * that run is unallocated, or the default data status if it is allocated.
 *
 * @want_zero is not consulted by this implementation.
 */
static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
                                                 bool want_zero, int64_t offset,
                                                 int64_t bytes, int64_t *pnum,
                                                 int64_t *map,
                                                 BlockDriverState **file)
{
    BDRVRBDState *s = bs->opaque;
    int status, r;
    RBDDiffIterateReq req = { .offs = offset };
    uint64_t features, flags;
    /* bytes added in front of @offset by the alignment workaround below */
    uint64_t head = 0;

    assert(offset + bytes <= s->image_size);

    /* default to all sectors allocated */
    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    *map = offset;
    *file = bs;
    *pnum = bytes;

    /* check if RBD image supports fast-diff */
    r = rbd_get_features(s->image, &features);
    if (r < 0) {
        return status;
    }
    if (!(features & RBD_FEATURE_FAST_DIFF)) {
        return status;
    }

    /* check if RBD fast-diff result is valid */
    r = rbd_get_flags(s->image, &flags);
    if (r < 0) {
        return status;
    }
    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
        return status;
    }

#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
    /*
     * librbd had a bug until early 2022 that affected all versions of ceph that
     * supported fast-diff. This bug results in reporting of incorrect offsets
     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
     * Work around this bug by rounding down the offset to object boundaries.
     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
     * However, this workaround only works for non cloned images with default
     * striping.
     *
     * See: https://tracker.ceph.com/issues/53784
     */

    /* check if RBD image has non-default striping enabled */
    if (features & RBD_FEATURE_STRIPINGV2) {
        return status;
    }

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    /*
     * check if RBD image is a clone (= has a parent).
     *
     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
     * replacement rbd_get_parent is not present in Luminous and Mimic.
     */
    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
        return status;
    }
#pragma GCC diagnostic pop

    /* round the start down to an object boundary, growing the range */
    head = req.offs & (s->object_size - 1);
    req.offs -= head;
    bytes += head;
#endif

    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
                          qemu_rbd_diff_iterate_cb, &req);
    /* the callback's early-exit code is not an error */
    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
        return status;
    }
    assert(req.bytes <= bytes);
    if (!req.exists) {
        if (r == 0) {
            /*
             * rbd_diff_iterate2 does not invoke callbacks for unallocated
             * areas. This here catches the case where no callback was
             * invoked at all (req.bytes == 0).
             */
            assert(req.bytes == 0);
            req.bytes = bytes;
        }
        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
    }

    /* drop the alignment prefix again so *pnum is relative to @offset */
    assert(req.bytes > head);
    *pnum = req.bytes - head;
    return status;
}
1599 
1600 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1601 {
1602     BDRVRBDState *s = bs->opaque;
1603     int r;
1604 
1605     r = rbd_get_size(s->image, &s->image_size);
1606     if (r < 0) {
1607         return r;
1608     }
1609 
1610     return s->image_size;
1611 }
1612 
1613 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1614                                              int64_t offset,
1615                                              bool exact,
1616                                              PreallocMode prealloc,
1617                                              BdrvRequestFlags flags,
1618                                              Error **errp)
1619 {
1620     int r;
1621 
1622     if (prealloc != PREALLOC_MODE_OFF) {
1623         error_setg(errp, "Unsupported preallocation mode '%s'",
1624                    PreallocMode_str(prealloc));
1625         return -ENOTSUP;
1626     }
1627 
1628     r = qemu_rbd_resize(bs, offset);
1629     if (r < 0) {
1630         error_setg_errno(errp, -r, "Failed to resize file");
1631         return r;
1632     }
1633 
1634     return 0;
1635 }
1636 
1637 static int qemu_rbd_snap_create(BlockDriverState *bs,
1638                                 QEMUSnapshotInfo *sn_info)
1639 {
1640     BDRVRBDState *s = bs->opaque;
1641     int r;
1642 
1643     if (sn_info->name[0] == '\0') {
1644         return -EINVAL; /* we need a name for rbd snapshots */
1645     }
1646 
1647     /*
1648      * rbd snapshots are using the name as the user controlled unique identifier
1649      * we can't use the rbd snapid for that purpose, as it can't be set
1650      */
1651     if (sn_info->id_str[0] != '\0' &&
1652         strcmp(sn_info->id_str, sn_info->name) != 0) {
1653         return -EINVAL;
1654     }
1655 
1656     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1657         return -ERANGE;
1658     }
1659 
1660     r = rbd_snap_create(s->image, sn_info->name);
1661     if (r < 0) {
1662         error_report("failed to create snap: %s", strerror(-r));
1663         return r;
1664     }
1665 
1666     return 0;
1667 }
1668 
1669 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1670                                 const char *snapshot_id,
1671                                 const char *snapshot_name,
1672                                 Error **errp)
1673 {
1674     BDRVRBDState *s = bs->opaque;
1675     int r;
1676 
1677     if (!snapshot_name) {
1678         error_setg(errp, "rbd need a valid snapshot name");
1679         return -EINVAL;
1680     }
1681 
1682     /* If snapshot_id is specified, it must be equal to name, see
1683        qemu_rbd_snap_list() */
1684     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1685         error_setg(errp,
1686                    "rbd do not support snapshot id, it should be NULL or "
1687                    "equal to snapshot name");
1688         return -EINVAL;
1689     }
1690 
1691     r = rbd_snap_remove(s->image, snapshot_name);
1692     if (r < 0) {
1693         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1694     }
1695     return r;
1696 }
1697 
1698 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1699                                   const char *snapshot_name)
1700 {
1701     BDRVRBDState *s = bs->opaque;
1702 
1703     return rbd_snap_rollback(s->image, snapshot_name);
1704 }
1705 
1706 static int qemu_rbd_snap_list(BlockDriverState *bs,
1707                               QEMUSnapshotInfo **psn_tab)
1708 {
1709     BDRVRBDState *s = bs->opaque;
1710     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1711     int i, snap_count;
1712     rbd_snap_info_t *snaps;
1713     int max_snaps = RBD_MAX_SNAPS;
1714 
1715     do {
1716         snaps = g_new(rbd_snap_info_t, max_snaps);
1717         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1718         if (snap_count <= 0) {
1719             g_free(snaps);
1720         }
1721     } while (snap_count == -ERANGE);
1722 
1723     if (snap_count <= 0) {
1724         goto done;
1725     }
1726 
1727     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1728 
1729     for (i = 0; i < snap_count; i++) {
1730         const char *snap_name = snaps[i].name;
1731 
1732         sn_info = sn_tab + i;
1733         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1734         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1735 
1736         sn_info->vm_state_size = snaps[i].size;
1737         sn_info->date_sec = 0;
1738         sn_info->date_nsec = 0;
1739         sn_info->vm_clock_nsec = 0;
1740     }
1741     rbd_snap_list_end(snaps);
1742     g_free(snaps);
1743 
1744  done:
1745     *psn_tab = sn_tab;
1746     return snap_count;
1747 }
1748 
1749 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1750                                                       Error **errp)
1751 {
1752     BDRVRBDState *s = bs->opaque;
1753     int r = rbd_invalidate_cache(s->image);
1754     if (r < 0) {
1755         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1756     }
1757 }
1758 
/*
 * Option list for image creation; referenced by bdrv_rbd.create_opts.
 * Each entry gives the option name, its value type and its help text.
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        {
            .name = "encrypt.format",
            .type = QEMU_OPT_STRING,
            .help = "Encrypt the image, format choices: 'luks', 'luks2'",
        },
        {
            .name = "encrypt.cipher-alg",
            .type = QEMU_OPT_STRING,
            .help = "Name of encryption cipher algorithm"
                    " (allowed values: aes-128, aes-256)",
        },
        {
            .name = "encrypt.key-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing LUKS passphrase",
        },
        { /* end of list */ }
    }
};
1797 
/*
 * NULL-terminated list of runtime option names, referenced by
 * bdrv_rbd.strong_runtime_opts.
 * NOTE(review): the trailing '.' in "server." presumably marks a key
 * prefix rather than a full option name - confirm against the generic
 * block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1810 
/*
 * Driver descriptor for the "rbd" protocol; registered with the block
 * layer by bdrv_rbd_init(). It wires up the callbacks implemented in this
 * file.
 */
static BlockDriver bdrv_rbd = {
    .format_name            = "rbd",
    .instance_size          = sizeof(BDRVRBDState),
    .bdrv_parse_filename    = qemu_rbd_parse_filename,
    .bdrv_file_open         = qemu_rbd_open,
    .bdrv_close             = qemu_rbd_close,
    .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
    .bdrv_co_create         = qemu_rbd_co_create,
    .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
    .bdrv_co_get_info       = qemu_rbd_co_get_info,
    .bdrv_get_specific_info = qemu_rbd_get_specific_info,
    .create_opts            = &qemu_rbd_create_opts,
    .bdrv_co_getlength      = qemu_rbd_co_getlength,
    .bdrv_co_truncate       = qemu_rbd_co_truncate,
    .protocol_name          = "rbd",

    /* Data path: every entry funnels into qemu_rbd_start_co() */
    .bdrv_co_preadv         = qemu_rbd_co_preadv,
    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
#endif
    .bdrv_co_block_status   = qemu_rbd_co_block_status,

    /* Snapshot management and cache invalidation */
    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
    .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,

    .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
};
1845 
/* Module init hook: register the rbd driver descriptor. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() into the block-module initialisation list. */
block_init(bdrv_rbd_init);
1852