xref: /openbmc/qemu/block/rbd.c (revision 2cfb3b6c)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "sysemu/replay.h"
27 #include "qapi/qmp/qstring.h"
28 #include "qapi/qmp/qdict.h"
29 #include "qapi/qmp/qjson.h"
30 #include "qapi/qmp/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
34 /*
35  * When specifying the image filename use:
36  *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; when it is omitted the empty namespace is used.
 *
 * devicename is the name of the rbd image.
42  *
43  * Each option given is used to configure rados, and may be any valid
44  * Ceph option, "id", or "conf".
45  *
46  * The "id" option indicates what user we should authenticate as to
47  * the Ceph cluster.  If it is excluded we will use the Ceph default
48  * (normally 'admin').
49  *
50  * The "conf" option specifies a Ceph configuration file to read.  If
51  * it is not specified, we will read from the default Ceph locations
52  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53  * file, specify conf=/dev/null.
54  *
55  * Configuration values containing :, @, or = can be escaped with a
56  * leading "\".
57  */
58 
/*
 * Object size in bytes corresponding to OBJ_DEFAULT_OBJ_ORDER.
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file -- confirm it is still provided before using this macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * NOTE(review): presumably an upper bound on the number of snapshots handled
 * per image; no user is visible in this section -- confirm.
 */
#define RBD_MAX_SNAPS 100

/* Length in bytes of each LUKS header signature array defined below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
64 
/*
 * Eight-byte signature: the characters "LUKS", the bytes 0xBA 0xBE, and the
 * trailing bytes 0, 1.
 * NOTE(review): presumably compared against the start of an image to detect
 * a LUKS (version 1) header; the comparison is outside the visible section.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/*
 * Same signature as above except that the trailing bytes are 0, 2
 * (presumably LUKS version 2 -- confirm against the user of this array).
 */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
74 
/*
 * Kinds of asynchronous request the driver distinguishes.
 * NOTE(review): the code that dispatches on these values is outside the
 * visible section.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
82 
/*
 * Per-image driver state; qemu_rbd_open() obtains it from bs->opaque.
 */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle filled in by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* pool I/O context filled in by qemu_rbd_connect() */
    rbd_image_t image;      /* image handle returned by rbd_open() */
    char *image_name;       /* copy of the "image" option */
    char *snap;             /* copy of the "snapshot" option, NULL if none */
    char *namespace;        /* NOTE(review): not assigned in the visible code */
    uint64_t image_size;    /* info.size as reported by rbd_stat() at open */
    uint64_t object_size;   /* info.obj_size as reported by rbd_stat() at open */
} BDRVRBDState;
93 
/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): the users of this struct are outside the visible section;
 * the field notes below are inferred from names and types only -- confirm.
 */
typedef struct RBDTask {
    BlockDriverState *bs;   /* block device the request belongs to */
    Coroutine *co;          /* presumably the coroutine waiting on the request */
    bool complete;          /* presumably set once the request has finished */
    int64_t ret;            /* presumably the request's result or error code */
} RBDTask;
100 
/*
 * Extent record: an offset, a length and an "exists" flag.
 * NOTE(review): presumably filled in by a diff-iterate callback; the user of
 * this struct is outside the visible section -- confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
106 
/*
 * Forward declaration: qemu_rbd_do_create() calls qemu_rbd_connect() before
 * its definition further down in this file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
111 
/*
 * Find the first unescaped occurrence of @delim in @src.
 *
 * A backslash escapes the character that follows it, so an escaped
 * delimiter is never reported.  A backslash that is the last character of
 * the string escapes nothing.
 *
 * Returns a pointer to the delimiter, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over an escape sequence as a unit, otherwise one char. */
        cur += (*cur == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
127 
128 
/*
 * Split off the leading token of @src at the first unescaped @delim.
 *
 * The delimiter is overwritten with a NUL so that @src becomes the token.
 * *@p is set to the character after the delimiter, or to NULL when @src
 * contains no unescaped delimiter (the whole string is then the token).
 *
 * Returns @src.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep == NULL) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
142 
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept literally.  A trailing lone backslash is kept.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;               /* drop the escaping backslash */
        }
        *out++ = *in++;
    }
    *out = '\0';
}
155 
/*
 * Parse a legacy "rbd:..." filename into individual entries of @options.
 *
 * The part after "rbd:" is copied and tokenised in place.  The entries
 * stored are "pool", "image", "namespace" (the empty string when the
 * filename has none), and optionally "snapshot", "conf" and "user" (taken
 * from the "id" option).  Any other key=value option is collected into a
 * flat JSON list [ "key1", "value1", ... ] stored under "=keyvalue-pairs".
 *
 * Errors are reported through @errp; entries parsed before the error was
 * detected remain in @options.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *start;
    char *p, *buf;
    QList *keypairs = NULL;
    char *found_str, *image_name;

    if (!strstart(filename, "rbd:", &start)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* Work on a private copy: the tokeniser writes NULs into the buffer. */
    buf = g_strdup(start);
    p = buf;

    /* Everything up to the first unescaped '/' is the pool name. */
    found_str = qemu_rbd_next_tok(p, '/', &p);
    if (!p) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(found_str);
    qdict_put_str(options, "pool", found_str);

    /*
     * Note the '@' search covers the whole remainder of the string, so an
     * '@' inside a later option value must be escaped.
     */
    if (qemu_rbd_strchr(p, '@')) {
        image_name = qemu_rbd_next_tok(p, '@', &p);

        found_str = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "snapshot", found_str);
    } else {
        image_name = qemu_rbd_next_tok(p, ':', &p);
    }
    /* Check for namespace in the image_name */
    if (qemu_rbd_strchr(image_name, '/')) {
        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "namespace", found_str);
    } else {
        qdict_put_str(options, "namespace", "");
    }
    qemu_rbd_unescape(image_name);
    qdict_put_str(options, "image", image_name);
    /* p is NULL when nothing follows the image (and snapshot) name. */
    if (!p) {
        goto done;
    }

    /* The following are essentially all key/value pairs, and we treat
     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
    while (p) {
        char *name, *value;
        name = qemu_rbd_next_tok(p, '=', &p);
        if (!p) {
            error_setg(errp, "conf option %s has no value", name);
            break;
        }

        qemu_rbd_unescape(name);

        value = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(value);

        if (!strcmp(name, "conf")) {
            qdict_put_str(options, "conf", value);
        } else if (!strcmp(name, "id")) {
            qdict_put_str(options, "user", value);
        } else {
            /*
             * We pass these internally to qemu_rbd_set_keypairs(), so
             * we can get away with the simpler list of [ "key1",
             * "value1", "key2", "value2" ] rather than a raw dict
             * { "key1": "value1", "key2": "value2" } where we can't
             * guarantee order, or even a more correct but complex
             * [ { "key1": "value1" }, { "key2": "value2" } ]
             */
            if (!keypairs) {
                keypairs = qlist_new();
            }
            qlist_append_str(keypairs, name);
            qlist_append_str(keypairs, value);
        }
    }

    /* Store the collected pairs as a JSON string under a private key. */
    if (keypairs) {
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
    }

