xref: /openbmc/qemu/block/rbd.c (revision 2e1cacfb)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "sysemu/replay.h"
27 #include "qapi/qmp/qstring.h"
28 #include "qapi/qmp/qdict.h"
29 #include "qapi/qmp/qjson.h"
30 #include "qapi/qmp/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
34 /*
35  * When specifying the image filename use:
36  *
37  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38  *
39  * poolname must be the name of an existing rados pool.
40  *
41  * devicename is the name of the rbd image.
42  *
43  * Each option given is used to configure rados, and may be any valid
44  * Ceph option, "id", or "conf".
45  *
46  * The "id" option indicates what user we should authenticate as to
47  * the Ceph cluster.  If it is excluded we will use the Ceph default
48  * (normally 'admin').
49  *
50  * The "conf" option specifies a Ceph configuration file to read.  If
51  * it is not specified, we will read from the default Ceph locations
52  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53  * file, specify conf=/dev/null.
54  *
55  * Configuration values containing :, @, or = can be escaped with a
56  * leading "\".
57  */
58 
59 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
60 
61 #define RBD_MAX_SNAPS 100
62 
63 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
64 
65 static const char rbd_luks_header_verification[
66         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
67     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
68 };
69 
70 static const char rbd_luks2_header_verification[
71         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
72     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
73 };
74 
75 static const char rbd_layered_luks_header_verification[
76         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
77     'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
78 };
79 
80 static const char rbd_layered_luks2_header_verification[
81         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
82     'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
83 };
84 
85 typedef enum {
86     RBD_AIO_READ,
87     RBD_AIO_WRITE,
88     RBD_AIO_DISCARD,
89     RBD_AIO_FLUSH,
90     RBD_AIO_WRITE_ZEROES
91 } RBDAIOCmd;
92 
93 typedef struct BDRVRBDState {
94     rados_t cluster;
95     rados_ioctx_t io_ctx;
96     rbd_image_t image;
97     char *image_name;
98     char *snap;
99     char *namespace;
100     uint64_t image_size;
101     uint64_t object_size;
102 } BDRVRBDState;
103 
104 typedef struct RBDTask {
105     BlockDriverState *bs;
106     Coroutine *co;
107     bool complete;
108     int64_t ret;
109 } RBDTask;
110 
111 typedef struct RBDDiffIterateReq {
112     uint64_t offs;
113     uint64_t bytes;
114     bool exists;
115 } RBDDiffIterateReq;
116 
117 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
118                             BlockdevOptionsRbd *opts, bool cache,
119                             const char *keypairs, const char *secretid,
120                             Error **errp);
121 
122 static char *qemu_rbd_strchr(char *src, char delim)
123 {
124     char *p;
125 
126     for (p = src; *p; ++p) {
127         if (*p == delim) {
128             return p;
129         }
130         if (*p == '\\' && p[1] != '\0') {
131             ++p;
132         }
133     }
134 
135     return NULL;
136 }
137 
138 
139 static char *qemu_rbd_next_tok(char *src, char delim, char **p)
140 {
141     char *end;
142 
143     *p = NULL;
144 
145     end = qemu_rbd_strchr(src, delim);
146     if (end) {
147         *p = end + 1;
148         *end = '\0';
149     }
150     return src;
151 }
152 
153 static void qemu_rbd_unescape(char *src)
154 {
155     char *p;
156 
157     for (p = src; *src; ++src, ++p) {
158         if (*src == '\\' && src[1] != '\0') {
159             src++;
160         }
161         *p = *src;
162     }
163     *p = '\0';
164 }
165 
166 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
167                                     Error **errp)
168 {
169     const char *start;
170     char *p, *buf;
171     QList *keypairs = NULL;
172     char *found_str, *image_name;
173 
174     if (!strstart(filename, "rbd:", &start)) {
175         error_setg(errp, "File name must start with 'rbd:'");
176         return;
177     }
178 
179     buf = g_strdup(start);
180     p = buf;
181 
182     found_str = qemu_rbd_next_tok(p, '/', &p);
183     if (!p) {
184         error_setg(errp, "Pool name is required");
185         goto done;
186     }
187     qemu_rbd_unescape(found_str);
188     qdict_put_str(options, "pool", found_str);
189 
190     if (qemu_rbd_strchr(p, '@')) {
191         image_name = qemu_rbd_next_tok(p, '@', &p);
192 
193         found_str = qemu_rbd_next_tok(p, ':', &p);
194         qemu_rbd_unescape(found_str);
195         qdict_put_str(options, "snapshot", found_str);
196     } else {
197         image_name = qemu_rbd_next_tok(p, ':', &p);
198     }
199     /* Check for namespace in the image_name */
200     if (qemu_rbd_strchr(image_name, '/')) {
201         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
202         qemu_rbd_unescape(found_str);
203         qdict_put_str(options, "namespace", found_str);
204     } else {
205         qdict_put_str(options, "namespace", "");
206     }
207     qemu_rbd_unescape(image_name);
208     qdict_put_str(options, "image", image_name);
209     if (!p) {
210         goto done;
211     }
212 
213     /* The following are essentially all key/value pairs, and we treat
214      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
215     while (p) {
216         char *name, *value;
217         name = qemu_rbd_next_tok(p, '=', &p);
218         if (!p) {
219             error_setg(errp, "conf option %s has no value", name);
220             break;
221         }
222 
223         qemu_rbd_unescape(name);
224 
225         value = qemu_rbd_next_tok(p, ':', &p);
226         qemu_rbd_unescape(value);
227 
228         if (!strcmp(name, "conf")) {
229             qdict_put_str(options, "conf", value);
230         } else if (!strcmp(name, "id")) {
231             qdict_put_str(options, "user", value);
232         } else {
233             /*
234              * We pass these internally to qemu_rbd_set_keypairs(), so
235              * we can get away with the simpler list of [ "key1",
236              * "value1", "key2", "value2" ] rather than a raw dict
237              * { "key1": "value1", "key2": "value2" } where we can't
238              * guarantee order, or even a more correct but complex
239              * [ { "key1": "value1" }, { "key2": "value2" } ]
240              */
241             if (!keypairs) {
242                 keypairs = qlist_new();
243             }
244             qlist_append_str(keypairs, name);
245             qlist_append_str(keypairs, value);
246         }
247     }
248 
249     if (keypairs) {
250         qdict_put(options, "=keyvalue-pairs",
251                   qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
252     }
253 
254 done:
255     g_free(buf);
256     qobject_unref(keypairs);
257     return;
258 }
259 
260 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
261                              Error **errp)
262 {
263     char *key, *acr;
264     int r;
265     GString *accu;
266     RbdAuthModeList *auth;
267 
268     if (opts->key_secret) {
269         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
270         if (!key) {
271             return -EIO;
272         }
273         r = rados_conf_set(cluster, "key", key);
274         g_free(key);
275         if (r < 0) {
276             error_setg_errno(errp, -r, "Could not set 'key'");
277             return r;
278         }
279     }
280 
281     if (opts->has_auth_client_required) {
282         accu = g_string_new("");
283         for (auth = opts->auth_client_required; auth; auth = auth->next) {
284             if (accu->str[0]) {
285                 g_string_append_c(accu, ';');
286             }
287             g_string_append(accu, RbdAuthMode_str(auth->value));
288         }
289         acr = g_string_free(accu, FALSE);
290         r = rados_conf_set(cluster, "auth_client_required", acr);
291         g_free(acr);
292         if (r < 0) {
293             error_setg_errno(errp, -r,
294                              "Could not set 'auth_client_required'");
295             return r;
296         }
297     }
298 
299     return 0;
300 }
301 
302 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
303                                  Error **errp)
304 {
305     QList *keypairs;
306     QString *name;
307     QString *value;
308     const char *key;
309     size_t remaining;
310     int ret = 0;
311 
312     if (!keypairs_json) {
313         return ret;
314     }
315     keypairs = qobject_to(QList,
316                           qobject_from_json(keypairs_json, &error_abort));
317     remaining = qlist_size(keypairs) / 2;
318     assert(remaining);
319 
320     while (remaining--) {
321         name = qobject_to(QString, qlist_pop(keypairs));
322         value = qobject_to(QString, qlist_pop(keypairs));
323         assert(name && value);
324         key = qstring_get_str(name);
325 
326         ret = rados_conf_set(cluster, key, qstring_get_str(value));
327         qobject_unref(value);
328         if (ret < 0) {
329             error_setg_errno(errp, -ret, "invalid conf option %s", key);
330             qobject_unref(name);
331             ret = -EINVAL;
332             break;
333         }
334         qobject_unref(name);
335     }
336 
337     qobject_unref(keypairs);
338     return ret;
339 }
340 
341 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
342 static int qemu_rbd_convert_luks_options(
343         RbdEncryptionOptionsLUKSBase *luks_opts,
344         char **passphrase,
345         size_t *passphrase_len,
346         Error **errp)
347 {
348     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
349                                  passphrase_len, errp);
350 }
351 
352 static int qemu_rbd_convert_luks_create_options(
353         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
354         rbd_encryption_algorithm_t *alg,
355         char **passphrase,
356         size_t *passphrase_len,
357         Error **errp)
358 {
359     int r = 0;
360 
361     r = qemu_rbd_convert_luks_options(
362             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
363             passphrase, passphrase_len, errp);
364     if (r < 0) {
365         return r;
366     }
367 
368     if (luks_opts->has_cipher_alg) {
369         switch (luks_opts->cipher_alg) {
370             case QCRYPTO_CIPHER_ALGO_AES_128: {
371                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
372                 break;
373             }
374             case QCRYPTO_CIPHER_ALGO_AES_256: {
375                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
376                 break;
377             }
378             default: {
379                 r = -ENOTSUP;
380                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
381                                  luks_opts->cipher_alg);
382                 return r;
383             }
384         }
385     } else {
386         /* default alg */
387         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
388     }
389 
390     return 0;
391 }
392 
393 static int qemu_rbd_encryption_format(rbd_image_t image,
394                                       RbdEncryptionCreateOptions *encrypt,
395                                       Error **errp)
396 {
397     int r = 0;
398     g_autofree char *passphrase = NULL;
399     rbd_encryption_format_t format;
400     rbd_encryption_options_t opts;
401     rbd_encryption_luks1_format_options_t luks_opts;
402     rbd_encryption_luks2_format_options_t luks2_opts;
403     size_t opts_size;
404     uint64_t raw_size, effective_size;
405 
406     r = rbd_get_size(image, &raw_size);
407     if (r < 0) {
408         error_setg_errno(errp, -r, "cannot get raw image size");
409         return r;
410     }
411 
412     switch (encrypt->format) {
413         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
414             memset(&luks_opts, 0, sizeof(luks_opts));
415             format = RBD_ENCRYPTION_FORMAT_LUKS1;
416             opts = &luks_opts;
417             opts_size = sizeof(luks_opts);
418             r = qemu_rbd_convert_luks_create_options(
419                     qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
420                     &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
421                     errp);
422             if (r < 0) {
423                 return r;
424             }
425             luks_opts.passphrase = passphrase;
426             break;
427         }
428         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
429             memset(&luks2_opts, 0, sizeof(luks2_opts));
430             format = RBD_ENCRYPTION_FORMAT_LUKS2;
431             opts = &luks2_opts;
432             opts_size = sizeof(luks2_opts);
433             r = qemu_rbd_convert_luks_create_options(
434                     qapi_RbdEncryptionCreateOptionsLUKS2_base(
435                             &encrypt->u.luks2),
436                     &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
437                     errp);
438             if (r < 0) {
439                 return r;
440             }
441             luks2_opts.passphrase = passphrase;
442             break;
443         }
444         default: {
445             r = -ENOTSUP;
446             error_setg_errno(
447                     errp, -r, "unknown image encryption format: %u",
448                     encrypt->format);
449             return r;
450         }
451     }
452 
453     r = rbd_encryption_format(image, format, opts, opts_size);
454     if (r < 0) {
455         error_setg_errno(errp, -r, "encryption format fail");
456         return r;
457     }
458 
459     r = rbd_get_size(image, &effective_size);
460     if (r < 0) {
461         error_setg_errno(errp, -r, "cannot get effective image size");
462         return r;
463     }
464 
465     r = rbd_resize(image, raw_size + (raw_size - effective_size));
466     if (r < 0) {
467         error_setg_errno(errp, -r, "cannot resize image after format");
468         return r;
469     }
470 
471     return 0;
472 }
473 
474 static int qemu_rbd_encryption_load(rbd_image_t image,
475                                     RbdEncryptionOptions *encrypt,
476                                     Error **errp)
477 {
478     int r = 0;
479     g_autofree char *passphrase = NULL;
480     rbd_encryption_luks1_format_options_t luks_opts;
481     rbd_encryption_luks2_format_options_t luks2_opts;
482 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
483     rbd_encryption_luks_format_options_t luks_any_opts;
484 #endif
485     rbd_encryption_format_t format;
486     rbd_encryption_options_t opts;
487     size_t opts_size;
488 
489     switch (encrypt->format) {
490         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
491             memset(&luks_opts, 0, sizeof(luks_opts));
492             format = RBD_ENCRYPTION_FORMAT_LUKS1;
493             opts = &luks_opts;
494             opts_size = sizeof(luks_opts);
495             r = qemu_rbd_convert_luks_options(
496                     qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
497                     &passphrase, &luks_opts.passphrase_size, errp);
498             if (r < 0) {
499                 return r;
500             }
501             luks_opts.passphrase = passphrase;
502             break;
503         }
504         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
505             memset(&luks2_opts, 0, sizeof(luks2_opts));
506             format = RBD_ENCRYPTION_FORMAT_LUKS2;
507             opts = &luks2_opts;
508             opts_size = sizeof(luks2_opts);
509             r = qemu_rbd_convert_luks_options(
510                     qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
511                     &passphrase, &luks2_opts.passphrase_size, errp);
512             if (r < 0) {
513                 return r;
514             }
515             luks2_opts.passphrase = passphrase;
516             break;
517         }
518 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
519         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
520             memset(&luks_any_opts, 0, sizeof(luks_any_opts));
521             format = RBD_ENCRYPTION_FORMAT_LUKS;
522             opts = &luks_any_opts;
523             opts_size = sizeof(luks_any_opts);
524             r = qemu_rbd_convert_luks_options(
525                     qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
526                     &passphrase, &luks_any_opts.passphrase_size, errp);
527             if (r < 0) {
528                 return r;
529             }
530             luks_any_opts.passphrase = passphrase;
531             break;
532         }
533 #endif
534         default: {
535             r = -ENOTSUP;
536             error_setg_errno(
537                     errp, -r, "unknown image encryption format: %u",
538                     encrypt->format);
539             return r;
540         }
541     }
542 
543     r = rbd_encryption_load(image, format, opts, opts_size);
544     if (r < 0) {
545         error_setg_errno(errp, -r, "encryption load fail");
546         return r;
547     }
548 
549     return 0;
550 }
551 
552 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
553 static int qemu_rbd_encryption_load2(rbd_image_t image,
554                                      RbdEncryptionOptions *encrypt,
555                                      Error **errp)
556 {
557     int r = 0;
558     int encrypt_count = 1;
559     int i;
560     RbdEncryptionOptions *curr_encrypt;
561     rbd_encryption_spec_t *specs;
562     rbd_encryption_luks1_format_options_t *luks_opts;
563     rbd_encryption_luks2_format_options_t *luks2_opts;
564     rbd_encryption_luks_format_options_t *luks_any_opts;
565 
566     /* count encryption options */
567     for (curr_encrypt = encrypt->parent; curr_encrypt;
568          curr_encrypt = curr_encrypt->parent) {
569         ++encrypt_count;
570     }
571 
572     specs = g_new0(rbd_encryption_spec_t, encrypt_count);
573 
574     curr_encrypt = encrypt;
575     for (i = 0; i < encrypt_count; ++i) {
576         switch (curr_encrypt->format) {
577             case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
578                 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
579 
580                 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
581                 specs[i].opts = luks_opts;
582                 specs[i].opts_size = sizeof(*luks_opts);
583 
584                 r = qemu_rbd_convert_luks_options(
585                         qapi_RbdEncryptionOptionsLUKS_base(
586                                 &curr_encrypt->u.luks),
587                         (char **)&luks_opts->passphrase,
588                         &luks_opts->passphrase_size,
589                         errp);
590                 break;
591             }
592             case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
593                 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
594 
595                 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
596                 specs[i].opts = luks2_opts;
597                 specs[i].opts_size = sizeof(*luks2_opts);
598 
599                 r = qemu_rbd_convert_luks_options(
600                         qapi_RbdEncryptionOptionsLUKS2_base(
601                                 &curr_encrypt->u.luks2),
602                         (char **)&luks2_opts->passphrase,
603                         &luks2_opts->passphrase_size,
604                         errp);
605                 break;
606             }
607             case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
608                 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
609 
610                 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
611                 specs[i].opts = luks_any_opts;
612                 specs[i].opts_size = sizeof(*luks_any_opts);
613 
614                 r = qemu_rbd_convert_luks_options(
615                         qapi_RbdEncryptionOptionsLUKSAny_base(
616                                 &curr_encrypt->u.luks_any),
617                         (char **)&luks_any_opts->passphrase,
618                         &luks_any_opts->passphrase_size,
619                         errp);
620                 break;
621             }
622             default: {
623                 r = -ENOTSUP;
624                 error_setg_errno(
625                         errp, -r, "unknown image encryption format: %u",
626                         curr_encrypt->format);
627             }
628         }
629 
630         if (r < 0) {
631             goto exit;
632         }
633 
634         curr_encrypt = curr_encrypt->parent;
635     }
636 
637     r = rbd_encryption_load2(image, specs, encrypt_count);
638     if (r < 0) {
639         error_setg_errno(errp, -r, "layered encryption load fail");
640         goto exit;
641     }
642 
643 exit:
644     for (i = 0; i < encrypt_count; ++i) {
645         if (!specs[i].opts) {
646             break;
647         }
648 
649         switch (specs[i].format) {
650             case RBD_ENCRYPTION_FORMAT_LUKS1: {
651                 luks_opts = specs[i].opts;
652                 g_free((void *)luks_opts->passphrase);
653                 break;
654             }
655             case RBD_ENCRYPTION_FORMAT_LUKS2: {
656                 luks2_opts = specs[i].opts;
657                 g_free((void *)luks2_opts->passphrase);
658                 break;
659             }
660             case RBD_ENCRYPTION_FORMAT_LUKS: {
661                 luks_any_opts = specs[i].opts;
662                 g_free((void *)luks_any_opts->passphrase);
663                 break;
664             }
665         }
666 
667         g_free(specs[i].opts);
668     }
669     g_free(specs);
670     return r;
671 }
672 #endif
673 #endif
674 
675 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
676 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
677                               const char *keypairs, const char *password_secret,
678                               Error **errp)
679 {
680     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
681     rados_t cluster;
682     rados_ioctx_t io_ctx;
683     int obj_order = 0;
684     int ret;
685 
686     assert(options->driver == BLOCKDEV_DRIVER_RBD);
687     if (opts->location->snapshot) {
688         error_setg(errp, "Can't use snapshot name for image creation");
689         return -EINVAL;
690     }
691 
692 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
693     if (opts->encrypt) {
694         error_setg(errp, "RBD library does not support image encryption");
695         return -ENOTSUP;
696     }
697 #endif
698 
699     if (opts->has_cluster_size) {
700         int64_t objsize = opts->cluster_size;
701         if ((objsize - 1) & objsize) {    /* not a power of 2? */
702             error_setg(errp, "obj size needs to be power of 2");
703             return -EINVAL;
704         }
705         if (objsize < 4096) {
706             error_setg(errp, "obj size too small");
707             return -EINVAL;
708         }
709         obj_order = ctz32(objsize);
710     }
711 
712     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
713                            password_secret, errp);
714     if (ret < 0) {
715         return ret;
716     }
717 
718     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
719     if (ret < 0) {
720         error_setg_errno(errp, -ret, "error rbd create");
721         goto out;
722     }
723 
724 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
725     if (opts->encrypt) {
726         rbd_image_t image;
727 
728         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
729         if (ret < 0) {
730             error_setg_errno(errp, -ret,
731                              "error opening image '%s' for encryption format",
732                              opts->location->image);
733             goto out;
734         }
735 
736         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
737         rbd_close(image);
738         if (ret < 0) {
739             /* encryption format fail, try removing the image */
740             rbd_remove(io_ctx, opts->location->image);
741             goto out;
742         }
743     }
744 #endif
745 
746     ret = 0;
747 out:
748     rados_ioctx_destroy(io_ctx);
749     rados_shutdown(cluster);
750     return ret;
751 }
752 
753 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
754 {
755     return qemu_rbd_do_create(options, NULL, NULL, errp);
756 }
757 
758 static int qemu_rbd_extract_encryption_create_options(
759         QemuOpts *opts,
760         RbdEncryptionCreateOptions **spec,
761         Error **errp)
762 {
763     QDict *opts_qdict;
764     QDict *encrypt_qdict;
765     Visitor *v;
766     int ret = 0;
767 
768     opts_qdict = qemu_opts_to_qdict(opts, NULL);
769     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
770     qobject_unref(opts_qdict);
771     if (!qdict_size(encrypt_qdict)) {
772         *spec = NULL;
773         goto exit;
774     }
775 
776     /* Convert options into a QAPI object */
777     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
778     if (!v) {
779         ret = -EINVAL;
780         goto exit;
781     }
782 
783     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
784     visit_free(v);
785     if (!*spec) {
786         ret = -EINVAL;
787         goto exit;
788     }
789 
790 exit:
791     qobject_unref(encrypt_qdict);
792     return ret;
793 }
794 
795 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
796                                                 const char *filename,
797                                                 QemuOpts *opts,
798                                                 Error **errp)
799 {
800     BlockdevCreateOptions *create_options;
801     BlockdevCreateOptionsRbd *rbd_opts;
802     BlockdevOptionsRbd *loc;
803     RbdEncryptionCreateOptions *encrypt = NULL;
804     Error *local_err = NULL;
805     const char *keypairs, *password_secret;
806     QDict *options = NULL;
807     int ret = 0;
808 
809     create_options = g_new0(BlockdevCreateOptions, 1);
810     create_options->driver = BLOCKDEV_DRIVER_RBD;
811     rbd_opts = &create_options->u.rbd;
812 
813     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
814 
815     password_secret = qemu_opt_get(opts, "password-secret");
816 
817     /* Read out options */
818     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
819                               BDRV_SECTOR_SIZE);
820     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
821                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
822     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
823 
824     options = qdict_new();
825     qemu_rbd_parse_filename(filename, options, &local_err);
826     if (local_err) {
827         ret = -EINVAL;
828         error_propagate(errp, local_err);
829         goto exit;
830     }
831 
832     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
833     if (ret < 0) {
834         goto exit;
835     }
836     rbd_opts->encrypt     = encrypt;
837 
838     /*
839      * Caution: while qdict_get_try_str() is fine, getting non-string
840      * types would require more care.  When @options come from -blockdev
841      * or blockdev_add, its members are typed according to the QAPI
842      * schema, but when they come from -drive, they're all QString.
843      */
844     loc = rbd_opts->location;
845     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
846     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
847     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
848     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
849     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
850     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
851 
852     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
853     if (ret < 0) {
854         goto exit;
855     }
856 
857 exit:
858     qobject_unref(options);
859     qapi_free_BlockdevCreateOptions(create_options);
860     return ret;
861 }
862 
863 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
864 {
865     const char **vals;
866     const char *host, *port;
867     char *rados_str;
868     InetSocketAddressBaseList *p;
869     int i, cnt;
870 
871     if (!opts->has_server) {
872         return NULL;
873     }
874 
875     for (cnt = 0, p = opts->server; p; p = p->next) {
876         cnt++;
877     }
878 
879     vals = g_new(const char *, cnt + 1);
880 
881     for (i = 0, p = opts->server; p; p = p->next, i++) {
882         host = p->value->host;
883         port = p->value->port;
884 
885         if (strchr(host, ':')) {
886             vals[i] = g_strdup_printf("[%s]:%s", host, port);
887         } else {
888             vals[i] = g_strdup_printf("%s:%s", host, port);
889         }
890     }
891     vals[i] = NULL;
892 
893     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
894     g_strfreev((char **)vals);
895     return rados_str;
896 }
897 
898 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
899                             BlockdevOptionsRbd *opts, bool cache,
900                             const char *keypairs, const char *secretid,
901                             Error **errp)
902 {
903     char *mon_host = NULL;
904     Error *local_err = NULL;
905     int r;
906 
907     if (secretid) {
908         if (opts->key_secret) {
909             error_setg(errp,
910                        "Legacy 'password-secret' clashes with 'key-secret'");
911             return -EINVAL;
912         }
913         opts->key_secret = g_strdup(secretid);
914     }
915 
916     mon_host = qemu_rbd_mon_host(opts, &local_err);
917     if (local_err) {
918         error_propagate(errp, local_err);
919         r = -EINVAL;
920         goto out;
921     }
922 
923     r = rados_create(cluster, opts->user);
924     if (r < 0) {
925         error_setg_errno(errp, -r, "error initializing");
926         goto out;
927     }
928 
929     /* try default location when conf=NULL, but ignore failure */
930     r = rados_conf_read_file(*cluster, opts->conf);
931     if (opts->conf && r < 0) {
932         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
933         goto failed_shutdown;
934     }
935 
936     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
937     if (r < 0) {
938         goto failed_shutdown;
939     }
940 
941     if (mon_host) {
942         r = rados_conf_set(*cluster, "mon_host", mon_host);
943         if (r < 0) {
944             goto failed_shutdown;
945         }
946     }
947 
948     r = qemu_rbd_set_auth(*cluster, opts, errp);
949     if (r < 0) {
950         goto failed_shutdown;
951     }
952 
953     /*
954      * Fallback to more conservative semantics if setting cache
955      * options fails. Ignore errors from setting rbd_cache because the
956      * only possible error is that the option does not exist, and
957      * librbd defaults to no caching. If write through caching cannot
958      * be set up, fall back to no caching.
959      */
960     if (cache) {
961         rados_conf_set(*cluster, "rbd_cache", "true");
962     } else {
963         rados_conf_set(*cluster, "rbd_cache", "false");
964     }
965 
966     r = rados_connect(*cluster);
967     if (r < 0) {
968         error_setg_errno(errp, -r, "error connecting");
969         goto failed_shutdown;
970     }
971 
972     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
973     if (r < 0) {
974         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
975         goto failed_shutdown;
976     }
977 
978 #ifdef HAVE_RBD_NAMESPACE_EXISTS
979     if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
980         bool exists;
981 
982         r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
983         if (r < 0) {
984             error_setg_errno(errp, -r, "error checking namespace");
985             goto failed_ioctx_destroy;
986         }
987 
988         if (!exists) {
989             error_setg(errp, "namespace '%s' does not exist",
990                        opts->q_namespace);
991             r = -ENOENT;
992             goto failed_ioctx_destroy;
993         }
994     }
995 #endif
996 
997     /*
998      * Set the namespace after opening the io context on the pool,
999      * if nspace == NULL or if nspace == "", it is just as we did nothing
1000      */
1001     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1002 
1003     r = 0;
1004     goto out;
1005 
1006 #ifdef HAVE_RBD_NAMESPACE_EXISTS
1007 failed_ioctx_destroy:
1008     rados_ioctx_destroy(*io_ctx);
1009 #endif
1010 failed_shutdown:
1011     rados_shutdown(*cluster);
1012 out:
1013     g_free(mon_host);
1014     return r;
1015 }
1016 
1017 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1018                                     Error **errp)
1019 {
1020     Visitor *v;
1021 
1022     /* Convert the remaining options into a QAPI object */
1023     v = qobject_input_visitor_new_flat_confused(options, errp);
1024     if (!v) {
1025         return -EINVAL;
1026     }
1027 
1028     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1029     visit_free(v);
1030     if (!opts) {
1031         return -EINVAL;
1032     }
1033 
1034     return 0;
1035 }
1036 
1037 static int qemu_rbd_attempt_legacy_options(QDict *options,
1038                                            BlockdevOptionsRbd **opts,
1039                                            char **keypairs)
1040 {
1041     char *filename;
1042     int r;
1043 
1044     filename = g_strdup(qdict_get_try_str(options, "filename"));
1045     if (!filename) {
1046         return -EINVAL;
1047     }
1048     qdict_del(options, "filename");
1049 
1050     qemu_rbd_parse_filename(filename, options, NULL);
1051 
1052     /* keypairs freed by caller */
1053     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1054     if (*keypairs) {
1055         qdict_del(options, "=keyvalue-pairs");
1056     }
1057 
1058     r = qemu_rbd_convert_options(options, opts, NULL);
1059 
1060     g_free(filename);
1061     return r;
1062 }
1063 
1064 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1065                          Error **errp)
1066 {
1067     BDRVRBDState *s = bs->opaque;
1068     BlockdevOptionsRbd *opts = NULL;
1069     const QDictEntry *e;
1070     Error *local_err = NULL;
1071     char *keypairs, *secretid;
1072     rbd_image_info_t info;
1073     int r;
1074 
1075     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1076     if (keypairs) {
1077         qdict_del(options, "=keyvalue-pairs");
1078     }
1079 
1080     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1081     if (secretid) {
1082         qdict_del(options, "password-secret");
1083     }
1084 
1085     r = qemu_rbd_convert_options(options, &opts, &local_err);
1086     if (local_err) {
1087         /* If keypairs are present, that means some options are present in
1088          * the modern option format.  Don't attempt to parse legacy option
1089          * formats, as we won't support mixed usage. */
1090         if (keypairs) {
1091             error_propagate(errp, local_err);
1092             goto out;
1093         }
1094 
1095         /* If the initial attempt to convert and process the options failed,
1096          * we may be attempting to open an image file that has the rbd options
1097          * specified in the older format consisting of all key/value pairs
1098          * encoded in the filename.  Go ahead and attempt to parse the
1099          * filename, and see if we can pull out the required options. */
1100         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1101         if (r < 0) {
1102             /* Propagate the original error, not the legacy parsing fallback
1103              * error, as the latter was just a best-effort attempt. */
1104             error_propagate(errp, local_err);
1105             goto out;
1106         }
1107         /* Take care whenever deciding to actually deprecate; once this ability
1108          * is removed, we will not be able to open any images with legacy-styled
1109          * backing image strings. */
1110         warn_report("RBD options encoded in the filename as keyvalue pairs "
1111                     "is deprecated");
1112     }
1113 
1114     /* Remove the processed options from the QDict (the visitor processes
1115      * _all_ options in the QDict) */
1116     while ((e = qdict_first(options))) {
1117         qdict_del(options, e->key);
1118     }
1119 
1120     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1121                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1122     if (r < 0) {
1123         goto out;
1124     }
1125 
1126     s->snap = g_strdup(opts->snapshot);
1127     s->image_name = g_strdup(opts->image);
1128 
1129     /* rbd_open is always r/w */
1130     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1131     if (r < 0) {
1132         error_setg_errno(errp, -r, "error reading header from %s",
1133                          s->image_name);
1134         goto failed_open;
1135     }
1136 
1137     if (opts->encrypt) {
1138 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1139         if (opts->encrypt->parent) {
1140 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1141             r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
1142 #else
1143             r = -ENOTSUP;
1144             error_setg(errp, "RBD library does not support layered encryption");
1145 #endif
1146         } else {
1147             r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
1148         }
1149         if (r < 0) {
1150             goto failed_post_open;
1151         }
1152 #else
1153         r = -ENOTSUP;
1154         error_setg(errp, "RBD library does not support image encryption");
1155         goto failed_post_open;
1156 #endif
1157     }
1158 
1159     r = rbd_stat(s->image, &info, sizeof(info));
1160     if (r < 0) {
1161         error_setg_errno(errp, -r, "error getting image info from %s",
1162                          s->image_name);
1163         goto failed_post_open;
1164     }
1165     s->image_size = info.size;
1166     s->object_size = info.obj_size;
1167 
1168     /* If we are using an rbd snapshot, we must be r/o, otherwise
1169      * leave as-is */
1170     if (s->snap != NULL) {
1171         bdrv_graph_rdlock_main_loop();
1172         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1173         bdrv_graph_rdunlock_main_loop();
1174         if (r < 0) {
1175             goto failed_post_open;
1176         }
1177     }
1178 
1179 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1180     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1181 #endif
1182 
1183     /* When extending regular files, we get zeros from the OS */
1184     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1185 
1186     r = 0;
1187     goto out;
1188 
1189 failed_post_open:
1190     rbd_close(s->image);
1191 failed_open:
1192     rados_ioctx_destroy(s->io_ctx);
1193     g_free(s->snap);
1194     g_free(s->image_name);
1195     rados_shutdown(s->cluster);
1196 out:
1197     qapi_free_BlockdevOptionsRbd(opts);
1198     g_free(keypairs);
1199     g_free(secretid);
1200     return r;
1201 }
1202 
1203 
1204 /* Since RBD is currently always opened R/W via the API,
1205  * we just need to check if we are using a snapshot or not, in
1206  * order to determine if we will allow it to be R/W */
1207 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1208                                    BlockReopenQueue *queue, Error **errp)
1209 {
1210     BDRVRBDState *s = state->bs->opaque;
1211     int ret = 0;
1212 
1213     GRAPH_RDLOCK_GUARD_MAINLOOP();
1214 
1215     if (s->snap && state->flags & BDRV_O_RDWR) {
1216         error_setg(errp,
1217                    "Cannot change node '%s' to r/w when using RBD snapshot",
1218                    bdrv_get_device_or_node_name(state->bs));
1219         ret = -EINVAL;
1220     }
1221 
1222     return ret;
1223 }
1224 
1225 static void qemu_rbd_close(BlockDriverState *bs)
1226 {
1227     BDRVRBDState *s = bs->opaque;
1228 
1229     rbd_close(s->image);
1230     rados_ioctx_destroy(s->io_ctx);
1231     g_free(s->snap);
1232     g_free(s->image_name);
1233     rados_shutdown(s->cluster);
1234 }
1235 
1236 /* Resize the RBD image and update the 'image_size' with the current size */
1237 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1238 {
1239     BDRVRBDState *s = bs->opaque;
1240     int r;
1241 
1242     r = rbd_resize(s->image, size);
1243     if (r < 0) {
1244         return r;
1245     }
1246 
1247     s->image_size = size;
1248 
1249     return 0;
1250 }
1251 
1252 static void qemu_rbd_finish_bh(void *opaque)
1253 {
1254     RBDTask *task = opaque;
1255     task->complete = true;
1256     aio_co_wake(task->co);
1257 }
1258 
1259 /*
1260  * This is the completion callback function for all rbd aio calls
1261  * started from qemu_rbd_start_co().
1262  *
1263  * Note: this function is being called from a non qemu thread so
1264  * we need to be careful about what we do here. Generally we only
1265  * schedule a BH, and do the rest of the io completion handling
1266  * from qemu_rbd_finish_bh() which runs in a qemu context.
1267  */
1268 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1269 {
1270     task->ret = rbd_aio_get_return_value(c);
1271     rbd_aio_release(c);
1272     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1273                             qemu_rbd_finish_bh, task);
1274 }
1275 
1276 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1277                                           uint64_t offset,
1278                                           uint64_t bytes,
1279                                           QEMUIOVector *qiov,
1280                                           int flags,
1281                                           RBDAIOCmd cmd)
1282 {
1283     BDRVRBDState *s = bs->opaque;
1284     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1285     rbd_completion_t c;
1286     int r;
1287 
1288     assert(!qiov || qiov->size == bytes);
1289 
1290     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1291         /*
1292          * RBD APIs don't allow us to write more than actual size, so in order
1293          * to support growing images, we resize the image before write
1294          * operations that exceed the current size.
1295          */
1296         if (offset + bytes > s->image_size) {
1297             r = qemu_rbd_resize(bs, offset + bytes);
1298             if (r < 0) {
1299                 return r;
1300             }
1301         }
1302     }
1303 
1304     r = rbd_aio_create_completion(&task,
1305                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1306     if (r < 0) {
1307         return r;
1308     }
1309 
1310     switch (cmd) {
1311     case RBD_AIO_READ:
1312         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1313         break;
1314     case RBD_AIO_WRITE:
1315         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1316         break;
1317     case RBD_AIO_DISCARD:
1318         r = rbd_aio_discard(s->image, offset, bytes, c);
1319         break;
1320     case RBD_AIO_FLUSH:
1321         r = rbd_aio_flush(s->image, c);
1322         break;
1323 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1324     case RBD_AIO_WRITE_ZEROES: {
1325         int zero_flags = 0;
1326 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1327         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1328             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1329         }
1330 #endif
1331         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1332         break;
1333     }
1334 #endif
1335     default:
1336         r = -EINVAL;
1337     }
1338 
1339     if (r < 0) {
1340         error_report("rbd request failed early: cmd %d offset %" PRIu64
1341                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1342                      bytes, flags, r, strerror(-r));
1343         rbd_aio_release(c);
1344         return r;
1345     }
1346 
1347     while (!task.complete) {
1348         qemu_coroutine_yield();
1349     }
1350 
1351     if (task.ret < 0) {
1352         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1353                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1354                      bytes, flags, task.ret, strerror(-task.ret));
1355         return task.ret;
1356     }
1357 
1358     /* zero pad short reads */
1359     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1360         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1361     }
1362 
1363     return 0;
1364 }
1365 
1366 static int
1367 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1368                                 int64_t bytes, QEMUIOVector *qiov,
1369                                 BdrvRequestFlags flags)
1370 {
1371     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1372 }
1373 
1374 static int
1375 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1376                                  int64_t bytes, QEMUIOVector *qiov,
1377                                  BdrvRequestFlags flags)
1378 {
1379     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1380 }
1381 
1382 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1383 {
1384     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1385 }
1386 
1387 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1388                                              int64_t offset, int64_t bytes)
1389 {
1390     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1391 }
1392 
1393 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1394 static int
1395 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1396                                        int64_t bytes, BdrvRequestFlags flags)
1397 {
1398     return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
1399                              RBD_AIO_WRITE_ZEROES);
1400 }
1401 #endif
1402 
1403 static int coroutine_fn
1404 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1405 {
1406     BDRVRBDState *s = bs->opaque;
1407     bdi->cluster_size = s->object_size;
1408     return 0;
1409 }
1410 
1411 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1412                                                      Error **errp)
1413 {
1414     BDRVRBDState *s = bs->opaque;
1415     ImageInfoSpecific *spec_info;
1416     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1417     int r;
1418 
1419     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1420         r = rbd_read(s->image, 0,
1421                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1422         if (r < 0) {
1423             error_setg_errno(errp, -r, "cannot read image start for probe");
1424             return NULL;
1425         }
1426     }
1427 
1428     spec_info = g_new(ImageInfoSpecific, 1);
1429     *spec_info = (ImageInfoSpecific){
1430         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1431         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1432     };
1433 
1434     if (memcmp(buf, rbd_luks_header_verification,
1435                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1436         spec_info->u.rbd.data->encryption_format =
1437                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1438         spec_info->u.rbd.data->has_encryption_format = true;
1439     } else if (memcmp(buf, rbd_luks2_header_verification,
1440                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1441         spec_info->u.rbd.data->encryption_format =
1442                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1443         spec_info->u.rbd.data->has_encryption_format = true;
1444     } else if (memcmp(buf, rbd_layered_luks_header_verification,
1445                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1446         spec_info->u.rbd.data->encryption_format =
1447                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1448         spec_info->u.rbd.data->has_encryption_format = true;
1449     } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1450                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1451         spec_info->u.rbd.data->encryption_format =
1452                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1453         spec_info->u.rbd.data->has_encryption_format = true;
1454     } else {
1455         spec_info->u.rbd.data->has_encryption_format = false;
1456     }
1457 
1458     return spec_info;
1459 }
1460 
1461 /*
1462  * rbd_diff_iterate2 allows to interrupt the exection by returning a negative
1463  * value in the callback routine. Choose a value that does not conflict with
1464  * an existing exitcode and return it if we want to prematurely stop the
1465  * execution because we detected a change in the allocation status.
1466  */
1467 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
1468 
1469 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1470                                     int exists, void *opaque)
1471 {
1472     RBDDiffIterateReq *req = opaque;
1473 
1474     assert(req->offs + req->bytes <= offs);
1475 
1476     /* treat a hole like an unallocated area and bail out */
1477     if (!exists) {
1478         return 0;
1479     }
1480 
1481     if (!req->exists && offs > req->offs) {
1482         /*
1483          * we started in an unallocated area and hit the first allocated
1484          * block. req->bytes must be set to the length of the unallocated area
1485          * before the allocated area. stop further processing.
1486          */
1487         req->bytes = offs - req->offs;
1488         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1489     }
1490 
1491     if (req->exists && offs > req->offs + req->bytes) {
1492         /*
1493          * we started in an allocated area and jumped over an unallocated area,
1494          * req->bytes contains the length of the allocated area before the
1495          * unallocated area. stop further processing.
1496          */
1497         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1498     }
1499 
1500     req->bytes += len;
1501     req->exists = true;
1502 
1503     return 0;
1504 }
1505 
1506 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1507                                                  bool want_zero, int64_t offset,
1508                                                  int64_t bytes, int64_t *pnum,
1509                                                  int64_t *map,
1510                                                  BlockDriverState **file)
1511 {
1512     BDRVRBDState *s = bs->opaque;
1513     int status, r;
1514     RBDDiffIterateReq req = { .offs = offset };
1515     uint64_t features, flags;
1516     uint64_t head = 0;
1517 
1518     assert(offset + bytes <= s->image_size);
1519 
1520     /* default to all sectors allocated */
1521     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1522     *map = offset;
1523     *file = bs;
1524     *pnum = bytes;
1525 
1526     /* check if RBD image supports fast-diff */
1527     r = rbd_get_features(s->image, &features);
1528     if (r < 0) {
1529         return status;
1530     }
1531     if (!(features & RBD_FEATURE_FAST_DIFF)) {
1532         return status;
1533     }
1534 
1535     /* check if RBD fast-diff result is valid */
1536     r = rbd_get_flags(s->image, &flags);
1537     if (r < 0) {
1538         return status;
1539     }
1540     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1541         return status;
1542     }
1543 
1544 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1545     /*
1546      * librbd had a bug until early 2022 that affected all versions of ceph that
1547      * supported fast-diff. This bug results in reporting of incorrect offsets
1548      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1549      * Work around this bug by rounding down the offset to object boundaries.
1550      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1551      * However, this workaround only works for non cloned images with default
1552      * striping.
1553      *
1554      * See: https://tracker.ceph.com/issues/53784
1555      */
1556 
1557     /* check if RBD image has non-default striping enabled */
1558     if (features & RBD_FEATURE_STRIPINGV2) {
1559         return status;
1560     }
1561 
1562 #pragma GCC diagnostic push
1563 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1564     /*
1565      * check if RBD image is a clone (= has a parent).
1566      *
1567      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1568      * replacement rbd_get_parent is not present in Luminous and Mimic.
1569      */
1570     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1571         return status;
1572     }
1573 #pragma GCC diagnostic pop
1574 
1575     head = req.offs & (s->object_size - 1);
1576     req.offs -= head;
1577     bytes += head;
1578 #endif
1579 
1580     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1581                           qemu_rbd_diff_iterate_cb, &req);
1582     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1583         return status;
1584     }
1585     assert(req.bytes <= bytes);
1586     if (!req.exists) {
1587         if (r == 0) {
1588             /*
1589              * rbd_diff_iterate2 does not invoke callbacks for unallocated
1590              * areas. This here catches the case where no callback was
1591              * invoked at all (req.bytes == 0).
1592              */
1593             assert(req.bytes == 0);
1594             req.bytes = bytes;
1595         }
1596         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1597     }
1598 
1599     assert(req.bytes > head);
1600     *pnum = req.bytes - head;
1601     return status;
1602 }
1603 
1604 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1605 {
1606     BDRVRBDState *s = bs->opaque;
1607     int r;
1608 
1609     r = rbd_get_size(s->image, &s->image_size);
1610     if (r < 0) {
1611         return r;
1612     }
1613 
1614     return s->image_size;
1615 }
1616 
1617 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1618                                              int64_t offset,
1619                                              bool exact,
1620                                              PreallocMode prealloc,
1621                                              BdrvRequestFlags flags,
1622                                              Error **errp)
1623 {
1624     int r;
1625 
1626     if (prealloc != PREALLOC_MODE_OFF) {
1627         error_setg(errp, "Unsupported preallocation mode '%s'",
1628                    PreallocMode_str(prealloc));
1629         return -ENOTSUP;
1630     }
1631 
1632     r = qemu_rbd_resize(bs, offset);
1633     if (r < 0) {
1634         error_setg_errno(errp, -r, "Failed to resize file");
1635         return r;
1636     }
1637 
1638     return 0;
1639 }
1640 
1641 static int qemu_rbd_snap_create(BlockDriverState *bs,
1642                                 QEMUSnapshotInfo *sn_info)
1643 {
1644     BDRVRBDState *s = bs->opaque;
1645     int r;
1646 
1647     if (sn_info->name[0] == '\0') {
1648         return -EINVAL; /* we need a name for rbd snapshots */
1649     }
1650 
1651     /*
1652      * rbd snapshots are using the name as the user controlled unique identifier
1653      * we can't use the rbd snapid for that purpose, as it can't be set
1654      */
1655     if (sn_info->id_str[0] != '\0' &&
1656         strcmp(sn_info->id_str, sn_info->name) != 0) {
1657         return -EINVAL;
1658     }
1659 
1660     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1661         return -ERANGE;
1662     }
1663 
1664     r = rbd_snap_create(s->image, sn_info->name);
1665     if (r < 0) {
1666         error_report("failed to create snap: %s", strerror(-r));
1667         return r;
1668     }
1669 
1670     return 0;
1671 }
1672 
1673 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1674                                 const char *snapshot_id,
1675                                 const char *snapshot_name,
1676                                 Error **errp)
1677 {
1678     BDRVRBDState *s = bs->opaque;
1679     int r;
1680 
1681     if (!snapshot_name) {
1682         error_setg(errp, "rbd need a valid snapshot name");
1683         return -EINVAL;
1684     }
1685 
1686     /* If snapshot_id is specified, it must be equal to name, see
1687        qemu_rbd_snap_list() */
1688     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1689         error_setg(errp,
1690                    "rbd do not support snapshot id, it should be NULL or "
1691                    "equal to snapshot name");
1692         return -EINVAL;
1693     }
1694 
1695     r = rbd_snap_remove(s->image, snapshot_name);
1696     if (r < 0) {
1697         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1698     }
1699     return r;
1700 }
1701 
1702 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1703                                   const char *snapshot_name)
1704 {
1705     BDRVRBDState *s = bs->opaque;
1706 
1707     return rbd_snap_rollback(s->image, snapshot_name);
1708 }
1709 
1710 static int qemu_rbd_snap_list(BlockDriverState *bs,
1711                               QEMUSnapshotInfo **psn_tab)
1712 {
1713     BDRVRBDState *s = bs->opaque;
1714     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1715     int i, snap_count;
1716     rbd_snap_info_t *snaps;
1717     int max_snaps = RBD_MAX_SNAPS;
1718 
1719     do {
1720         snaps = g_new(rbd_snap_info_t, max_snaps);
1721         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1722         if (snap_count <= 0) {
1723             g_free(snaps);
1724         }
1725     } while (snap_count == -ERANGE);
1726 
1727     if (snap_count <= 0) {
1728         goto done;
1729     }
1730 
1731     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1732 
1733     for (i = 0; i < snap_count; i++) {
1734         const char *snap_name = snaps[i].name;
1735 
1736         sn_info = sn_tab + i;
1737         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1738         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1739 
1740         sn_info->vm_state_size = snaps[i].size;
1741         sn_info->date_sec = 0;
1742         sn_info->date_nsec = 0;
1743         sn_info->vm_clock_nsec = 0;
1744     }
1745     rbd_snap_list_end(snaps);
1746     g_free(snaps);
1747 
1748  done:
1749     *psn_tab = sn_tab;
1750     return snap_count;
1751 }
1752 
1753 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1754                                                       Error **errp)
1755 {
1756     BDRVRBDState *s = bs->opaque;
1757     int r = rbd_invalidate_cache(s->image);
1758     if (r < 0) {
1759         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1760     }
1761 }
1762 
1763 static QemuOptsList qemu_rbd_create_opts = {
1764     .name = "rbd-create-opts",
1765     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1766     .desc = {
1767         {
1768             .name = BLOCK_OPT_SIZE,
1769             .type = QEMU_OPT_SIZE,
1770             .help = "Virtual disk size"
1771         },
1772         {
1773             .name = BLOCK_OPT_CLUSTER_SIZE,
1774             .type = QEMU_OPT_SIZE,
1775             .help = "RBD object size"
1776         },
1777         {
1778             .name = "password-secret",
1779             .type = QEMU_OPT_STRING,
1780             .help = "ID of secret providing the password",
1781         },
1782         {
1783             .name = "encrypt.format",
1784             .type = QEMU_OPT_STRING,
1785             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1786         },
1787         {
1788             .name = "encrypt.cipher-alg",
1789             .type = QEMU_OPT_STRING,
1790             .help = "Name of encryption cipher algorithm"
1791                     " (allowed values: aes-128, aes-256)",
1792         },
1793         {
1794             .name = "encrypt.key-secret",
1795             .type = QEMU_OPT_STRING,
1796             .help = "ID of secret providing LUKS passphrase",
1797         },
1798         { /* end of list */ }
1799     }
1800 };
1801 
1802 static const char *const qemu_rbd_strong_runtime_opts[] = {
1803     "pool",
1804     "namespace",
1805     "image",
1806     "conf",
1807     "snapshot",
1808     "user",
1809     "server.",
1810     "password-secret",
1811 
1812     NULL
1813 };
1814 
1815 static BlockDriver bdrv_rbd = {
1816     .format_name            = "rbd",
1817     .instance_size          = sizeof(BDRVRBDState),
1818 
1819     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1820     .bdrv_open              = qemu_rbd_open,
1821     .bdrv_close             = qemu_rbd_close,
1822     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1823     .bdrv_co_create         = qemu_rbd_co_create,
1824     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1825     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1826     .bdrv_co_get_info       = qemu_rbd_co_get_info,
1827     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1828     .create_opts            = &qemu_rbd_create_opts,
1829     .bdrv_co_getlength      = qemu_rbd_co_getlength,
1830     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1831     .protocol_name          = "rbd",
1832 
1833     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1834     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1835     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1836     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1837 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1838     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1839 #endif
1840     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1841 
1842     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1843     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1844     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1845     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1846     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1847 
1848     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1849 };
1850 
1851 static void bdrv_rbd_init(void)
1852 {
1853     bdrv_register(&bdrv_rbd);
1854 }
1855 
1856 block_init(bdrv_rbd_init);
1857