xref: /openbmc/qemu/block/rbd.c (revision a0c2e80a)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "sysemu/replay.h"
26 #include "qapi/qmp/qstring.h"
27 #include "qapi/qmp/qdict.h"
28 #include "qapi/qmp/qjson.h"
29 #include "qapi/qmp/qlist.h"
30 #include "qapi/qobject-input-visitor.h"
31 #include "qapi/qapi-visit-block-core.h"
32 
33 /*
34  * When specifying the image filename use:
35  *
36  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
37  *
38  * poolname must be the name of an existing rados pool.
39  *
40  * devicename is the name of the rbd image.
41  *
42  * Each option given is used to configure rados, and may be any valid
43  * Ceph option, "id", or "conf".
44  *
45  * The "id" option indicates what user we should authenticate as to
46  * the Ceph cluster.  If it is excluded we will use the Ceph default
47  * (normally 'admin').
48  *
49  * The "conf" option specifies a Ceph configuration file to read.  If
50  * it is not specified, we will read from the default Ceph locations
51  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
52  * file, specify conf=/dev/null.
53  *
54  * Configuration values containing :, @, or = can be escaped with a
55  * leading "\".
56  */
57 
58 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
59 
60 #define RBD_MAX_SNAPS 100
61 
62 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
63 
64 static const char rbd_luks_header_verification[
65         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
66     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
67 };
68 
69 static const char rbd_luks2_header_verification[
70         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
71     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
72 };
73 
74 typedef enum {
75     RBD_AIO_READ,
76     RBD_AIO_WRITE,
77     RBD_AIO_DISCARD,
78     RBD_AIO_FLUSH,
79     RBD_AIO_WRITE_ZEROES
80 } RBDAIOCmd;
81 
82 typedef struct BDRVRBDState {
83     rados_t cluster;
84     rados_ioctx_t io_ctx;
85     rbd_image_t image;
86     char *image_name;
87     char *snap;
88     char *namespace;
89     uint64_t image_size;
90     uint64_t object_size;
91 } BDRVRBDState;
92 
93 typedef struct RBDTask {
94     BlockDriverState *bs;
95     Coroutine *co;
96     bool complete;
97     int64_t ret;
98 } RBDTask;
99 
100 typedef struct RBDDiffIterateReq {
101     uint64_t offs;
102     uint64_t bytes;
103     bool exists;
104 } RBDDiffIterateReq;
105 
106 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
107                             BlockdevOptionsRbd *opts, bool cache,
108                             const char *keypairs, const char *secretid,
109                             Error **errp);
110 
111 static char *qemu_rbd_strchr(char *src, char delim)
112 {
113     char *p;
114 
115     for (p = src; *p; ++p) {
116         if (*p == delim) {
117             return p;
118         }
119         if (*p == '\\' && p[1] != '\0') {
120             ++p;
121         }
122     }
123 
124     return NULL;
125 }
126 
127 
128 static char *qemu_rbd_next_tok(char *src, char delim, char **p)
129 {
130     char *end;
131 
132     *p = NULL;
133 
134     end = qemu_rbd_strchr(src, delim);
135     if (end) {
136         *p = end + 1;
137         *end = '\0';
138     }
139     return src;
140 }
141 
142 static void qemu_rbd_unescape(char *src)
143 {
144     char *p;
145 
146     for (p = src; *src; ++src, ++p) {
147         if (*src == '\\' && src[1] != '\0') {
148             src++;
149         }
150         *p = *src;
151     }
152     *p = '\0';
153 }
154 
155 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
156                                     Error **errp)
157 {
158     const char *start;
159     char *p, *buf;
160     QList *keypairs = NULL;
161     char *found_str, *image_name;
162 
163     if (!strstart(filename, "rbd:", &start)) {
164         error_setg(errp, "File name must start with 'rbd:'");
165         return;
166     }
167 
168     buf = g_strdup(start);
169     p = buf;
170 
171     found_str = qemu_rbd_next_tok(p, '/', &p);
172     if (!p) {
173         error_setg(errp, "Pool name is required");
174         goto done;
175     }
176     qemu_rbd_unescape(found_str);
177     qdict_put_str(options, "pool", found_str);
178 
179     if (qemu_rbd_strchr(p, '@')) {
180         image_name = qemu_rbd_next_tok(p, '@', &p);
181 
182         found_str = qemu_rbd_next_tok(p, ':', &p);
183         qemu_rbd_unescape(found_str);
184         qdict_put_str(options, "snapshot", found_str);
185     } else {
186         image_name = qemu_rbd_next_tok(p, ':', &p);
187     }
188     /* Check for namespace in the image_name */
189     if (qemu_rbd_strchr(image_name, '/')) {
190         found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
191         qemu_rbd_unescape(found_str);
192         qdict_put_str(options, "namespace", found_str);
193     } else {
194         qdict_put_str(options, "namespace", "");
195     }
196     qemu_rbd_unescape(image_name);
197     qdict_put_str(options, "image", image_name);
198     if (!p) {
199         goto done;
200     }
201 
202     /* The following are essentially all key/value pairs, and we treat
203      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
204     while (p) {
205         char *name, *value;
206         name = qemu_rbd_next_tok(p, '=', &p);
207         if (!p) {
208             error_setg(errp, "conf option %s has no value", name);
209             break;
210         }
211 
212         qemu_rbd_unescape(name);
213 
214         value = qemu_rbd_next_tok(p, ':', &p);
215         qemu_rbd_unescape(value);
216 
217         if (!strcmp(name, "conf")) {
218             qdict_put_str(options, "conf", value);
219         } else if (!strcmp(name, "id")) {
220             qdict_put_str(options, "user", value);
221         } else {
222             /*
223              * We pass these internally to qemu_rbd_set_keypairs(), so
224              * we can get away with the simpler list of [ "key1",
225              * "value1", "key2", "value2" ] rather than a raw dict
226              * { "key1": "value1", "key2": "value2" } where we can't
227              * guarantee order, or even a more correct but complex
228              * [ { "key1": "value1" }, { "key2": "value2" } ]
229              */
230             if (!keypairs) {
231                 keypairs = qlist_new();
232             }
233             qlist_append_str(keypairs, name);
234             qlist_append_str(keypairs, value);
235         }
236     }
237 
238     if (keypairs) {
239         qdict_put(options, "=keyvalue-pairs",
240                   qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
241     }
242 
243 done:
244     g_free(buf);
245     qobject_unref(keypairs);
246     return;
247 }
248 
249 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
250                              Error **errp)
251 {
252     char *key, *acr;
253     int r;
254     GString *accu;
255     RbdAuthModeList *auth;
256 
257     if (opts->key_secret) {
258         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
259         if (!key) {
260             return -EIO;
261         }
262         r = rados_conf_set(cluster, "key", key);
263         g_free(key);
264         if (r < 0) {
265             error_setg_errno(errp, -r, "Could not set 'key'");
266             return r;
267         }
268     }
269 
270     if (opts->has_auth_client_required) {
271         accu = g_string_new("");
272         for (auth = opts->auth_client_required; auth; auth = auth->next) {
273             if (accu->str[0]) {
274                 g_string_append_c(accu, ';');
275             }
276             g_string_append(accu, RbdAuthMode_str(auth->value));
277         }
278         acr = g_string_free(accu, FALSE);
279         r = rados_conf_set(cluster, "auth_client_required", acr);
280         g_free(acr);
281         if (r < 0) {
282             error_setg_errno(errp, -r,
283                              "Could not set 'auth_client_required'");
284             return r;
285         }
286     }
287 
288     return 0;
289 }
290 
291 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
292                                  Error **errp)
293 {
294     QList *keypairs;
295     QString *name;
296     QString *value;
297     const char *key;
298     size_t remaining;
299     int ret = 0;
300 
301     if (!keypairs_json) {
302         return ret;
303     }
304     keypairs = qobject_to(QList,
305                           qobject_from_json(keypairs_json, &error_abort));
306     remaining = qlist_size(keypairs) / 2;
307     assert(remaining);
308 
309     while (remaining--) {
310         name = qobject_to(QString, qlist_pop(keypairs));
311         value = qobject_to(QString, qlist_pop(keypairs));
312         assert(name && value);
313         key = qstring_get_str(name);
314 
315         ret = rados_conf_set(cluster, key, qstring_get_str(value));
316         qobject_unref(value);
317         if (ret < 0) {
318             error_setg_errno(errp, -ret, "invalid conf option %s", key);
319             qobject_unref(name);
320             ret = -EINVAL;
321             break;
322         }
323         qobject_unref(name);
324     }
325 
326     qobject_unref(keypairs);
327     return ret;
328 }
329 
330 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
331 static int qemu_rbd_convert_luks_options(
332         RbdEncryptionOptionsLUKSBase *luks_opts,
333         char **passphrase,
334         size_t *passphrase_len,
335         Error **errp)
336 {
337     return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
338                                  passphrase_len, errp);
339 }
340 
341 static int qemu_rbd_convert_luks_create_options(
342         RbdEncryptionCreateOptionsLUKSBase *luks_opts,
343         rbd_encryption_algorithm_t *alg,
344         char **passphrase,
345         size_t *passphrase_len,
346         Error **errp)
347 {
348     int r = 0;
349 
350     r = qemu_rbd_convert_luks_options(
351             qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
352             passphrase, passphrase_len, errp);
353     if (r < 0) {
354         return r;
355     }
356 
357     if (luks_opts->has_cipher_alg) {
358         switch (luks_opts->cipher_alg) {
359             case QCRYPTO_CIPHER_ALG_AES_128: {
360                 *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
361                 break;
362             }
363             case QCRYPTO_CIPHER_ALG_AES_256: {
364                 *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
365                 break;
366             }
367             default: {
368                 r = -ENOTSUP;
369                 error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
370                                  luks_opts->cipher_alg);
371                 return r;
372             }
373         }
374     } else {
375         /* default alg */
376         *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
377     }
378 
379     return 0;
380 }
381 
382 static int qemu_rbd_encryption_format(rbd_image_t image,
383                                       RbdEncryptionCreateOptions *encrypt,
384                                       Error **errp)
385 {
386     int r = 0;
387     g_autofree char *passphrase = NULL;
388     size_t passphrase_len;
389     rbd_encryption_format_t format;
390     rbd_encryption_options_t opts;
391     rbd_encryption_luks1_format_options_t luks_opts;
392     rbd_encryption_luks2_format_options_t luks2_opts;
393     size_t opts_size;
394     uint64_t raw_size, effective_size;
395 
396     r = rbd_get_size(image, &raw_size);
397     if (r < 0) {
398         error_setg_errno(errp, -r, "cannot get raw image size");
399         return r;
400     }
401 
402     switch (encrypt->format) {
403         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
404             memset(&luks_opts, 0, sizeof(luks_opts));
405             format = RBD_ENCRYPTION_FORMAT_LUKS1;
406             opts = &luks_opts;
407             opts_size = sizeof(luks_opts);
408             r = qemu_rbd_convert_luks_create_options(
409                     qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
410                     &luks_opts.alg, &passphrase, &passphrase_len, errp);
411             if (r < 0) {
412                 return r;
413             }
414             luks_opts.passphrase = passphrase;
415             luks_opts.passphrase_size = passphrase_len;
416             break;
417         }
418         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
419             memset(&luks2_opts, 0, sizeof(luks2_opts));
420             format = RBD_ENCRYPTION_FORMAT_LUKS2;
421             opts = &luks2_opts;
422             opts_size = sizeof(luks2_opts);
423             r = qemu_rbd_convert_luks_create_options(
424                     qapi_RbdEncryptionCreateOptionsLUKS2_base(
425                             &encrypt->u.luks2),
426                     &luks2_opts.alg, &passphrase, &passphrase_len, errp);
427             if (r < 0) {
428                 return r;
429             }
430             luks2_opts.passphrase = passphrase;
431             luks2_opts.passphrase_size = passphrase_len;
432             break;
433         }
434         default: {
435             r = -ENOTSUP;
436             error_setg_errno(
437                     errp, -r, "unknown image encryption format: %u",
438                     encrypt->format);
439             return r;
440         }
441     }
442 
443     r = rbd_encryption_format(image, format, opts, opts_size);
444     if (r < 0) {
445         error_setg_errno(errp, -r, "encryption format fail");
446         return r;
447     }
448 
449     r = rbd_get_size(image, &effective_size);
450     if (r < 0) {
451         error_setg_errno(errp, -r, "cannot get effective image size");
452         return r;
453     }
454 
455     r = rbd_resize(image, raw_size + (raw_size - effective_size));
456     if (r < 0) {
457         error_setg_errno(errp, -r, "cannot resize image after format");
458         return r;
459     }
460 
461     return 0;
462 }
463 
464 static int qemu_rbd_encryption_load(rbd_image_t image,
465                                     RbdEncryptionOptions *encrypt,
466                                     Error **errp)
467 {
468     int r = 0;
469     g_autofree char *passphrase = NULL;
470     size_t passphrase_len;
471     rbd_encryption_luks1_format_options_t luks_opts;
472     rbd_encryption_luks2_format_options_t luks2_opts;
473     rbd_encryption_format_t format;
474     rbd_encryption_options_t opts;
475     size_t opts_size;
476 
477     switch (encrypt->format) {
478         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
479             memset(&luks_opts, 0, sizeof(luks_opts));
480             format = RBD_ENCRYPTION_FORMAT_LUKS1;
481             opts = &luks_opts;
482             opts_size = sizeof(luks_opts);
483             r = qemu_rbd_convert_luks_options(
484                     qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
485                     &passphrase, &passphrase_len, errp);
486             if (r < 0) {
487                 return r;
488             }
489             luks_opts.passphrase = passphrase;
490             luks_opts.passphrase_size = passphrase_len;
491             break;
492         }
493         case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
494             memset(&luks2_opts, 0, sizeof(luks2_opts));
495             format = RBD_ENCRYPTION_FORMAT_LUKS2;
496             opts = &luks2_opts;
497             opts_size = sizeof(luks2_opts);
498             r = qemu_rbd_convert_luks_options(
499                     qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
500                     &passphrase, &passphrase_len, errp);
501             if (r < 0) {
502                 return r;
503             }
504             luks2_opts.passphrase = passphrase;
505             luks2_opts.passphrase_size = passphrase_len;
506             break;
507         }
508         default: {
509             r = -ENOTSUP;
510             error_setg_errno(
511                     errp, -r, "unknown image encryption format: %u",
512                     encrypt->format);
513             return r;
514         }
515     }
516 
517     r = rbd_encryption_load(image, format, opts, opts_size);
518     if (r < 0) {
519         error_setg_errno(errp, -r, "encryption load fail");
520         return r;
521     }
522 
523     return 0;
524 }
525 #endif
526 
527 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
528 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
529                               const char *keypairs, const char *password_secret,
530                               Error **errp)
531 {
532     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
533     rados_t cluster;
534     rados_ioctx_t io_ctx;
535     int obj_order = 0;
536     int ret;
537 
538     assert(options->driver == BLOCKDEV_DRIVER_RBD);
539     if (opts->location->snapshot) {
540         error_setg(errp, "Can't use snapshot name for image creation");
541         return -EINVAL;
542     }
543 
544 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
545     if (opts->encrypt) {
546         error_setg(errp, "RBD library does not support image encryption");
547         return -ENOTSUP;
548     }
549 #endif
550 
551     if (opts->has_cluster_size) {
552         int64_t objsize = opts->cluster_size;
553         if ((objsize - 1) & objsize) {    /* not a power of 2? */
554             error_setg(errp, "obj size needs to be power of 2");
555             return -EINVAL;
556         }
557         if (objsize < 4096) {
558             error_setg(errp, "obj size too small");
559             return -EINVAL;
560         }
561         obj_order = ctz32(objsize);
562     }
563 
564     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
565                            password_secret, errp);
566     if (ret < 0) {
567         return ret;
568     }
569 
570     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
571     if (ret < 0) {
572         error_setg_errno(errp, -ret, "error rbd create");
573         goto out;
574     }
575 
576 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
577     if (opts->encrypt) {
578         rbd_image_t image;
579 
580         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
581         if (ret < 0) {
582             error_setg_errno(errp, -ret,
583                              "error opening image '%s' for encryption format",
584                              opts->location->image);
585             goto out;
586         }
587 
588         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
589         rbd_close(image);
590         if (ret < 0) {
591             /* encryption format fail, try removing the image */
592             rbd_remove(io_ctx, opts->location->image);
593             goto out;
594         }
595     }
596 #endif
597 
598     ret = 0;
599 out:
600     rados_ioctx_destroy(io_ctx);
601     rados_shutdown(cluster);
602     return ret;
603 }
604 
605 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
606 {
607     return qemu_rbd_do_create(options, NULL, NULL, errp);
608 }
609 
610 static int qemu_rbd_extract_encryption_create_options(
611         QemuOpts *opts,
612         RbdEncryptionCreateOptions **spec,
613         Error **errp)
614 {
615     QDict *opts_qdict;
616     QDict *encrypt_qdict;
617     Visitor *v;
618     int ret = 0;
619 
620     opts_qdict = qemu_opts_to_qdict(opts, NULL);
621     qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
622     qobject_unref(opts_qdict);
623     if (!qdict_size(encrypt_qdict)) {
624         *spec = NULL;
625         goto exit;
626     }
627 
628     /* Convert options into a QAPI object */
629     v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
630     if (!v) {
631         ret = -EINVAL;
632         goto exit;
633     }
634 
635     visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
636     visit_free(v);
637     if (!*spec) {
638         ret = -EINVAL;
639         goto exit;
640     }
641 
642 exit:
643     qobject_unref(encrypt_qdict);
644     return ret;
645 }
646 
647 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
648                                                 const char *filename,
649                                                 QemuOpts *opts,
650                                                 Error **errp)
651 {
652     BlockdevCreateOptions *create_options;
653     BlockdevCreateOptionsRbd *rbd_opts;
654     BlockdevOptionsRbd *loc;
655     RbdEncryptionCreateOptions *encrypt = NULL;
656     Error *local_err = NULL;
657     const char *keypairs, *password_secret;
658     QDict *options = NULL;
659     int ret = 0;
660 
661     create_options = g_new0(BlockdevCreateOptions, 1);
662     create_options->driver = BLOCKDEV_DRIVER_RBD;
663     rbd_opts = &create_options->u.rbd;
664 
665     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
666 
667     password_secret = qemu_opt_get(opts, "password-secret");
668 
669     /* Read out options */
670     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
671                               BDRV_SECTOR_SIZE);
672     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
673                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
674     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
675 
676     options = qdict_new();
677     qemu_rbd_parse_filename(filename, options, &local_err);
678     if (local_err) {
679         ret = -EINVAL;
680         error_propagate(errp, local_err);
681         goto exit;
682     }
683 
684     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
685     if (ret < 0) {
686         goto exit;
687     }
688     rbd_opts->encrypt     = encrypt;
689 
690     /*
691      * Caution: while qdict_get_try_str() is fine, getting non-string
692      * types would require more care.  When @options come from -blockdev
693      * or blockdev_add, its members are typed according to the QAPI
694      * schema, but when they come from -drive, they're all QString.
695      */
696     loc = rbd_opts->location;
697     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
698     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
699     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
700     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
701     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
702     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
703 
704     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
705     if (ret < 0) {
706         goto exit;
707     }
708 
709 exit:
710     qobject_unref(options);
711     qapi_free_BlockdevCreateOptions(create_options);
712     return ret;
713 }
714 
715 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
716 {
717     const char **vals;
718     const char *host, *port;
719     char *rados_str;
720     InetSocketAddressBaseList *p;
721     int i, cnt;
722 
723     if (!opts->has_server) {
724         return NULL;
725     }
726 
727     for (cnt = 0, p = opts->server; p; p = p->next) {
728         cnt++;
729     }
730 
731     vals = g_new(const char *, cnt + 1);
732 
733     for (i = 0, p = opts->server; p; p = p->next, i++) {
734         host = p->value->host;
735         port = p->value->port;
736 
737         if (strchr(host, ':')) {
738             vals[i] = g_strdup_printf("[%s]:%s", host, port);
739         } else {
740             vals[i] = g_strdup_printf("%s:%s", host, port);
741         }
742     }
743     vals[i] = NULL;
744 
745     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
746     g_strfreev((char **)vals);
747     return rados_str;
748 }
749 
750 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
751                             BlockdevOptionsRbd *opts, bool cache,
752                             const char *keypairs, const char *secretid,
753                             Error **errp)
754 {
755     char *mon_host = NULL;
756     Error *local_err = NULL;
757     int r;
758 
759     if (secretid) {
760         if (opts->key_secret) {
761             error_setg(errp,
762                        "Legacy 'password-secret' clashes with 'key-secret'");
763             return -EINVAL;
764         }
765         opts->key_secret = g_strdup(secretid);
766     }
767 
768     mon_host = qemu_rbd_mon_host(opts, &local_err);
769     if (local_err) {
770         error_propagate(errp, local_err);
771         r = -EINVAL;
772         goto out;
773     }
774 
775     r = rados_create(cluster, opts->user);
776     if (r < 0) {
777         error_setg_errno(errp, -r, "error initializing");
778         goto out;
779     }
780 
781     /* try default location when conf=NULL, but ignore failure */
782     r = rados_conf_read_file(*cluster, opts->conf);
783     if (opts->conf && r < 0) {
784         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
785         goto failed_shutdown;
786     }
787 
788     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
789     if (r < 0) {
790         goto failed_shutdown;
791     }
792 
793     if (mon_host) {
794         r = rados_conf_set(*cluster, "mon_host", mon_host);
795         if (r < 0) {
796             goto failed_shutdown;
797         }
798     }
799 
800     r = qemu_rbd_set_auth(*cluster, opts, errp);
801     if (r < 0) {
802         goto failed_shutdown;
803     }
804 
805     /*
806      * Fallback to more conservative semantics if setting cache
807      * options fails. Ignore errors from setting rbd_cache because the
808      * only possible error is that the option does not exist, and
809      * librbd defaults to no caching. If write through caching cannot
810      * be set up, fall back to no caching.
811      */
812     if (cache) {
813         rados_conf_set(*cluster, "rbd_cache", "true");
814     } else {
815         rados_conf_set(*cluster, "rbd_cache", "false");
816     }
817 
818     r = rados_connect(*cluster);
819     if (r < 0) {
820         error_setg_errno(errp, -r, "error connecting");
821         goto failed_shutdown;
822     }
823 
824     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
825     if (r < 0) {
826         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
827         goto failed_shutdown;
828     }
829 
830 #ifdef HAVE_RBD_NAMESPACE_EXISTS
831     if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
832         bool exists;
833 
834         r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
835         if (r < 0) {
836             error_setg_errno(errp, -r, "error checking namespace");
837             goto failed_ioctx_destroy;
838         }
839 
840         if (!exists) {
841             error_setg(errp, "namespace '%s' does not exist",
842                        opts->q_namespace);
843             r = -ENOENT;
844             goto failed_ioctx_destroy;
845         }
846     }
847 #endif
848 
849     /*
850      * Set the namespace after opening the io context on the pool,
851      * if nspace == NULL or if nspace == "", it is just as we did nothing
852      */
853     rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
854 
855     r = 0;
856     goto out;
857 
858 #ifdef HAVE_RBD_NAMESPACE_EXISTS
859 failed_ioctx_destroy:
860     rados_ioctx_destroy(*io_ctx);
861 #endif
862 failed_shutdown:
863     rados_shutdown(*cluster);
864 out:
865     g_free(mon_host);
866     return r;
867 }
868 
869 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
870                                     Error **errp)
871 {
872     Visitor *v;
873 
874     /* Convert the remaining options into a QAPI object */
875     v = qobject_input_visitor_new_flat_confused(options, errp);
876     if (!v) {
877         return -EINVAL;
878     }
879 
880     visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
881     visit_free(v);
882     if (!opts) {
883         return -EINVAL;
884     }
885 
886     return 0;
887 }
888 
889 static int qemu_rbd_attempt_legacy_options(QDict *options,
890                                            BlockdevOptionsRbd **opts,
891                                            char **keypairs)
892 {
893     char *filename;
894     int r;
895 
896     filename = g_strdup(qdict_get_try_str(options, "filename"));
897     if (!filename) {
898         return -EINVAL;
899     }
900     qdict_del(options, "filename");
901 
902     qemu_rbd_parse_filename(filename, options, NULL);
903 
904     /* keypairs freed by caller */
905     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
906     if (*keypairs) {
907         qdict_del(options, "=keyvalue-pairs");
908     }
909 
910     r = qemu_rbd_convert_options(options, opts, NULL);
911 
912     g_free(filename);
913     return r;
914 }
915 
916 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
917                          Error **errp)
918 {
919     BDRVRBDState *s = bs->opaque;
920     BlockdevOptionsRbd *opts = NULL;
921     const QDictEntry *e;
922     Error *local_err = NULL;
923     char *keypairs, *secretid;
924     rbd_image_info_t info;
925     int r;
926 
927     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
928     if (keypairs) {
929         qdict_del(options, "=keyvalue-pairs");
930     }
931 
932     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
933     if (secretid) {
934         qdict_del(options, "password-secret");
935     }
936 
937     r = qemu_rbd_convert_options(options, &opts, &local_err);
938     if (local_err) {
939         /* If keypairs are present, that means some options are present in
940          * the modern option format.  Don't attempt to parse legacy option
941          * formats, as we won't support mixed usage. */
942         if (keypairs) {
943             error_propagate(errp, local_err);
944             goto out;
945         }
946 
947         /* If the initial attempt to convert and process the options failed,
948          * we may be attempting to open an image file that has the rbd options
949          * specified in the older format consisting of all key/value pairs
950          * encoded in the filename.  Go ahead and attempt to parse the
951          * filename, and see if we can pull out the required options. */
952         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
953         if (r < 0) {
954             /* Propagate the original error, not the legacy parsing fallback
955              * error, as the latter was just a best-effort attempt. */
956             error_propagate(errp, local_err);
957             goto out;
958         }
959         /* Take care whenever deciding to actually deprecate; once this ability
960          * is removed, we will not be able to open any images with legacy-styled
961          * backing image strings. */
962         warn_report("RBD options encoded in the filename as keyvalue pairs "
963                     "is deprecated");
964     }
965 
966     /* Remove the processed options from the QDict (the visitor processes
967      * _all_ options in the QDict) */
968     while ((e = qdict_first(options))) {
969         qdict_del(options, e->key);
970     }
971 
972     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
973                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
974     if (r < 0) {
975         goto out;
976     }
977 
978     s->snap = g_strdup(opts->snapshot);
979     s->image_name = g_strdup(opts->image);
980 
981     /* rbd_open is always r/w */
982     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
983     if (r < 0) {
984         error_setg_errno(errp, -r, "error reading header from %s",
985                          s->image_name);
986         goto failed_open;
987     }
988 
989     if (opts->encrypt) {
990 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
991         r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
992         if (r < 0) {
993             goto failed_post_open;
994         }
995 #else
996         r = -ENOTSUP;
997         error_setg(errp, "RBD library does not support image encryption");
998         goto failed_post_open;
999 #endif
1000     }
1001 
1002     r = rbd_stat(s->image, &info, sizeof(info));
1003     if (r < 0) {
1004         error_setg_errno(errp, -r, "error getting image info from %s",
1005                          s->image_name);
1006         goto failed_post_open;
1007     }
1008     s->image_size = info.size;
1009     s->object_size = info.obj_size;
1010 
1011     /* If we are using an rbd snapshot, we must be r/o, otherwise
1012      * leave as-is */
1013     if (s->snap != NULL) {
1014         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1015         if (r < 0) {
1016             goto failed_post_open;
1017         }
1018     }
1019 
1020 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1021     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1022 #endif
1023 
1024     /* When extending regular files, we get zeros from the OS */
1025     bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1026 
1027     r = 0;
1028     goto out;
1029 
1030 failed_post_open:
1031     rbd_close(s->image);
1032 failed_open:
1033     rados_ioctx_destroy(s->io_ctx);
1034     g_free(s->snap);
1035     g_free(s->image_name);
1036     rados_shutdown(s->cluster);
1037 out:
1038     qapi_free_BlockdevOptionsRbd(opts);
1039     g_free(keypairs);
1040     g_free(secretid);
1041     return r;
1042 }
1043 
1044 
1045 /* Since RBD is currently always opened R/W via the API,
1046  * we just need to check if we are using a snapshot or not, in
1047  * order to determine if we will allow it to be R/W */
1048 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1049                                    BlockReopenQueue *queue, Error **errp)
1050 {
1051     BDRVRBDState *s = state->bs->opaque;
1052     int ret = 0;
1053 
1054     if (s->snap && state->flags & BDRV_O_RDWR) {
1055         error_setg(errp,
1056                    "Cannot change node '%s' to r/w when using RBD snapshot",
1057                    bdrv_get_device_or_node_name(state->bs));
1058         ret = -EINVAL;
1059     }
1060 
1061     return ret;
1062 }
1063 
1064 static void qemu_rbd_close(BlockDriverState *bs)
1065 {
1066     BDRVRBDState *s = bs->opaque;
1067 
1068     rbd_close(s->image);
1069     rados_ioctx_destroy(s->io_ctx);
1070     g_free(s->snap);
1071     g_free(s->image_name);
1072     rados_shutdown(s->cluster);
1073 }
1074 
1075 /* Resize the RBD image and update the 'image_size' with the current size */
1076 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1077 {
1078     BDRVRBDState *s = bs->opaque;
1079     int r;
1080 
1081     r = rbd_resize(s->image, size);
1082     if (r < 0) {
1083         return r;
1084     }
1085 
1086     s->image_size = size;
1087 
1088     return 0;
1089 }
1090 
1091 static void qemu_rbd_finish_bh(void *opaque)
1092 {
1093     RBDTask *task = opaque;
1094     task->complete = true;
1095     aio_co_wake(task->co);
1096 }
1097 
1098 /*
1099  * This is the completion callback function for all rbd aio calls
1100  * started from qemu_rbd_start_co().
1101  *
1102  * Note: this function is being called from a non qemu thread so
1103  * we need to be careful about what we do here. Generally we only
1104  * schedule a BH, and do the rest of the io completion handling
1105  * from qemu_rbd_finish_bh() which runs in a qemu context.
1106  */
1107 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1108 {
1109     task->ret = rbd_aio_get_return_value(c);
1110     rbd_aio_release(c);
1111     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1112                             qemu_rbd_finish_bh, task);
1113 }
1114 
1115 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1116                                           uint64_t offset,
1117                                           uint64_t bytes,
1118                                           QEMUIOVector *qiov,
1119                                           int flags,
1120                                           RBDAIOCmd cmd)
1121 {
1122     BDRVRBDState *s = bs->opaque;
1123     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1124     rbd_completion_t c;
1125     int r;
1126 
1127     assert(!qiov || qiov->size == bytes);
1128 
1129     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1130         /*
1131          * RBD APIs don't allow us to write more than actual size, so in order
1132          * to support growing images, we resize the image before write
1133          * operations that exceed the current size.
1134          */
1135         if (offset + bytes > s->image_size) {
1136             int r = qemu_rbd_resize(bs, offset + bytes);
1137             if (r < 0) {
1138                 return r;
1139             }
1140         }
1141     }
1142 
1143     r = rbd_aio_create_completion(&task,
1144                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1145     if (r < 0) {
1146         return r;
1147     }
1148 
1149     switch (cmd) {
1150     case RBD_AIO_READ:
1151         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1152         break;
1153     case RBD_AIO_WRITE:
1154         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1155         break;
1156     case RBD_AIO_DISCARD:
1157         r = rbd_aio_discard(s->image, offset, bytes, c);
1158         break;
1159     case RBD_AIO_FLUSH:
1160         r = rbd_aio_flush(s->image, c);
1161         break;
1162 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1163     case RBD_AIO_WRITE_ZEROES: {
1164         int zero_flags = 0;
1165 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1166         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1167             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1168         }
1169 #endif
1170         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1171         break;
1172     }
1173 #endif
1174     default:
1175         r = -EINVAL;
1176     }
1177 
1178     if (r < 0) {
1179         error_report("rbd request failed early: cmd %d offset %" PRIu64
1180                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1181                      bytes, flags, r, strerror(-r));
1182         rbd_aio_release(c);
1183         return r;
1184     }
1185 
1186     while (!task.complete) {
1187         qemu_coroutine_yield();
1188     }
1189 
1190     if (task.ret < 0) {
1191         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1192                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1193                      bytes, flags, task.ret, strerror(-task.ret));
1194         return task.ret;
1195     }
1196 
1197     /* zero pad short reads */
1198     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1199         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1200     }
1201 
1202     return 0;
1203 }
1204 
1205 static int
1206 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1207                                 int64_t bytes, QEMUIOVector *qiov,
1208                                 BdrvRequestFlags flags)
1209 {
1210     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1211 }
1212 
1213 static int
1214 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1215                                  int64_t bytes, QEMUIOVector *qiov,
1216                                  BdrvRequestFlags flags)
1217 {
1218     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1219 }
1220 
1221 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1222 {
1223     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1224 }
1225 
1226 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1227                                              int64_t offset, int64_t bytes)
1228 {
1229     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1230 }
1231 
1232 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1233 static int
1234 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1235                                        int64_t bytes, BdrvRequestFlags flags)
1236 {
1237     return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
1238                              RBD_AIO_WRITE_ZEROES);
1239 }
1240 #endif
1241 
1242 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1243 {
1244     BDRVRBDState *s = bs->opaque;
1245     bdi->cluster_size = s->object_size;
1246     return 0;
1247 }
1248 
1249 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1250                                                      Error **errp)
1251 {
1252     BDRVRBDState *s = bs->opaque;
1253     ImageInfoSpecific *spec_info;
1254     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1255     int r;
1256 
1257     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1258         r = rbd_read(s->image, 0,
1259                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1260         if (r < 0) {
1261             error_setg_errno(errp, -r, "cannot read image start for probe");
1262             return NULL;
1263         }
1264     }
1265 
1266     spec_info = g_new(ImageInfoSpecific, 1);
1267     *spec_info = (ImageInfoSpecific){
1268         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1269         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1270     };
1271 
1272     if (memcmp(buf, rbd_luks_header_verification,
1273                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1274         spec_info->u.rbd.data->encryption_format =
1275                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1276         spec_info->u.rbd.data->has_encryption_format = true;
1277     } else if (memcmp(buf, rbd_luks2_header_verification,
1278                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1279         spec_info->u.rbd.data->encryption_format =
1280                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1281         spec_info->u.rbd.data->has_encryption_format = true;
1282     } else {
1283         spec_info->u.rbd.data->has_encryption_format = false;
1284     }
1285 
1286     return spec_info;
1287 }
1288 
1289 /*
1290  * rbd_diff_iterate2 allows to interrupt the exection by returning a negative
1291  * value in the callback routine. Choose a value that does not conflict with
1292  * an existing exitcode and return it if we want to prematurely stop the
1293  * execution because we detected a change in the allocation status.
1294  */
1295 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
1296 
1297 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1298                                     int exists, void *opaque)
1299 {
1300     RBDDiffIterateReq *req = opaque;
1301 
1302     assert(req->offs + req->bytes <= offs);
1303 
1304     /* treat a hole like an unallocated area and bail out */
1305     if (!exists) {
1306         return 0;
1307     }
1308 
1309     if (!req->exists && offs > req->offs) {
1310         /*
1311          * we started in an unallocated area and hit the first allocated
1312          * block. req->bytes must be set to the length of the unallocated area
1313          * before the allocated area. stop further processing.
1314          */
1315         req->bytes = offs - req->offs;
1316         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1317     }
1318 
1319     if (req->exists && offs > req->offs + req->bytes) {
1320         /*
1321          * we started in an allocated area and jumped over an unallocated area,
1322          * req->bytes contains the length of the allocated area before the
1323          * unallocated area. stop further processing.
1324          */
1325         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1326     }
1327 
1328     req->bytes += len;
1329     req->exists = true;
1330 
1331     return 0;
1332 }
1333 
1334 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1335                                                  bool want_zero, int64_t offset,
1336                                                  int64_t bytes, int64_t *pnum,
1337                                                  int64_t *map,
1338                                                  BlockDriverState **file)
1339 {
1340     BDRVRBDState *s = bs->opaque;
1341     int status, r;
1342     RBDDiffIterateReq req = { .offs = offset };
1343     uint64_t features, flags;
1344     uint64_t head = 0;
1345 
1346     assert(offset + bytes <= s->image_size);
1347 
1348     /* default to all sectors allocated */
1349     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1350     *map = offset;
1351     *file = bs;
1352     *pnum = bytes;
1353 
1354     /* check if RBD image supports fast-diff */
1355     r = rbd_get_features(s->image, &features);
1356     if (r < 0) {
1357         return status;
1358     }
1359     if (!(features & RBD_FEATURE_FAST_DIFF)) {
1360         return status;
1361     }
1362 
1363     /* check if RBD fast-diff result is valid */
1364     r = rbd_get_flags(s->image, &flags);
1365     if (r < 0) {
1366         return status;
1367     }
1368     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1369         return status;
1370     }
1371 
1372 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1373     /*
1374      * librbd had a bug until early 2022 that affected all versions of ceph that
1375      * supported fast-diff. This bug results in reporting of incorrect offsets
1376      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1377      * Work around this bug by rounding down the offset to object boundaries.
1378      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1379      * However, this workaround only works for non cloned images with default
1380      * striping.
1381      *
1382      * See: https://tracker.ceph.com/issues/53784
1383      */
1384 
1385     /* check if RBD image has non-default striping enabled */
1386     if (features & RBD_FEATURE_STRIPINGV2) {
1387         return status;
1388     }
1389 
1390 #pragma GCC diagnostic push
1391 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1392     /*
1393      * check if RBD image is a clone (= has a parent).
1394      *
1395      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1396      * replacement rbd_get_parent is not present in Luminous and Mimic.
1397      */
1398     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1399         return status;
1400     }
1401 #pragma GCC diagnostic pop
1402 
1403     head = req.offs & (s->object_size - 1);
1404     req.offs -= head;
1405     bytes += head;
1406 #endif
1407 
1408     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1409                           qemu_rbd_diff_iterate_cb, &req);
1410     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1411         return status;
1412     }
1413     assert(req.bytes <= bytes);
1414     if (!req.exists) {
1415         if (r == 0) {
1416             /*
1417              * rbd_diff_iterate2 does not invoke callbacks for unallocated
1418              * areas. This here catches the case where no callback was
1419              * invoked at all (req.bytes == 0).
1420              */
1421             assert(req.bytes == 0);
1422             req.bytes = bytes;
1423         }
1424         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1425     }
1426 
1427     assert(req.bytes > head);
1428     *pnum = req.bytes - head;
1429     return status;
1430 }
1431 
1432 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1433 {
1434     BDRVRBDState *s = bs->opaque;
1435     int r;
1436 
1437     r = rbd_get_size(s->image, &s->image_size);
1438     if (r < 0) {
1439         return r;
1440     }
1441 
1442     return s->image_size;
1443 }
1444 
1445 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1446                                              int64_t offset,
1447                                              bool exact,
1448                                              PreallocMode prealloc,
1449                                              BdrvRequestFlags flags,
1450                                              Error **errp)
1451 {
1452     int r;
1453 
1454     if (prealloc != PREALLOC_MODE_OFF) {
1455         error_setg(errp, "Unsupported preallocation mode '%s'",
1456                    PreallocMode_str(prealloc));
1457         return -ENOTSUP;
1458     }
1459 
1460     r = qemu_rbd_resize(bs, offset);
1461     if (r < 0) {
1462         error_setg_errno(errp, -r, "Failed to resize file");
1463         return r;
1464     }
1465 
1466     return 0;
1467 }
1468 
1469 static int qemu_rbd_snap_create(BlockDriverState *bs,
1470                                 QEMUSnapshotInfo *sn_info)
1471 {
1472     BDRVRBDState *s = bs->opaque;
1473     int r;
1474 
1475     if (sn_info->name[0] == '\0') {
1476         return -EINVAL; /* we need a name for rbd snapshots */
1477     }
1478 
1479     /*
1480      * rbd snapshots are using the name as the user controlled unique identifier
1481      * we can't use the rbd snapid for that purpose, as it can't be set
1482      */
1483     if (sn_info->id_str[0] != '\0' &&
1484         strcmp(sn_info->id_str, sn_info->name) != 0) {
1485         return -EINVAL;
1486     }
1487 
1488     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1489         return -ERANGE;
1490     }
1491 
1492     r = rbd_snap_create(s->image, sn_info->name);
1493     if (r < 0) {
1494         error_report("failed to create snap: %s", strerror(-r));
1495         return r;
1496     }
1497 
1498     return 0;
1499 }
1500 
1501 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1502                                 const char *snapshot_id,
1503                                 const char *snapshot_name,
1504                                 Error **errp)
1505 {
1506     BDRVRBDState *s = bs->opaque;
1507     int r;
1508 
1509     if (!snapshot_name) {
1510         error_setg(errp, "rbd need a valid snapshot name");
1511         return -EINVAL;
1512     }
1513 
1514     /* If snapshot_id is specified, it must be equal to name, see
1515        qemu_rbd_snap_list() */
1516     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1517         error_setg(errp,
1518                    "rbd do not support snapshot id, it should be NULL or "
1519                    "equal to snapshot name");
1520         return -EINVAL;
1521     }
1522 
1523     r = rbd_snap_remove(s->image, snapshot_name);
1524     if (r < 0) {
1525         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1526     }
1527     return r;
1528 }
1529 
1530 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1531                                   const char *snapshot_name)
1532 {
1533     BDRVRBDState *s = bs->opaque;
1534 
1535     return rbd_snap_rollback(s->image, snapshot_name);
1536 }
1537 
1538 static int qemu_rbd_snap_list(BlockDriverState *bs,
1539                               QEMUSnapshotInfo **psn_tab)
1540 {
1541     BDRVRBDState *s = bs->opaque;
1542     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1543     int i, snap_count;
1544     rbd_snap_info_t *snaps;
1545     int max_snaps = RBD_MAX_SNAPS;
1546 
1547     do {
1548         snaps = g_new(rbd_snap_info_t, max_snaps);
1549         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1550         if (snap_count <= 0) {
1551             g_free(snaps);
1552         }
1553     } while (snap_count == -ERANGE);
1554 
1555     if (snap_count <= 0) {
1556         goto done;
1557     }
1558 
1559     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1560 
1561     for (i = 0; i < snap_count; i++) {
1562         const char *snap_name = snaps[i].name;
1563 
1564         sn_info = sn_tab + i;
1565         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1566         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1567 
1568         sn_info->vm_state_size = snaps[i].size;
1569         sn_info->date_sec = 0;
1570         sn_info->date_nsec = 0;
1571         sn_info->vm_clock_nsec = 0;
1572     }
1573     rbd_snap_list_end(snaps);
1574     g_free(snaps);
1575 
1576  done:
1577     *psn_tab = sn_tab;
1578     return snap_count;
1579 }
1580 
1581 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1582                                                       Error **errp)
1583 {
1584     BDRVRBDState *s = bs->opaque;
1585     int r = rbd_invalidate_cache(s->image);
1586     if (r < 0) {
1587         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1588     }
1589 }
1590 
1591 static QemuOptsList qemu_rbd_create_opts = {
1592     .name = "rbd-create-opts",
1593     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1594     .desc = {
1595         {
1596             .name = BLOCK_OPT_SIZE,
1597             .type = QEMU_OPT_SIZE,
1598             .help = "Virtual disk size"
1599         },
1600         {
1601             .name = BLOCK_OPT_CLUSTER_SIZE,
1602             .type = QEMU_OPT_SIZE,
1603             .help = "RBD object size"
1604         },
1605         {
1606             .name = "password-secret",
1607             .type = QEMU_OPT_STRING,
1608             .help = "ID of secret providing the password",
1609         },
1610         {
1611             .name = "encrypt.format",
1612             .type = QEMU_OPT_STRING,
1613             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1614         },
1615         {
1616             .name = "encrypt.cipher-alg",
1617             .type = QEMU_OPT_STRING,
1618             .help = "Name of encryption cipher algorithm"
1619                     " (allowed values: aes-128, aes-256)",
1620         },
1621         {
1622             .name = "encrypt.key-secret",
1623             .type = QEMU_OPT_STRING,
1624             .help = "ID of secret providing LUKS passphrase",
1625         },
1626         { /* end of list */ }
1627     }
1628 };
1629 
1630 static const char *const qemu_rbd_strong_runtime_opts[] = {
1631     "pool",
1632     "namespace",
1633     "image",
1634     "conf",
1635     "snapshot",
1636     "user",
1637     "server.",
1638     "password-secret",
1639 
1640     NULL
1641 };
1642 
1643 static BlockDriver bdrv_rbd = {
1644     .format_name            = "rbd",
1645     .instance_size          = sizeof(BDRVRBDState),
1646     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1647     .bdrv_file_open         = qemu_rbd_open,
1648     .bdrv_close             = qemu_rbd_close,
1649     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1650     .bdrv_co_create         = qemu_rbd_co_create,
1651     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1652     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1653     .bdrv_get_info          = qemu_rbd_getinfo,
1654     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1655     .create_opts            = &qemu_rbd_create_opts,
1656     .bdrv_getlength         = qemu_rbd_getlength,
1657     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1658     .protocol_name          = "rbd",
1659 
1660     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1661     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1662     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1663     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1664 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1665     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1666 #endif
1667     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1668 
1669     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1670     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1671     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1672     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1673     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1674 
1675     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1676 };
1677 
1678 static void bdrv_rbd_init(void)
1679 {
1680     bdrv_register(&bdrv_rbd);
1681 }
1682 
1683 block_init(bdrv_rbd_init);
1684