done:
    g_free(buf);
    qobject_unref(keypairs);
    return;
}
249 
250 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
251                              Error **errp)
252 {
253     char *key, *acr;
254     int r;
255     GString *accu;
256     RbdAuthModeList *auth;
257 
258     if (opts->key_secret) {
259         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
260         if (!key) {
261             return -EIO;
262         }
263         r = rados_conf_set(cluster, "key", key);
264         g_free(key);
265         if (r < 0) {
266             error_setg_errno(errp, -r, "Could not set 'key'");
267             return r;
268         }
269     }
270 
271     if (opts->has_auth_client_required) {
272         accu = g_string_new("");
273         for (auth = opts->auth_client_required; auth; auth = auth->next) {
274             if (accu->str[0]) {
275                 g_string_append_c(accu, ';');
276             }
277             g_string_append(accu, RbdAuthMode_str(auth->value));
278         }
279         acr = g_string_free(accu, FALSE);
280         r = rados_conf_set(cluster, "auth_client_required", acr);
281         g_free(acr);
282         if (r < 0) {
283             error_setg_errno(errp, -r,
284                              "Could not set 'auth_client_required'");
285             return r;
286         }
287     }
288 
289     return 0;
290 }
291 
292 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
293                                  Error **errp)
294 {
295     QList *keypairs;
296     QString *name;
297     QString *value;
298     const char *key;
299     size_t remaining;
300     int ret = 0;
301 
302     if (!keypairs_json) {
303         return ret;
304     }
305     keypairs = qobject_to(QList,
306                           qobject_from_json(keypairs_json, &error_abort));
307     remaining = qlist_size(keypairs) / 2;
308     assert(remaining);
309 
310     while (remaining--) {
311         name = qobject_to(QString, qlist_pop(keypairs));
312         value = qobject_to(QString, qlist_pop(keypairs));
313         assert(name && value);
314         key = qstring_get_str(name);
315 
316         ret = rados_conf_set(cluster, key, qstring_get_str(value));
317         qobject_unref(value);
318         if (ret < 0) {
319             error_setg_errno(errp, -ret, "invalid conf option %s", key);
320             qobject_unref(name);
321             ret = -EINVAL;
322             break;
323         }
324         qobject_unref(name);
325     }
326 
327     qobject_unref(keypairs);
328     return ret;
329 }
330 
331 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
332 static int qemu_rbd_convert_luks_options(
333         RbdEncryptionOptionsLUKSBase *luks_opts,
334         char **passphrase,
335         size_t *passphrase_len,
336         Error **errp)
337 {
338     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
339                                  passphrase_len, errp);
340 }
341 
342 static int qemu_rbd_convert_luks_create_options(
343         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
344         rbd_encryption_algorithm_t *alg,
345         char **passphrase,
346         size_t *passphrase_len,
347         Error **errp)
348 {
349     int r = 0;
350 
351     r = qemu_rbd_convert_luks_options(
352             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
353             passphrase, passphrase_len, errp);
354     if (r < 0) {
355         return r;
356     }
357 
358     if (luks_opts->has_cipher_alg) {
359         switch (luks_opts->cipher_alg) {
360             case QCRYPTO_CIPHER_ALG_AES_128: {
361                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
362                 break;
363             }
364             case QCRYPTO_CIPHER_ALG_AES_256: {
365                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
366                 break;
367             }
368             default: {
369                 r = -ENOTSUP;
370                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
371                                  luks_opts->cipher_alg);
372                 return r;
373             }
374         }
375     } else {
376         /* default alg */
377         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
378     }
379 
380     return 0;
381 }
382 
383 static int qemu_rbd_encryption_format(rbd_image_t image,
384                                       RbdEncryptionCreateOptions *encrypt,
385                                       Error **errp)
386 {
387     int r = 0;
388     g_autofree char *passphrase = NULL;
389     size_t passphrase_len;
390     rbd_encryption_format_t format;
391     rbd_encryption_options_t opts;
392     rbd_encryption_luks1_format_options_t luks_opts;
393     rbd_encryption_luks2_format_options_t luks2_opts;
394     size_t opts_size;
395     uint64_t raw_size, effective_size;
396 
397     r = rbd_get_size(image, &raw_size);
398     if (r < 0) {
399         error_setg_errno(errp, -r, "cannot get raw image size");
400         return r;
401     }
402 
403     switch (encrypt->format) {
404         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
405             memset(&luks_opts, 0, sizeof(luks_opts));
406             format = RBD_ENCRYPTION_FORMAT_LUKS1;
407             opts = &luks_opts;
408             opts_size = sizeof(luks_opts);
409             r = qemu_rbd_convert_luks_create_options(
410                     qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
411                     &luks_opts.alg, &passphrase, &passphrase_len, errp);
412             if (r < 0) {
413                 return r;
414             }
415             luks_opts.passphrase = passphrase;
416             luks_opts.passphrase_size = passphrase_len;
417             break;
418         }
419         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
420             memset(&luks2_opts, 0, sizeof(luks2_opts));
421             format = RBD_ENCRYPTION_FORMAT_LUKS2;
422             opts = &luks2_opts;
423             opts_size = sizeof(luks2_opts);
424             r = qemu_rbd_convert_luks_create_options(
425                     qapi_RbdEncryptionCreateOptionsLUKS2_base(
426                             &encrypt->u.luks2),
427                     &luks2_opts.alg, &passphrase, &passphrase_len, errp);
428             if (r < 0) {
429                 return r;
430             }
431             luks2_opts.passphrase = passphrase;
432             luks2_opts.passphrase_size = passphrase_len;
433             break;
434         }
435         default: {
436             r = -ENOTSUP;
437             error_setg_errno(
438                     errp, -r, "unknown image encryption format: %u",
439                     encrypt->format);
440             return r;
441         }
442     }
443 
444     r = rbd_encryption_format(image, format, opts, opts_size);
445     if (r < 0) {
446         error_setg_errno(errp, -r, "encryption format fail");
447         return r;
448     }
449 
450     r = rbd_get_size(image, &effective_size);
451     if (r < 0) {
452         error_setg_errno(errp, -r, "cannot get effective image size");
453         return r;
454     }
455 
456     r = rbd_resize(image, raw_size + (raw_size - effective_size));
457     if (r < 0) {
458         error_setg_errno(errp, -r, "cannot resize image after format");
459         return r;
460     }
461 
462     return 0;
463 }
464 
465 static int qemu_rbd_encryption_load(rbd_image_t image,
466                                     RbdEncryptionOptions *encrypt,
467                                     Error **errp)
468 {
469     int r = 0;
470     g_autofree char *passphrase = NULL;
471     size_t passphrase_len;
472     rbd_encryption_luks1_format_options_t luks_opts;
473     rbd_encryption_luks2_format_options_t luks2_opts;
474     rbd_encryption_format_t format;
475     rbd_encryption_options_t opts;
476     size_t opts_size;
477 
478     switch (encrypt->format) {
479         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
480             memset(&luks_opts, 0, sizeof(luks_opts));
481             format = RBD_ENCRYPTION_FORMAT_LUKS1;
482             opts = &luks_opts;
483             opts_size = sizeof(luks_opts);
484             r = qemu_rbd_convert_luks_options(
485                     qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
486                     &passphrase, &passphrase_len, errp);
487             if (r < 0) {
488                 return r;
489             }
490             luks_opts.passphrase = passphrase;
491             luks_opts.passphrase_size = passphrase_len;
492             break;
493         }
494         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
495             memset(&luks2_opts, 0, sizeof(luks2_opts));
496             format = RBD_ENCRYPTION_FORMAT_LUKS2;
497             opts = &luks2_opts;
498             opts_size = sizeof(luks2_opts);
499             r = qemu_rbd_convert_luks_options(
500                     qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
501                     &passphrase, &passphrase_len, errp);
502             if (r < 0) {
503                 return r;
504             }
505             luks2_opts.passphrase = passphrase;
506             luks2_opts.passphrase_size = passphrase_len;
507             break;
508         }
509         default: {
510             r = -ENOTSUP;
511             error_setg_errno(
512                     errp, -r, "unknown image encryption format: %u",
513                     encrypt->format);
514             return r;
515         }
516     }
517 
518     r = rbd_encryption_load(image, format, opts, opts_size);
519     if (r < 0) {
520         error_setg_errno(errp, -r, "encryption load fail");
521         return r;
522     }
523 
524     return 0;
525 }
526 #endif
527 
528 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
529 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
530                               const char *keypairs, const char *password_secret,
531                               Error **errp)
532 {
533     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
534     rados_t cluster;
535     rados_ioctx_t io_ctx;
536     int obj_order = 0;
537     int ret;
538 
539     assert(options->driver == BLOCKDEV_DRIVER_RBD);
540     if (opts->location->snapshot) {
541         error_setg(errp, "Can't use snapshot name for image creation");
542         return -EINVAL;
543     }
544 
545 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
546     if (opts->encrypt) {
547         error_setg(errp, "RBD library does not support image encryption");
548         return -ENOTSUP;
549     }
550 #endif
551 
552     if (opts->has_cluster_size) {
553         int64_t objsize = opts->cluster_size;
554         if ((objsize - 1) & objsize) {    /* not a power of 2? */
555             error_setg(errp, "obj size needs to be power of 2");
556             return -EINVAL;
557         }
558         if (objsize < 4096) {
559             error_setg(errp, "obj size too small");
560             return -EINVAL;
561         }
562         obj_order = ctz32(objsize);
563     }
564 
565     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
566                            password_secret, errp);
567     if (ret < 0) {
568         return ret;
569     }
570 
571     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
572     if (ret < 0) {
573         error_setg_errno(errp, -ret, "error rbd create");
574         goto out;
575     }
576 
577 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
578     if (opts->encrypt) {
579         rbd_image_t image;
580 
581         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
582         if (ret < 0) {
583             error_setg_errno(errp, -ret,
584                              "error opening image '%s' for encryption format",
585                              opts->location->image);
586             goto out;
587         }
588 
589         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
590         rbd_close(image);
591         if (ret < 0) {
592             /* encryption format fail, try removing the image */
593             rbd_remove(io_ctx, opts->location->image);
594             goto out;
595         }
596     }
597 #endif
598 
599     ret = 0;
600 out:
601     rados_ioctx_destroy(io_ctx);
602     rados_shutdown(cluster);
603     return ret;
604 }
605 
606 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
607 {
608     return qemu_rbd_do_create(options, NULL, NULL, errp);
609 }
610 
611 static int qemu_rbd_extract_encryption_create_options(
612         QemuOpts *opts,
613         RbdEncryptionCreateOptions **spec,
614         Error **errp)
615 {
616     QDict *opts_qdict;
617     QDict *encrypt_qdict;
618     Visitor *v;
619     int ret = 0;
620 
621     opts_qdict = qemu_opts_to_qdict(opts, NULL);
622     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
623     qobject_unref(opts_qdict);
624     if (!qdict_size(encrypt_qdict)) {
625         *spec = NULL;
626         goto exit;
627     }
628 
629     /* Convert options into a QAPI object */
630     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
631     if (!v) {
632         ret = -EINVAL;
633         goto exit;
634     }
635 
636     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
637     visit_free(v);
638     if (!*spec) {
639         ret = -EINVAL;
640         goto exit;
641     }
642 
643 exit:
644     qobject_unref(encrypt_qdict);
645     return ret;
646 }
647 
648 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
649                                                 const char *filename,
650                                                 QemuOpts *opts,
651                                                 Error **errp)
652 {
653     BlockdevCreateOptions *create_options;
654     BlockdevCreateOptionsRbd *rbd_opts;
655     BlockdevOptionsRbd *loc;
656     RbdEncryptionCreateOptions *encrypt = NULL;
657     Error *local_err = NULL;
658     const char *keypairs, *password_secret;
659     QDict *options = NULL;
660     int ret = 0;
661 
662     create_options = g_new0(BlockdevCreateOptions, 1);
663     create_options->driver = BLOCKDEV_DRIVER_RBD;
664     rbd_opts = &create_options->u.rbd;
665 
666     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
667 
668     password_secret = qemu_opt_get(opts, "password-secret");
669 
670     /* Read out options */
671     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
672                               BDRV_SECTOR_SIZE);
673     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
674                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
675     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
676 
677     options = qdict_new();
678     qemu_rbd_parse_filename(filename, options, &local_err);
679     if (local_err) {
680         ret = -EINVAL;
681         error_propagate(errp, local_err);
682         goto exit;
683     }
684 
685     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
686     if (ret < 0) {
687         goto exit;
688     }
689     rbd_opts->encrypt     = encrypt;
690 
691     /*
692      * Caution: while qdict_get_try_str() is fine, getting non-string
693      * types would require more care.  When @options come from -blockdev
694      * or blockdev_add, its members are typed according to the QAPI
695      * schema, but when they come from -drive, they're all QString.
696      */
697     loc = rbd_opts->location;
698     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
699     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
700     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
701     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
702     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
703     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
704 
705     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
706     if (ret < 0) {
707         goto exit;
708     }
709 
710 exit:
711     qobject_unref(options);
712     qapi_free_BlockdevCreateOptions(create_options);
713     return ret;
714 }
715 
716 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
717 {
718     const char **vals;
719     const char *host, *port;
720     char *rados_str;
721     InetSocketAddressBaseList *p;
722     int i, cnt;
723 
724     if (!opts->has_server) {
725         return NULL;
726     }
727 
728     for (cnt = 0, p = opts->server; p; p = p->next) {
729         cnt++;
730     }
731 
732     vals = g_new(const char *, cnt + 1);
733 
734     for (i = 0, p = opts->server; p; p = p->next, i++) {
735         host = p->value->host;
736         port = p->value->port;
737 
738         if (strchr(host, ':')) {
739             vals[i] = g_strdup_printf("[%s]:%s", host, port);
740         } else {
741             vals[i] = g_strdup_printf("%s:%s", host, port);
742         }
743     }
744     vals[i] = NULL;
745 
746     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
747     g_strfreev((char **)vals);
748     return rados_str;
749 }
750 
751 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
752                             BlockdevOptionsRbd *opts, bool cache,
753                             const char *keypairs, const char *secretid,
754                             Error **errp)
755 {
756     char *mon_host = NULL;
757     Error *local_err = NULL;
758     int r;
759 
760     if (secretid) {
761         if (opts->key_secret) {
762             error_setg(errp,
763                        "Legacy 'password-secret' clashes with 'key-secret'");
764             return -EINVAL;
765         }
766         opts->key_secret = g_strdup(secretid);
767     }
768 
769     mon_host = qemu_rbd_mon_host(opts, &local_err);
770     if (local_err) {
771         error_propagate(errp, local_err);
772         r = -EINVAL;
773         goto out;
774     }
775 
776     r = rados_create(cluster, opts->user);
777     if (r < 0) {
778         error_setg_errno(errp, -r, "error initializing");
779         goto out;
780     }
781 
782     /* try default location when conf=NULL, but ignore failure */
783     r = rados_conf_read_file(*cluster, opts->conf);
784     if (opts->conf && r < 0) {
785         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
786         goto failed_shutdown;
787     }
788 
789     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
790     if (r < 0) {
791         goto failed_shutdown;
792     }
793 
794     if (mon_host) {
795         r = rados_conf_set(*cluster, "mon_host", mon_host);
796         if (r < 0) {
797             goto failed_shutdown;
798         }
799     }
800 
801     r = qemu_rbd_set_auth(*cluster, opts, errp);
802     if (r < 0) {
803         goto failed_shutdown;
804     }
805 
806     /*
807      * Fallback to more conservative semantics if setting cache
808      * options fails. Ignore errors from setting rbd_cache because the
809      * only possible error is that the option does not exist, and
810      * librbd defaults to no caching. If write through caching cannot
811      * be set up, fall back to no caching.
812      */
813     if (cache) {
814         rados_conf_set(*cluster, "rbd_cache", "true");
815     } else {
816         rados_conf_set(*cluster, "rbd_cache", "false");
817     }
818 
819     r = rados_connect(*cluster);
820     if (r < 0) {
821         error_setg_errno(errp, -r, "error connecting");
822         goto failed_shutdown;
823     }
824 
825     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
826     if (r < 0) {
827         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
828         goto failed_shutdown;
829     }
830 
831 #ifdef HAVE_RBD_NAMESPACE_EXISTS
832     if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
833         bool exists;
834 
835         r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
836         if (r < 0) {
837             error_setg_errno(errp, -r, "error checking namespace");
838             goto failed_ioctx_destroy;
839         }
840 
841         if (!exists) {
842             error_setg(errp, "namespace '%s' does not exist",
843                        opts->q_namespace);
844             r = -ENOENT;
845             goto failed_ioctx_destroy;
846         }
847     }
848 #endif
849 
850     /*
851      * Set the namespace after opening the io context on the pool,
852      * if nspace == NULL or if nspace == "", it is just as we did nothing
853      */
854     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
855 
856     r = 0;
857     goto out;
858 
859 #ifdef HAVE_RBD_NAMESPACE_EXISTS
860 failed_ioctx_destroy:
861     rados_ioctx_destroy(*io_ctx);
862 #endif
863 failed_shutdown:
864     rados_shutdown(*cluster);
865 out:
866     g_free(mon_host);
867     return r;
868 }
869 
870 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
871                                     Error **errp)
872 {
873     Visitor *v;
874 
875     /* Convert the remaining options into a QAPI object */
876     v = qobject_input_visitor_new_flat_confused(options, errp);
877     if (!v) {
878         return -EINVAL;
879     }
880 
881     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
882     visit_free(v);
883     if (!opts) {
884         return -EINVAL;
885     }
886 
887     return 0;
888 }
889 
890 static int qemu_rbd_attempt_legacy_options(QDict *options,
891                                            BlockdevOptionsRbd **opts,
892                                            char **keypairs)
893 {
894     char *filename;
895     int r;
896 
897     filename = g_strdup(qdict_get_try_str(options, "filename"));
898     if (!filename) {
899         return -EINVAL;
900     }
901     qdict_del(options, "filename");
902 
903     qemu_rbd_parse_filename(filename, options, NULL);
904 
905     /* keypairs freed by caller */
906     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
907     if (*keypairs) {
908         qdict_del(options, "=keyvalue-pairs");
909     }
910 
911     r = qemu_rbd_convert_options(options, opts, NULL);
912 
913     g_free(filename);
914     return r;
915 }
916 
917 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
918                          Error **errp)
919 {
920     BDRVRBDState *s = bs->opaque;
921     BlockdevOptionsRbd *opts = NULL;
922     const QDictEntry *e;
923     Error *local_err = NULL;
924     char *keypairs, *secretid;
925     rbd_image_info_t info;
926     int r;
927 
928     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
929     if (keypairs) {
930         qdict_del(options, "=keyvalue-pairs");
931     }
932 
933     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
934     if (secretid) {
935         qdict_del(options, "password-secret");
936     }
937 
938     r = qemu_rbd_convert_options(options, &opts, &local_err);
939     if (local_err) {
940         /* If keypairs are present, that means some options are present in
941          * the modern option format.  Don't attempt to parse legacy option
942          * formats, as we won't support mixed usage. */
943         if (keypairs) {
944             error_propagate(errp, local_err);
945             goto out;
946         }
947 
948         /* If the initial attempt to convert and process the options failed,
949          * we may be attempting to open an image file that has the rbd options
950          * specified in the older format consisting of all key/value pairs
951          * encoded in the filename.  Go ahead and attempt to parse the
952          * filename, and see if we can pull out the required options. */
953         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
954         if (r < 0) {
955             /* Propagate the original error, not the legacy parsing fallback
956              * error, as the latter was just a best-effort attempt. */
957             error_propagate(errp, local_err);
958             goto out;
959         }
960         /* Take care whenever deciding to actually deprecate; once this ability
961          * is removed, we will not be able to open any images with legacy-styled
962          * backing image strings. */
963         warn_report("RBD options encoded in the filename as keyvalue pairs "
964                     "is deprecated");
965     }
966 
967     /* Remove the processed options from the QDict (the visitor processes
968      * _all_ options in the QDict) */
969     while ((e = qdict_first(options))) {
970         qdict_del(options, e->key);
971     }
972 
973     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
974                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
975     if (r < 0) {
976         goto out;
977     }
978 
979     s->snap = g_strdup(opts->snapshot);
980     s->image_name = g_strdup(opts->image);
981 
982     /* rbd_open is always r/w */
983     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
984     if (r < 0) {
985         error_setg_errno(errp, -r, "error reading header from %s",
986                          s->image_name);
987         goto failed_open;
988     }
989 
990     if (opts->encrypt) {
991 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
992         r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
993         if (r < 0) {
994             goto failed_post_open;
995         }
996 #else
997         r = -ENOTSUP;
998         error_setg(errp, "RBD library does not support image encryption");
999         goto failed_post_open;
1000 #endif
1001     }
1002 
1003     r = rbd_stat(s->image, &info, sizeof(info));
1004     if (r < 0) {
1005         error_setg_errno(errp, -r, "error getting image info from %s",
1006                          s->image_name);
1007         goto failed_post_open;
1008     }
1009     s->image_size = info.size;
1010     s->object_size = info.obj_size;
1011 
1012     /* If we are using an rbd snapshot, we must be r/o, otherwise
1013      * leave as-is */
1014     if (s->snap != NULL) {
1015         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1016         if (r < 0) {
1017             goto failed_post_open;
1018         }
1019     }
1020 
1021 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1022     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1023 #endif
1024 
1025     /* When extending regular files, we get zeros from the OS */
1026     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1027 
1028     r = 0;
1029     goto out;
1030 
1031 failed_post_open:
1032     rbd_close(s->image);
1033 failed_open:
1034     rados_ioctx_destroy(s->io_ctx);
1035     g_free(s->snap);
1036     g_free(s->image_name);
1037     rados_shutdown(s->cluster);
1038 out:
1039     qapi_free_BlockdevOptionsRbd(opts);
1040     g_free(keypairs);
1041     g_free(secretid);
1042     return r;
1043 }
1044 
1045 
1046 /* Since RBD is currently always opened R/W via the API,
1047  * we just need to check if we are using a snapshot or not, in
1048  * order to determine if we will allow it to be R/W */
1049 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1050                                    BlockReopenQueue *queue, Error **errp)
1051 {
1052     BDRVRBDState *s = state->bs->opaque;
1053     int ret = 0;
1054 
1055     if (s->snap && state->flags & BDRV_O_RDWR) {
1056         error_setg(errp,
1057                    "Cannot change node '%s' to r/w when using RBD snapshot",
1058                    bdrv_get_device_or_node_name(state->bs));
1059         ret = -EINVAL;
1060     }
1061 
1062     return ret;
1063 }
1064 
1065 static void qemu_rbd_close(BlockDriverState *bs)
1066 {
1067     BDRVRBDState *s = bs->opaque;
1068 
1069     rbd_close(s->image);
1070     rados_ioctx_destroy(s->io_ctx);
1071     g_free(s->snap);
1072     g_free(s->image_name);
1073     rados_shutdown(s->cluster);
1074 }
1075 
1076 /* Resize the RBD image and update the 'image_size' with the current size */
1077 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1078 {
1079     BDRVRBDState *s = bs->opaque;
1080     int r;
1081 
1082     r = rbd_resize(s->image, size);
1083     if (r < 0) {
1084         return r;
1085     }
1086 
1087     s->image_size = size;
1088 
1089     return 0;
1090 }
1091 
1092 static void qemu_rbd_finish_bh(void *opaque)
1093 {
1094     RBDTask *task = opaque;
1095     task->complete = true;
1096     aio_co_wake(task->co);
1097 }
1098 
1099 /*
1100  * This is the completion callback function for all rbd aio calls
1101  * started from qemu_rbd_start_co().
1102  *
1103  * Note: this function is being called from a non qemu thread so
1104  * we need to be careful about what we do here. Generally we only
1105  * schedule a BH, and do the rest of the io completion handling
1106  * from qemu_rbd_finish_bh() which runs in a qemu context.
1107  */
1108 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1109 {
1110     task->ret = rbd_aio_get_return_value(c);
1111     rbd_aio_release(c);
1112     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1113                             qemu_rbd_finish_bh, task);
1114 }
1115 
1116 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1117                                           uint64_t offset,
1118                                           uint64_t bytes,
1119                                           QEMUIOVector *qiov,
1120                                           int flags,
1121                                           RBDAIOCmd cmd)
1122 {
1123     BDRVRBDState *s = bs->opaque;
1124     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1125     rbd_completion_t c;
1126     int r;
1127 
1128     assert(!qiov || qiov->size == bytes);
1129 
1130     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1131         /*
1132          * RBD APIs don't allow us to write more than actual size, so in order
1133          * to support growing images, we resize the image before write
1134          * operations that exceed the current size.
1135          */
1136         if (offset + bytes > s->image_size) {
1137             int r = qemu_rbd_resize(bs, offset + bytes);
1138             if (r < 0) {
1139                 return r;
1140             }
1141         }
1142     }
1143 
1144     r = rbd_aio_create_completion(&task,
1145                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1146     if (r < 0) {
1147         return r;
1148     }
1149 
1150     switch (cmd) {
1151     case RBD_AIO_READ:
1152         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1153         break;
1154     case RBD_AIO_WRITE:
1155         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1156         break;
1157     case RBD_AIO_DISCARD:
1158         r = rbd_aio_discard(s->image, offset, bytes, c);
1159         break;
1160     case RBD_AIO_FLUSH:
1161         r = rbd_aio_flush(s->image, c);
1162         break;
1163 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1164     case RBD_AIO_WRITE_ZEROES: {
1165         int zero_flags = 0;
1166 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1167         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1168             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1169         }
1170 #endif
1171         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1172         break;
1173     }
1174 #endif
1175     default:
1176         r = -EINVAL;
1177     }
1178 
1179     if (r < 0) {
1180         error_report("rbd request failed early: cmd %d offset %" PRIu64
1181                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1182                      bytes, flags, r, strerror(-r));
1183         rbd_aio_release(c);
1184         return r;
1185     }
1186 
1187     while (!task.complete) {
1188         qemu_coroutine_yield();
1189     }
1190 
1191     if (task.ret < 0) {
1192         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1193                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1194                      bytes, flags, task.ret, strerror(-task.ret));
1195         return task.ret;
1196     }
1197 
1198     /* zero pad short reads */
1199     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1200         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1201     }
1202 
1203     return 0;
1204 }
1205 
1206 static int
1207 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1208                                 int64_t bytes, QEMUIOVector *qiov,
1209                                 BdrvRequestFlags flags)
1210 {
1211     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1212 }
1213 
1214 static int
1215 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1216                                  int64_t bytes, QEMUIOVector *qiov,
1217                                  BdrvRequestFlags flags)
1218 {
1219     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1220 }
1221 
1222 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1223 {
1224     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1225 }
1226 
1227 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1228                                              int64_t offset, int64_t bytes)
1229 {
1230     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1231 }
1232 
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * .bdrv_co_pwrite_zeroes implementation, only built when librbd supports
 * write-zeroes: zero @bytes bytes at @offset by issuing an
 * RBD_AIO_WRITE_ZEROES request. @flags is forwarded unchanged.
 */
static int coroutine_fn
qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
                          BdrvRequestFlags flags)
{
    const RBDAIOCmd cmd = RBD_AIO_WRITE_ZEROES;

    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags, cmd);
}
#endif
1242 
1243 static int coroutine_fn
1244 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1245 {
1246     BDRVRBDState *s = bs->opaque;
1247     bdi->cluster_size = s->object_size;
1248     return 0;
1249 }
1250 
1251 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1252                                                      Error **errp)
1253 {
1254     BDRVRBDState *s = bs->opaque;
1255     ImageInfoSpecific *spec_info;
1256     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1257     int r;
1258 
1259     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1260         r = rbd_read(s->image, 0,
1261                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1262         if (r < 0) {
1263             error_setg_errno(errp, -r, "cannot read image start for probe");
1264             return NULL;
1265         }
1266     }
1267 
1268     spec_info = g_new(ImageInfoSpecific, 1);
1269     *spec_info = (ImageInfoSpecific){
1270         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1271         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1272     };
1273 
1274     if (memcmp(buf, rbd_luks_header_verification,
1275                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1276         spec_info->u.rbd.data->encryption_format =
1277                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1278         spec_info->u.rbd.data->has_encryption_format = true;
1279     } else if (memcmp(buf, rbd_luks2_header_verification,
1280                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1281         spec_info->u.rbd.data->encryption_format =
1282                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1283         spec_info->u.rbd.data->has_encryption_format = true;
1284     } else {
1285         spec_info->u.rbd.data->has_encryption_format = false;
1286     }
1287 
1288     return spec_info;
1289 }
1290 
/*
 * rbd_diff_iterate2() allows the callback routine to interrupt the
 * execution by returning a negative value. Choose a value that does not
 * conflict with an existing exit code and return it if we want to stop the
 * execution prematurely because we detected a change in the allocation
 * status.
 *
 * The replacement list is parenthesized so the negative constant always
 * expands as a single operand.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1298 
1299 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1300                                     int exists, void *opaque)
1301 {
1302     RBDDiffIterateReq *req = opaque;
1303 
1304     assert(req->offs + req->bytes <= offs);
1305 
1306     /* treat a hole like an unallocated area and bail out */
1307     if (!exists) {
1308         return 0;
1309     }
1310 
1311     if (!req->exists && offs > req->offs) {
1312         /*
1313          * we started in an unallocated area and hit the first allocated
1314          * block. req->bytes must be set to the length of the unallocated area
1315          * before the allocated area. stop further processing.
1316          */
1317         req->bytes = offs - req->offs;
1318         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1319     }
1320 
1321     if (req->exists && offs > req->offs + req->bytes) {
1322         /*
1323          * we started in an allocated area and jumped over an unallocated area,
1324          * req->bytes contains the length of the allocated area before the
1325          * unallocated area. stop further processing.
1326          */
1327         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1328     }
1329 
1330     req->bytes += len;
1331     req->exists = true;
1332 
1333     return 0;
1334 }
1335 
1336 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1337                                                  bool want_zero, int64_t offset,
1338                                                  int64_t bytes, int64_t *pnum,
1339                                                  int64_t *map,
1340                                                  BlockDriverState **file)
1341 {
1342     BDRVRBDState *s = bs->opaque;
1343     int status, r;
1344     RBDDiffIterateReq req = { .offs = offset };
1345     uint64_t features, flags;
1346     uint64_t head = 0;
1347 
1348     assert(offset + bytes <= s->image_size);
1349 
1350     /* default to all sectors allocated */
1351     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1352     *map = offset;
1353     *file = bs;
1354     *pnum = bytes;
1355 
1356     /* check if RBD image supports fast-diff */
1357     r = rbd_get_features(s->image, &features);
1358     if (r < 0) {
1359         return status;
1360     }
1361     if (!(features & RBD_FEATURE_FAST_DIFF)) {
1362         return status;
1363     }
1364 
1365     /* check if RBD fast-diff result is valid */
1366     r = rbd_get_flags(s->image, &flags);
1367     if (r < 0) {
1368         return status;
1369     }
1370     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1371         return status;
1372     }
1373 
1374 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1375     /*
1376      * librbd had a bug until early 2022 that affected all versions of ceph that
1377      * supported fast-diff. This bug results in reporting of incorrect offsets
1378      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1379      * Work around this bug by rounding down the offset to object boundaries.
1380      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1381      * However, this workaround only works for non cloned images with default
1382      * striping.
1383      *
1384      * See: https://tracker.ceph.com/issues/53784
1385      */
1386 
1387     /* check if RBD image has non-default striping enabled */
1388     if (features & RBD_FEATURE_STRIPINGV2) {
1389         return status;
1390     }
1391 
1392 #pragma GCC diagnostic push
1393 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1394     /*
1395      * check if RBD image is a clone (= has a parent).
1396      *
1397      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1398      * replacement rbd_get_parent is not present in Luminous and Mimic.
1399      */
1400     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1401         return status;
1402     }
1403 #pragma GCC diagnostic pop
1404 
1405     head = req.offs & (s->object_size - 1);
1406     req.offs -= head;
1407     bytes += head;
1408 #endif
1409 
1410     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1411                           qemu_rbd_diff_iterate_cb, &req);
1412     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1413         return status;
1414     }
1415     assert(req.bytes <= bytes);
1416     if (!req.exists) {
1417         if (r == 0) {
1418             /*
1419              * rbd_diff_iterate2 does not invoke callbacks for unallocated
1420              * areas. This here catches the case where no callback was
1421              * invoked at all (req.bytes == 0).
1422              */
1423             assert(req.bytes == 0);
1424             req.bytes = bytes;
1425         }
1426         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1427     }
1428 
1429     assert(req.bytes > head);
1430     *pnum = req.bytes - head;
1431     return status;
1432 }
1433 
1434 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1435 {
1436     BDRVRBDState *s = bs->opaque;
1437     int r;
1438 
1439     r = rbd_get_size(s->image, &s->image_size);
1440     if (r < 0) {
1441         return r;
1442     }
1443 
1444     return s->image_size;
1445 }
1446 
1447 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1448                                              int64_t offset,
1449                                              bool exact,
1450                                              PreallocMode prealloc,
1451                                              BdrvRequestFlags flags,
1452                                              Error **errp)
1453 {
1454     int r;
1455 
1456     if (prealloc != PREALLOC_MODE_OFF) {
1457         error_setg(errp, "Unsupported preallocation mode '%s'",
1458                    PreallocMode_str(prealloc));
1459         return -ENOTSUP;
1460     }
1461 
1462     r = qemu_rbd_resize(bs, offset);
1463     if (r < 0) {
1464         error_setg_errno(errp, -r, "Failed to resize file");
1465         return r;
1466     }
1467 
1468     return 0;
1469 }
1470 
1471 static int qemu_rbd_snap_create(BlockDriverState *bs,
1472                                 QEMUSnapshotInfo *sn_info)
1473 {
1474     BDRVRBDState *s = bs->opaque;
1475     int r;
1476 
1477     if (sn_info->name[0] == '\0') {
1478         return -EINVAL; /* we need a name for rbd snapshots */
1479     }
1480 
1481     /*
1482      * rbd snapshots are using the name as the user controlled unique identifier
1483      * we can't use the rbd snapid for that purpose, as it can't be set
1484      */
1485     if (sn_info->id_str[0] != '\0' &&
1486         strcmp(sn_info->id_str, sn_info->name) != 0) {
1487         return -EINVAL;
1488     }
1489 
1490     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1491         return -ERANGE;
1492     }
1493 
1494     r = rbd_snap_create(s->image, sn_info->name);
1495     if (r < 0) {
1496         error_report("failed to create snap: %s", strerror(-r));
1497         return r;
1498     }
1499 
1500     return 0;
1501 }
1502 
1503 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1504                                 const char *snapshot_id,
1505                                 const char *snapshot_name,
1506                                 Error **errp)
1507 {
1508     BDRVRBDState *s = bs->opaque;
1509     int r;
1510 
1511     if (!snapshot_name) {
1512         error_setg(errp, "rbd need a valid snapshot name");
1513         return -EINVAL;
1514     }
1515 
1516     /* If snapshot_id is specified, it must be equal to name, see
1517        qemu_rbd_snap_list() */
1518     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1519         error_setg(errp,
1520                    "rbd do not support snapshot id, it should be NULL or "
1521                    "equal to snapshot name");
1522         return -EINVAL;
1523     }
1524 
1525     r = rbd_snap_remove(s->image, snapshot_name);
1526     if (r < 0) {
1527         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1528     }
1529     return r;
1530 }
1531 
1532 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1533                                   const char *snapshot_name)
1534 {
1535     BDRVRBDState *s = bs->opaque;
1536 
1537     return rbd_snap_rollback(s->image, snapshot_name);
1538 }
1539 
1540 static int qemu_rbd_snap_list(BlockDriverState *bs,
1541                               QEMUSnapshotInfo **psn_tab)
1542 {
1543     BDRVRBDState *s = bs->opaque;
1544     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1545     int i, snap_count;
1546     rbd_snap_info_t *snaps;
1547     int max_snaps = RBD_MAX_SNAPS;
1548 
1549     do {
1550         snaps = g_new(rbd_snap_info_t, max_snaps);
1551         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1552         if (snap_count <= 0) {
1553             g_free(snaps);
1554         }
1555     } while (snap_count == -ERANGE);
1556 
1557     if (snap_count <= 0) {
1558         goto done;
1559     }
1560 
1561     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1562 
1563     for (i = 0; i < snap_count; i++) {
1564         const char *snap_name = snaps[i].name;
1565 
1566         sn_info = sn_tab + i;
1567         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1568         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1569 
1570         sn_info->vm_state_size = snaps[i].size;
1571         sn_info->date_sec = 0;
1572         sn_info->date_nsec = 0;
1573         sn_info->vm_clock_nsec = 0;
1574     }
1575     rbd_snap_list_end(snaps);
1576     g_free(snaps);
1577 
1578  done:
1579     *psn_tab = sn_tab;
1580     return snap_count;
1581 }
1582 
1583 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1584                                                       Error **errp)
1585 {
1586     BDRVRBDState *s = bs->opaque;
1587     int r = rbd_invalidate_cache(s->image);
1588     if (r < 0) {
1589         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1590     }
1591 }
1592 
1593 static QemuOptsList qemu_rbd_create_opts = {
1594     .name = "rbd-create-opts",
1595     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1596     .desc = {
1597         {
1598             .name = BLOCK_OPT_SIZE,
1599             .type = QEMU_OPT_SIZE,
1600             .help = "Virtual disk size"
1601         },
1602         {
1603             .name = BLOCK_OPT_CLUSTER_SIZE,
1604             .type = QEMU_OPT_SIZE,
1605             .help = "RBD object size"
1606         },
1607         {
1608             .name = "password-secret",
1609             .type = QEMU_OPT_STRING,
1610             .help = "ID of secret providing the password",
1611         },
1612         {
1613             .name = "encrypt.format",
1614             .type = QEMU_OPT_STRING,
1615             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1616         },
1617         {
1618             .name = "encrypt.cipher-alg",
1619             .type = QEMU_OPT_STRING,
1620             .help = "Name of encryption cipher algorithm"
1621                     " (allowed values: aes-128, aes-256)",
1622         },
1623         {
1624             .name = "encrypt.key-secret",
1625             .type = QEMU_OPT_STRING,
1626             .help = "ID of secret providing LUKS passphrase",
1627         },
1628         { /* end of list */ }
1629     }
1630 };
1631 
/*
 * NULL-terminated list of option names handed to the block layer through
 * bdrv_rbd.strong_runtime_opts.
 *
 * NOTE(review): "server." ends in a dot, presumably so that it matches every
 * option with that prefix - confirm against the block layer's handling of
 * strong_runtime_opts.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    "user",
    "server.",
    "password-secret",

    NULL
};
1644 
1645 static BlockDriver bdrv_rbd = {
1646     .format_name            = "rbd",
1647     .instance_size          = sizeof(BDRVRBDState),
1648     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1649     .bdrv_file_open         = qemu_rbd_open,
1650     .bdrv_close             = qemu_rbd_close,
1651     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1652     .bdrv_co_create         = qemu_rbd_co_create,
1653     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1654     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1655     .bdrv_co_get_info       = qemu_rbd_co_get_info,
1656     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1657     .create_opts            = &qemu_rbd_create_opts,
1658     .bdrv_co_getlength      = qemu_rbd_co_getlength,
1659     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1660     .protocol_name          = "rbd",
1661 
1662     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1663     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1664     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1665     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1666 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1667     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1668 #endif
1669     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1670 
1671     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1672     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1673     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1674     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1675     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1676 
1677     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1678 };
1679 
/* Register the rbd block driver with the block layer. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

/* Hook bdrv_rbd_init() up as a block-module init function. */
block_init(bdrv_rbd_init);
1686