xref: /openbmc/qemu/block/rbd.c (revision a75ed3c43064528f3409f0be286b62b9c3a47218)
1  /*
2   * QEMU Block driver for RADOS (Ceph)
3   *
4   * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5   *                         Josh Durgin <josh.durgin@dreamhost.com>
6   *
7   * This work is licensed under the terms of the GNU GPL, version 2.  See
8   * the COPYING file in the top-level directory.
9   *
10   * Contributions after 2012-01-13 are licensed under the terms of the
11   * GNU GPL, version 2 or (at your option) any later version.
12   */
13  
14  #include "qemu/osdep.h"
15  
16  #include <rbd/librbd.h>
17  #include "qapi/error.h"
18  #include "qemu/error-report.h"
19  #include "qemu/module.h"
20  #include "qemu/option.h"
21  #include "block/block_int.h"
22  #include "block/qdict.h"
23  #include "crypto/secret.h"
24  #include "qemu/cutils.h"
25  #include "sysemu/replay.h"
26  #include "qapi/qmp/qstring.h"
27  #include "qapi/qmp/qdict.h"
28  #include "qapi/qmp/qjson.h"
29  #include "qapi/qmp/qlist.h"
30  #include "qapi/qobject-input-visitor.h"
31  #include "qapi/qapi-visit-block-core.h"
32  
33  /*
34   * When specifying the image filename use:
35   *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; if given, the image is looked up in that
 * namespace of the pool.
 *
 * devicename is the name of the rbd image.
41   *
42   * Each option given is used to configure rados, and may be any valid
43   * Ceph option, "id", or "conf".
44   *
45   * The "id" option indicates what user we should authenticate as to
46   * the Ceph cluster.  If it is excluded we will use the Ceph default
47   * (normally 'admin').
48   *
49   * The "conf" option specifies a Ceph configuration file to read.  If
50   * it is not specified, we will read from the default Ceph locations
51   * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
52   * file, specify conf=/dev/null.
53   *
54   * Configuration values containing :, @, or = can be escaped with a
55   * leading "\".
56   */
57  
/* Byte size derived from OBJ_DEFAULT_OBJ_ORDER (defined outside this file). */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* NOTE(review): presumably an upper bound on listed snapshots; not used in
 * the visible part of this file -- confirm against its users. */
#define RBD_MAX_SNAPS 100

/* Number of leading header bytes stored in the two signatures below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/* Signature bytes: "LUKS", 0xBA, 0xBE, followed by the two bytes 0, 1. */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/* Same signature as above, but ending in the two bytes 0, 2. */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
73  
/*
 * Kinds of asynchronous request issued against an image.
 * NOTE(review): the code that dispatches on these values is outside the
 * visible part of this file.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
81  
/* Per-BlockDriverState driver state (reached through bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle filled in by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* pool I/O context filled in by qemu_rbd_connect() */
    rbd_image_t image;      /* image handle obtained from rbd_open() */
    char *image_name;       /* copy of the "image" option */
    char *snap;             /* copy of the "snapshot" option, may be NULL */
    char *namespace;        /* NOTE(review): not assigned in the visible code */
    uint64_t image_size;    /* NOTE(review): not assigned in the visible code */
    uint64_t object_size;   /* NOTE(review): not assigned in the visible code */
} BDRVRBDState;
92  
/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): users of this struct are outside the visible part of the
 * file; the field descriptions below are inferred from names and types
 * only -- confirm.
 */
typedef struct RBDTask {
    BlockDriverState *bs;   /* presumably the device the request belongs to */
    Coroutine *co;          /* presumably the coroutine waiting for completion */
    bool complete;          /* presumably set once the request has finished */
    int64_t ret;            /* presumably the request's result code */
} RBDTask;
99  
/*
 * State for a diff-iterate style query.
 * NOTE(review): users of this struct are outside the visible part of the
 * file; the field descriptions below are inferred from names and types
 * only -- confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;      /* presumably a byte offset */
    uint64_t bytes;     /* presumably a byte count */
    bool exists;        /* presumably whether the range holds data */
} RBDDiffIterateReq;
105  
106  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
107                              BlockdevOptionsRbd *opts, bool cache,
108                              const char *keypairs, const char *secretid,
109                              Error **errp);
110  
/*
 * Locate the first occurrence of @delim in @src that is not protected by
 * a preceding backslash.
 *
 * A backslash followed by any character forms an escape pair; the escaped
 * character is never considered a delimiter.  A lone backslash at the very
 * end of the string is treated as an ordinary character.
 *
 * Returns a pointer into @src, or NULL if no unescaped @delim exists.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over both halves of an escape pair in one go. */
        cur += (*cur == '\\' && cur[1] != '\0') ? 2 : 1;
    }

    return NULL;
}
126  
127  
/*
 * Split @src in place at the first unescaped @delim.
 *
 * On a match the delimiter is overwritten with NUL and *@p is set to the
 * character following it.  Without a match the string is left untouched
 * and *@p is set to NULL.
 *
 * Returns @src, i.e. the (now terminated) leading token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (!sep) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
141  
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept verbatim.  A backslash that is the last
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
154  
155  static void qemu_rbd_parse_filename(const char *filename, QDict *options,
156                                      Error **errp)
157  {
158      const char *start;
159      char *p, *buf;
160      QList *keypairs = NULL;
161      char *found_str, *image_name;
162  
163      if (!strstart(filename, "rbd:", &start)) {
164          error_setg(errp, "File name must start with 'rbd:'");
165          return;
166      }
167  
168      buf = g_strdup(start);
169      p = buf;
170  
171      found_str = qemu_rbd_next_tok(p, '/', &p);
172      if (!p) {
173          error_setg(errp, "Pool name is required");
174          goto done;
175      }
176      qemu_rbd_unescape(found_str);
177      qdict_put_str(options, "pool", found_str);
178  
179      if (qemu_rbd_strchr(p, '@')) {
180          image_name = qemu_rbd_next_tok(p, '@', &p);
181  
182          found_str = qemu_rbd_next_tok(p, ':', &p);
183          qemu_rbd_unescape(found_str);
184          qdict_put_str(options, "snapshot", found_str);
185      } else {
186          image_name = qemu_rbd_next_tok(p, ':', &p);
187      }
188      /* Check for namespace in the image_name */
189      if (qemu_rbd_strchr(image_name, '/')) {
190          found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
191          qemu_rbd_unescape(found_str);
192          qdict_put_str(options, "namespace", found_str);
193      } else {
194          qdict_put_str(options, "namespace", "");
195      }
196      qemu_rbd_unescape(image_name);
197      qdict_put_str(options, "image", image_name);
198      if (!p) {
199          goto done;
200      }
201  
202      /* The following are essentially all key/value pairs, and we treat
203       * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
204      while (p) {
205          char *name, *value;
206          name = qemu_rbd_next_tok(p, '=', &p);
207          if (!p) {
208              error_setg(errp, "conf option %s has no value", name);
209              break;
210          }
211  
212          qemu_rbd_unescape(name);
213  
214          value = qemu_rbd_next_tok(p, ':', &p);
215          qemu_rbd_unescape(value);
216  
217          if (!strcmp(name, "conf")) {
218              qdict_put_str(options, "conf", value);
219          } else if (!strcmp(name, "id")) {
220              qdict_put_str(options, "user", value);
221          } else {
222              /*
223               * We pass these internally to qemu_rbd_set_keypairs(), so
224               * we can get away with the simpler list of [ "key1",
225               * "value1", "key2", "value2" ] rather than a raw dict
226               * { "key1": "value1", "key2": "value2" } where we can't
227               * guarantee order, or even a more correct but complex
228               * [ { "key1": "value1" }, { "key2": "value2" } ]
229               */
230              if (!keypairs) {
231                  keypairs = qlist_new();
232              }
233              qlist_append_str(keypairs, name);
234              qlist_append_str(keypairs, value);
235          }
236      }
237  
238      if (keypairs) {
239          qdict_put(options, "=keyvalue-pairs",
240                    qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
241      }
242  
243  done:
244      g_free(buf);
245      qobject_unref(keypairs);
246      return;
247  }
248  
249  static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
250                               Error **errp)
251  {
252      char *key, *acr;
253      int r;
254      GString *accu;
255      RbdAuthModeList *auth;
256  
257      if (opts->key_secret) {
258          key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
259          if (!key) {
260              return -EIO;
261          }
262          r = rados_conf_set(cluster, "key", key);
263          g_free(key);
264          if (r < 0) {
265              error_setg_errno(errp, -r, "Could not set 'key'");
266              return r;
267          }
268      }
269  
270      if (opts->has_auth_client_required) {
271          accu = g_string_new("");
272          for (auth = opts->auth_client_required; auth; auth = auth->next) {
273              if (accu->str[0]) {
274                  g_string_append_c(accu, ';');
275              }
276              g_string_append(accu, RbdAuthMode_str(auth->value));
277          }
278          acr = g_string_free(accu, FALSE);
279          r = rados_conf_set(cluster, "auth_client_required", acr);
280          g_free(acr);
281          if (r < 0) {
282              error_setg_errno(errp, -r,
283                               "Could not set 'auth_client_required'");
284              return r;
285          }
286      }
287  
288      return 0;
289  }
290  
291  static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
292                                   Error **errp)
293  {
294      QList *keypairs;
295      QString *name;
296      QString *value;
297      const char *key;
298      size_t remaining;
299      int ret = 0;
300  
301      if (!keypairs_json) {
302          return ret;
303      }
304      keypairs = qobject_to(QList,
305                            qobject_from_json(keypairs_json, &error_abort));
306      remaining = qlist_size(keypairs) / 2;
307      assert(remaining);
308  
309      while (remaining--) {
310          name = qobject_to(QString, qlist_pop(keypairs));
311          value = qobject_to(QString, qlist_pop(keypairs));
312          assert(name && value);
313          key = qstring_get_str(name);
314  
315          ret = rados_conf_set(cluster, key, qstring_get_str(value));
316          qobject_unref(value);
317          if (ret < 0) {
318              error_setg_errno(errp, -ret, "invalid conf option %s", key);
319              qobject_unref(name);
320              ret = -EINVAL;
321              break;
322          }
323          qobject_unref(name);
324      }
325  
326      qobject_unref(keypairs);
327      return ret;
328  }
329  
330  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
331  static int qemu_rbd_convert_luks_options(
332          RbdEncryptionOptionsLUKSBase *luks_opts,
333          char **passphrase,
334          size_t *passphrase_len,
335          Error **errp)
336  {
337      return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
338                                   passphrase_len, errp);
339  }
340  
341  static int qemu_rbd_convert_luks_create_options(
342          RbdEncryptionCreateOptionsLUKSBase *luks_opts,
343          rbd_encryption_algorithm_t *alg,
344          char **passphrase,
345          size_t *passphrase_len,
346          Error **errp)
347  {
348      int r = 0;
349  
350      r = qemu_rbd_convert_luks_options(
351              qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
352              passphrase, passphrase_len, errp);
353      if (r < 0) {
354          return r;
355      }
356  
357      if (luks_opts->has_cipher_alg) {
358          switch (luks_opts->cipher_alg) {
359              case QCRYPTO_CIPHER_ALG_AES_128: {
360                  *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
361                  break;
362              }
363              case QCRYPTO_CIPHER_ALG_AES_256: {
364                  *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
365                  break;
366              }
367              default: {
368                  r = -ENOTSUP;
369                  error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
370                                   luks_opts->cipher_alg);
371                  return r;
372              }
373          }
374      } else {
375          /* default alg */
376          *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
377      }
378  
379      return 0;
380  }
381  
382  static int qemu_rbd_encryption_format(rbd_image_t image,
383                                        RbdEncryptionCreateOptions *encrypt,
384                                        Error **errp)
385  {
386      int r = 0;
387      g_autofree char *passphrase = NULL;
388      size_t passphrase_len;
389      rbd_encryption_format_t format;
390      rbd_encryption_options_t opts;
391      rbd_encryption_luks1_format_options_t luks_opts;
392      rbd_encryption_luks2_format_options_t luks2_opts;
393      size_t opts_size;
394      uint64_t raw_size, effective_size;
395  
396      r = rbd_get_size(image, &raw_size);
397      if (r < 0) {
398          error_setg_errno(errp, -r, "cannot get raw image size");
399          return r;
400      }
401  
402      switch (encrypt->format) {
403          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
404              memset(&luks_opts, 0, sizeof(luks_opts));
405              format = RBD_ENCRYPTION_FORMAT_LUKS1;
406              opts = &luks_opts;
407              opts_size = sizeof(luks_opts);
408              r = qemu_rbd_convert_luks_create_options(
409                      qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
410                      &luks_opts.alg, &passphrase, &passphrase_len, errp);
411              if (r < 0) {
412                  return r;
413              }
414              luks_opts.passphrase = passphrase;
415              luks_opts.passphrase_size = passphrase_len;
416              break;
417          }
418          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
419              memset(&luks2_opts, 0, sizeof(luks2_opts));
420              format = RBD_ENCRYPTION_FORMAT_LUKS2;
421              opts = &luks2_opts;
422              opts_size = sizeof(luks2_opts);
423              r = qemu_rbd_convert_luks_create_options(
424                      qapi_RbdEncryptionCreateOptionsLUKS2_base(
425                              &encrypt->u.luks2),
426                      &luks2_opts.alg, &passphrase, &passphrase_len, errp);
427              if (r < 0) {
428                  return r;
429              }
430              luks2_opts.passphrase = passphrase;
431              luks2_opts.passphrase_size = passphrase_len;
432              break;
433          }
434          default: {
435              r = -ENOTSUP;
436              error_setg_errno(
437                      errp, -r, "unknown image encryption format: %u",
438                      encrypt->format);
439              return r;
440          }
441      }
442  
443      r = rbd_encryption_format(image, format, opts, opts_size);
444      if (r < 0) {
445          error_setg_errno(errp, -r, "encryption format fail");
446          return r;
447      }
448  
449      r = rbd_get_size(image, &effective_size);
450      if (r < 0) {
451          error_setg_errno(errp, -r, "cannot get effective image size");
452          return r;
453      }
454  
455      r = rbd_resize(image, raw_size + (raw_size - effective_size));
456      if (r < 0) {
457          error_setg_errno(errp, -r, "cannot resize image after format");
458          return r;
459      }
460  
461      return 0;
462  }
463  
464  static int qemu_rbd_encryption_load(rbd_image_t image,
465                                      RbdEncryptionOptions *encrypt,
466                                      Error **errp)
467  {
468      int r = 0;
469      g_autofree char *passphrase = NULL;
470      size_t passphrase_len;
471      rbd_encryption_luks1_format_options_t luks_opts;
472      rbd_encryption_luks2_format_options_t luks2_opts;
473      rbd_encryption_format_t format;
474      rbd_encryption_options_t opts;
475      size_t opts_size;
476  
477      switch (encrypt->format) {
478          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
479              memset(&luks_opts, 0, sizeof(luks_opts));
480              format = RBD_ENCRYPTION_FORMAT_LUKS1;
481              opts = &luks_opts;
482              opts_size = sizeof(luks_opts);
483              r = qemu_rbd_convert_luks_options(
484                      qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
485                      &passphrase, &passphrase_len, errp);
486              if (r < 0) {
487                  return r;
488              }
489              luks_opts.passphrase = passphrase;
490              luks_opts.passphrase_size = passphrase_len;
491              break;
492          }
493          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
494              memset(&luks2_opts, 0, sizeof(luks2_opts));
495              format = RBD_ENCRYPTION_FORMAT_LUKS2;
496              opts = &luks2_opts;
497              opts_size = sizeof(luks2_opts);
498              r = qemu_rbd_convert_luks_options(
499                      qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
500                      &passphrase, &passphrase_len, errp);
501              if (r < 0) {
502                  return r;
503              }
504              luks2_opts.passphrase = passphrase;
505              luks2_opts.passphrase_size = passphrase_len;
506              break;
507          }
508          default: {
509              r = -ENOTSUP;
510              error_setg_errno(
511                      errp, -r, "unknown image encryption format: %u",
512                      encrypt->format);
513              return r;
514          }
515      }
516  
517      r = rbd_encryption_load(image, format, opts, opts_size);
518      if (r < 0) {
519          error_setg_errno(errp, -r, "encryption load fail");
520          return r;
521      }
522  
523      return 0;
524  }
525  #endif
526  
527  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
528  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
529                                const char *keypairs, const char *password_secret,
530                                Error **errp)
531  {
532      BlockdevCreateOptionsRbd *opts = &options->u.rbd;
533      rados_t cluster;
534      rados_ioctx_t io_ctx;
535      int obj_order = 0;
536      int ret;
537  
538      assert(options->driver == BLOCKDEV_DRIVER_RBD);
539      if (opts->location->snapshot) {
540          error_setg(errp, "Can't use snapshot name for image creation");
541          return -EINVAL;
542      }
543  
544  #ifndef LIBRBD_SUPPORTS_ENCRYPTION
545      if (opts->encrypt) {
546          error_setg(errp, "RBD library does not support image encryption");
547          return -ENOTSUP;
548      }
549  #endif
550  
551      if (opts->has_cluster_size) {
552          int64_t objsize = opts->cluster_size;
553          if ((objsize - 1) & objsize) {    /* not a power of 2? */
554              error_setg(errp, "obj size needs to be power of 2");
555              return -EINVAL;
556          }
557          if (objsize < 4096) {
558              error_setg(errp, "obj size too small");
559              return -EINVAL;
560          }
561          obj_order = ctz32(objsize);
562      }
563  
564      ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
565                             password_secret, errp);
566      if (ret < 0) {
567          return ret;
568      }
569  
570      ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
571      if (ret < 0) {
572          error_setg_errno(errp, -ret, "error rbd create");
573          goto out;
574      }
575  
576  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
577      if (opts->encrypt) {
578          rbd_image_t image;
579  
580          ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
581          if (ret < 0) {
582              error_setg_errno(errp, -ret,
583                               "error opening image '%s' for encryption format",
584                               opts->location->image);
585              goto out;
586          }
587  
588          ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
589          rbd_close(image);
590          if (ret < 0) {
591              /* encryption format fail, try removing the image */
592              rbd_remove(io_ctx, opts->location->image);
593              goto out;
594          }
595      }
596  #endif
597  
598      ret = 0;
599  out:
600      rados_ioctx_destroy(io_ctx);
601      rados_shutdown(cluster);
602      return ret;
603  }
604  
605  static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
606  {
607      return qemu_rbd_do_create(options, NULL, NULL, errp);
608  }
609  
610  static int qemu_rbd_extract_encryption_create_options(
611          QemuOpts *opts,
612          RbdEncryptionCreateOptions **spec,
613          Error **errp)
614  {
615      QDict *opts_qdict;
616      QDict *encrypt_qdict;
617      Visitor *v;
618      int ret = 0;
619  
620      opts_qdict = qemu_opts_to_qdict(opts, NULL);
621      qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
622      qobject_unref(opts_qdict);
623      if (!qdict_size(encrypt_qdict)) {
624          *spec = NULL;
625          goto exit;
626      }
627  
628      /* Convert options into a QAPI object */
629      v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
630      if (!v) {
631          ret = -EINVAL;
632          goto exit;
633      }
634  
635      visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
636      visit_free(v);
637      if (!*spec) {
638          ret = -EINVAL;
639          goto exit;
640      }
641  
642  exit:
643      qobject_unref(encrypt_qdict);
644      return ret;
645  }
646  
647  static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
648                                                  const char *filename,
649                                                  QemuOpts *opts,
650                                                  Error **errp)
651  {
652      BlockdevCreateOptions *create_options;
653      BlockdevCreateOptionsRbd *rbd_opts;
654      BlockdevOptionsRbd *loc;
655      RbdEncryptionCreateOptions *encrypt = NULL;
656      Error *local_err = NULL;
657      const char *keypairs, *password_secret;
658      QDict *options = NULL;
659      int ret = 0;
660  
661      create_options = g_new0(BlockdevCreateOptions, 1);
662      create_options->driver = BLOCKDEV_DRIVER_RBD;
663      rbd_opts = &create_options->u.rbd;
664  
665      rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
666  
667      password_secret = qemu_opt_get(opts, "password-secret");
668  
669      /* Read out options */
670      rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
671                                BDRV_SECTOR_SIZE);
672      rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
673                                                     BLOCK_OPT_CLUSTER_SIZE, 0);
674      rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
675  
676      options = qdict_new();
677      qemu_rbd_parse_filename(filename, options, &local_err);
678      if (local_err) {
679          ret = -EINVAL;
680          error_propagate(errp, local_err);
681          goto exit;
682      }
683  
684      ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
685      if (ret < 0) {
686          goto exit;
687      }
688      rbd_opts->encrypt     = encrypt;
689  
690      /*
691       * Caution: while qdict_get_try_str() is fine, getting non-string
692       * types would require more care.  When @options come from -blockdev
693       * or blockdev_add, its members are typed according to the QAPI
694       * schema, but when they come from -drive, they're all QString.
695       */
696      loc = rbd_opts->location;
697      loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
698      loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
699      loc->user        = g_strdup(qdict_get_try_str(options, "user"));
700      loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
701      loc->image       = g_strdup(qdict_get_try_str(options, "image"));
702      keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
703  
704      ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
705      if (ret < 0) {
706          goto exit;
707      }
708  
709  exit:
710      qobject_unref(options);
711      qapi_free_BlockdevCreateOptions(create_options);
712      return ret;
713  }
714  
715  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
716  {
717      const char **vals;
718      const char *host, *port;
719      char *rados_str;
720      InetSocketAddressBaseList *p;
721      int i, cnt;
722  
723      if (!opts->has_server) {
724          return NULL;
725      }
726  
727      for (cnt = 0, p = opts->server; p; p = p->next) {
728          cnt++;
729      }
730  
731      vals = g_new(const char *, cnt + 1);
732  
733      for (i = 0, p = opts->server; p; p = p->next, i++) {
734          host = p->value->host;
735          port = p->value->port;
736  
737          if (strchr(host, ':')) {
738              vals[i] = g_strdup_printf("[%s]:%s", host, port);
739          } else {
740              vals[i] = g_strdup_printf("%s:%s", host, port);
741          }
742      }
743      vals[i] = NULL;
744  
745      rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
746      g_strfreev((char **)vals);
747      return rados_str;
748  }
749  
750  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
751                              BlockdevOptionsRbd *opts, bool cache,
752                              const char *keypairs, const char *secretid,
753                              Error **errp)
754  {
755      char *mon_host = NULL;
756      Error *local_err = NULL;
757      int r;
758  
759      if (secretid) {
760          if (opts->key_secret) {
761              error_setg(errp,
762                         "Legacy 'password-secret' clashes with 'key-secret'");
763              return -EINVAL;
764          }
765          opts->key_secret = g_strdup(secretid);
766      }
767  
768      mon_host = qemu_rbd_mon_host(opts, &local_err);
769      if (local_err) {
770          error_propagate(errp, local_err);
771          r = -EINVAL;
772          goto out;
773      }
774  
775      r = rados_create(cluster, opts->user);
776      if (r < 0) {
777          error_setg_errno(errp, -r, "error initializing");
778          goto out;
779      }
780  
781      /* try default location when conf=NULL, but ignore failure */
782      r = rados_conf_read_file(*cluster, opts->conf);
783      if (opts->conf && r < 0) {
784          error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
785          goto failed_shutdown;
786      }
787  
788      r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
789      if (r < 0) {
790          goto failed_shutdown;
791      }
792  
793      if (mon_host) {
794          r = rados_conf_set(*cluster, "mon_host", mon_host);
795          if (r < 0) {
796              goto failed_shutdown;
797          }
798      }
799  
800      r = qemu_rbd_set_auth(*cluster, opts, errp);
801      if (r < 0) {
802          goto failed_shutdown;
803      }
804  
805      /*
806       * Fallback to more conservative semantics if setting cache
807       * options fails. Ignore errors from setting rbd_cache because the
808       * only possible error is that the option does not exist, and
809       * librbd defaults to no caching. If write through caching cannot
810       * be set up, fall back to no caching.
811       */
812      if (cache) {
813          rados_conf_set(*cluster, "rbd_cache", "true");
814      } else {
815          rados_conf_set(*cluster, "rbd_cache", "false");
816      }
817  
818      r = rados_connect(*cluster);
819      if (r < 0) {
820          error_setg_errno(errp, -r, "error connecting");
821          goto failed_shutdown;
822      }
823  
824      r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
825      if (r < 0) {
826          error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
827          goto failed_shutdown;
828      }
829  
830  #ifdef HAVE_RBD_NAMESPACE_EXISTS
831      if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
832          bool exists;
833  
834          r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
835          if (r < 0) {
836              error_setg_errno(errp, -r, "error checking namespace");
837              goto failed_ioctx_destroy;
838          }
839  
840          if (!exists) {
841              error_setg(errp, "namespace '%s' does not exist",
842                         opts->q_namespace);
843              r = -ENOENT;
844              goto failed_ioctx_destroy;
845          }
846      }
847  #endif
848  
849      /*
850       * Set the namespace after opening the io context on the pool,
851       * if nspace == NULL or if nspace == "", it is just as we did nothing
852       */
853      rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
854  
855      r = 0;
856      goto out;
857  
858  #ifdef HAVE_RBD_NAMESPACE_EXISTS
859  failed_ioctx_destroy:
860      rados_ioctx_destroy(*io_ctx);
861  #endif
862  failed_shutdown:
863      rados_shutdown(*cluster);
864  out:
865      g_free(mon_host);
866      return r;
867  }
868  
869  static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
870                                      Error **errp)
871  {
872      Visitor *v;
873  
874      /* Convert the remaining options into a QAPI object */
875      v = qobject_input_visitor_new_flat_confused(options, errp);
876      if (!v) {
877          return -EINVAL;
878      }
879  
880      visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
881      visit_free(v);
882      if (!opts) {
883          return -EINVAL;
884      }
885  
886      return 0;
887  }
888  
889  static int qemu_rbd_attempt_legacy_options(QDict *options,
890                                             BlockdevOptionsRbd **opts,
891                                             char **keypairs)
892  {
893      char *filename;
894      int r;
895  
896      filename = g_strdup(qdict_get_try_str(options, "filename"));
897      if (!filename) {
898          return -EINVAL;
899      }
900      qdict_del(options, "filename");
901  
902      qemu_rbd_parse_filename(filename, options, NULL);
903  
904      /* keypairs freed by caller */
905      *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
906      if (*keypairs) {
907          qdict_del(options, "=keyvalue-pairs");
908      }
909  
910      r = qemu_rbd_convert_options(options, opts, NULL);
911  
912      g_free(filename);
913      return r;
914  }
915  
916  static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
917                           Error **errp)
918  {
919      BDRVRBDState *s = bs->opaque;
920      BlockdevOptionsRbd *opts = NULL;
921      const QDictEntry *e;
922      Error *local_err = NULL;
923      char *keypairs, *secretid;
924      rbd_image_info_t info;
925      int r;
926  
927      keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
928      if (keypairs) {
929          qdict_del(options, "=keyvalue-pairs");
930      }
931  
932      secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
933      if (secretid) {
934          qdict_del(options, "password-secret");
935      }
936  
937      r = qemu_rbd_convert_options(options, &opts, &local_err);
938      if (local_err) {
939          /* If keypairs are present, that means some options are present in
940           * the modern option format.  Don't attempt to parse legacy option
941           * formats, as we won't support mixed usage. */
942          if (keypairs) {
943              error_propagate(errp, local_err);
944              goto out;
945          }
946  
947          /* If the initial attempt to convert and process the options failed,
948           * we may be attempting to open an image file that has the rbd options
949           * specified in the older format consisting of all key/value pairs
950           * encoded in the filename.  Go ahead and attempt to parse the
951           * filename, and see if we can pull out the required options. */
952          r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
953          if (r < 0) {
954              /* Propagate the original error, not the legacy parsing fallback
955               * error, as the latter was just a best-effort attempt. */
956              error_propagate(errp, local_err);
957              goto out;
958          }
959          /* Take care whenever deciding to actually deprecate; once this ability
960           * is removed, we will not be able to open any images with legacy-styled
961           * backing image strings. */
962          warn_report("RBD options encoded in the filename as keyvalue pairs "
963                      "is deprecated");
964      }
965  
966      /* Remove the processed options from the QDict (the visitor processes
967       * _all_ options in the QDict) */
968      while ((e = qdict_first(options))) {
969          qdict_del(options, e->key);
970      }
971  
972      r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
973                           !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
974      if (r < 0) {
975          goto out;
976      }
977  
978      s->snap = g_strdup(opts->snapshot);
979      s->image_name = g_strdup(opts->image);
980  
981      /* rbd_open is always r/w */
982      r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
983      if (r < 0) {
984          error_setg_errno(errp, -r, "error reading header from %s",
985                           s->image_name);
986          goto failed_open;
987      }
988  
989      if (opts->encrypt) {
990  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
991          r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
992          if (r < 0) {
993              goto failed_post_open;
994          }
995  #else
996          r = -ENOTSUP;
997          error_setg(errp, "RBD library does not support image encryption");
998          goto failed_post_open;
999  #endif
1000      }
1001  
1002      r = rbd_stat(s->image, &info, sizeof(info));
1003      if (r < 0) {
1004          error_setg_errno(errp, -r, "error getting image info from %s",
1005                           s->image_name);
1006          goto failed_post_open;
1007      }
1008      s->image_size = info.size;
1009      s->object_size = info.obj_size;
1010  
1011      /* If we are using an rbd snapshot, we must be r/o, otherwise
1012       * leave as-is */
1013      if (s->snap != NULL) {
1014          r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1015          if (r < 0) {
1016              goto failed_post_open;
1017          }
1018      }
1019  
1020  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1021      bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1022  #endif
1023  
1024      /* When extending regular files, we get zeros from the OS */
1025      bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1026  
1027      r = 0;
1028      goto out;
1029  
1030  failed_post_open:
1031      rbd_close(s->image);
1032  failed_open:
1033      rados_ioctx_destroy(s->io_ctx);
1034      g_free(s->snap);
1035      g_free(s->image_name);
1036      rados_shutdown(s->cluster);
1037  out:
1038      qapi_free_BlockdevOptionsRbd(opts);
1039      g_free(keypairs);
1040      g_free(secretid);
1041      return r;
1042  }
1043  
1044  
1045  /* Since RBD is currently always opened R/W via the API,
1046   * we just need to check if we are using a snapshot or not, in
1047   * order to determine if we will allow it to be R/W */
1048  static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1049                                     BlockReopenQueue *queue, Error **errp)
1050  {
1051      BDRVRBDState *s = state->bs->opaque;
1052      int ret = 0;
1053  
1054      if (s->snap && state->flags & BDRV_O_RDWR) {
1055          error_setg(errp,
1056                     "Cannot change node '%s' to r/w when using RBD snapshot",
1057                     bdrv_get_device_or_node_name(state->bs));
1058          ret = -EINVAL;
1059      }
1060  
1061      return ret;
1062  }
1063  
1064  static void qemu_rbd_close(BlockDriverState *bs)
1065  {
1066      BDRVRBDState *s = bs->opaque;
1067  
1068      rbd_close(s->image);
1069      rados_ioctx_destroy(s->io_ctx);
1070      g_free(s->snap);
1071      g_free(s->image_name);
1072      rados_shutdown(s->cluster);
1073  }
1074  
1075  /* Resize the RBD image and update the 'image_size' with the current size */
1076  static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1077  {
1078      BDRVRBDState *s = bs->opaque;
1079      int r;
1080  
1081      r = rbd_resize(s->image, size);
1082      if (r < 0) {
1083          return r;
1084      }
1085  
1086      s->image_size = size;
1087  
1088      return 0;
1089  }
1090  
1091  static void qemu_rbd_finish_bh(void *opaque)
1092  {
1093      RBDTask *task = opaque;
1094      task->complete = true;
1095      aio_co_wake(task->co);
1096  }
1097  
1098  /*
1099   * This is the completion callback function for all rbd aio calls
1100   * started from qemu_rbd_start_co().
1101   *
1102   * Note: this function is being called from a non qemu thread so
1103   * we need to be careful about what we do here. Generally we only
1104   * schedule a BH, and do the rest of the io completion handling
1105   * from qemu_rbd_finish_bh() which runs in a qemu context.
1106   */
1107  static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1108  {
1109      task->ret = rbd_aio_get_return_value(c);
1110      rbd_aio_release(c);
1111      aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1112                              qemu_rbd_finish_bh, task);
1113  }
1114  
1115  static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1116                                            uint64_t offset,
1117                                            uint64_t bytes,
1118                                            QEMUIOVector *qiov,
1119                                            int flags,
1120                                            RBDAIOCmd cmd)
1121  {
1122      BDRVRBDState *s = bs->opaque;
1123      RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1124      rbd_completion_t c;
1125      int r;
1126  
1127      assert(!qiov || qiov->size == bytes);
1128  
1129      if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1130          /*
1131           * RBD APIs don't allow us to write more than actual size, so in order
1132           * to support growing images, we resize the image before write
1133           * operations that exceed the current size.
1134           */
1135          if (offset + bytes > s->image_size) {
1136              int r = qemu_rbd_resize(bs, offset + bytes);
1137              if (r < 0) {
1138                  return r;
1139              }
1140          }
1141      }
1142  
1143      r = rbd_aio_create_completion(&task,
1144                                    (rbd_callback_t) qemu_rbd_completion_cb, &c);
1145      if (r < 0) {
1146          return r;
1147      }
1148  
1149      switch (cmd) {
1150      case RBD_AIO_READ:
1151          r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1152          break;
1153      case RBD_AIO_WRITE:
1154          r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1155          break;
1156      case RBD_AIO_DISCARD:
1157          r = rbd_aio_discard(s->image, offset, bytes, c);
1158          break;
1159      case RBD_AIO_FLUSH:
1160          r = rbd_aio_flush(s->image, c);
1161          break;
1162  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1163      case RBD_AIO_WRITE_ZEROES: {
1164          int zero_flags = 0;
1165  #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1166          if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1167              zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1168          }
1169  #endif
1170          r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1171          break;
1172      }
1173  #endif
1174      default:
1175          r = -EINVAL;
1176      }
1177  
1178      if (r < 0) {
1179          error_report("rbd request failed early: cmd %d offset %" PRIu64
1180                       " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1181                       bytes, flags, r, strerror(-r));
1182          rbd_aio_release(c);
1183          return r;
1184      }
1185  
1186      while (!task.complete) {
1187          qemu_coroutine_yield();
1188      }
1189  
1190      if (task.ret < 0) {
1191          error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1192                       PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1193                       bytes, flags, task.ret, strerror(-task.ret));
1194          return task.ret;
1195      }
1196  
1197      /* zero pad short reads */
1198      if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1199          qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1200      }
1201  
1202      return 0;
1203  }
1204  
1205  static int
1206  coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1207                                  int64_t bytes, QEMUIOVector *qiov,
1208                                  BdrvRequestFlags flags)
1209  {
1210      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1211  }
1212  
1213  static int
1214  coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1215                                   int64_t bytes, QEMUIOVector *qiov,
1216                                   BdrvRequestFlags flags)
1217  {
1218      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1219  }
1220  
1221  static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1222  {
1223      return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1224  }
1225  
1226  static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1227                                               int64_t offset, int64_t bytes)
1228  {
1229      return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1230  }
1231  
1232  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1233  static int
1234  coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1235                                         int64_t bytes, BdrvRequestFlags flags)
1236  {
1237      return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
1238                               RBD_AIO_WRITE_ZEROES);
1239  }
1240  #endif
1241  
1242  static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1243  {
1244      BDRVRBDState *s = bs->opaque;
1245      bdi->cluster_size = s->object_size;
1246      return 0;
1247  }
1248  
1249  static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1250                                                       Error **errp)
1251  {
1252      BDRVRBDState *s = bs->opaque;
1253      ImageInfoSpecific *spec_info;
1254      char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1255      int r;
1256  
1257      if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1258          r = rbd_read(s->image, 0,
1259                       RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1260          if (r < 0) {
1261              error_setg_errno(errp, -r, "cannot read image start for probe");
1262              return NULL;
1263          }
1264      }
1265  
1266      spec_info = g_new(ImageInfoSpecific, 1);
1267      *spec_info = (ImageInfoSpecific){
1268          .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1269          .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1270      };
1271  
1272      if (memcmp(buf, rbd_luks_header_verification,
1273                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1274          spec_info->u.rbd.data->encryption_format =
1275                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1276          spec_info->u.rbd.data->has_encryption_format = true;
1277      } else if (memcmp(buf, rbd_luks2_header_verification,
1278                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1279          spec_info->u.rbd.data->encryption_format =
1280                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1281          spec_info->u.rbd.data->has_encryption_format = true;
1282      } else {
1283          spec_info->u.rbd.data->has_encryption_format = false;
1284      }
1285  
1286      return spec_info;
1287  }
1288  
/*
 * rbd_diff_iterate2 allows the execution to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to stop the
 * execution prematurely because we detected a change in the allocation status.
 *
 * The value is parenthesized so that the macro expands safely inside any
 * expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1296  
1297  static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1298                                      int exists, void *opaque)
1299  {
1300      RBDDiffIterateReq *req = opaque;
1301  
1302      assert(req->offs + req->bytes <= offs);
1303  
1304      /* treat a hole like an unallocated area and bail out */
1305      if (!exists) {
1306          return 0;
1307      }
1308  
1309      if (!req->exists && offs > req->offs) {
1310          /*
1311           * we started in an unallocated area and hit the first allocated
1312           * block. req->bytes must be set to the length of the unallocated area
1313           * before the allocated area. stop further processing.
1314           */
1315          req->bytes = offs - req->offs;
1316          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1317      }
1318  
1319      if (req->exists && offs > req->offs + req->bytes) {
1320          /*
1321           * we started in an allocated area and jumped over an unallocated area,
1322           * req->bytes contains the length of the allocated area before the
1323           * unallocated area. stop further processing.
1324           */
1325          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1326      }
1327  
1328      req->bytes += len;
1329      req->exists = true;
1330  
1331      return 0;
1332  }
1333  
1334  static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1335                                                   bool want_zero, int64_t offset,
1336                                                   int64_t bytes, int64_t *pnum,
1337                                                   int64_t *map,
1338                                                   BlockDriverState **file)
1339  {
1340      BDRVRBDState *s = bs->opaque;
1341      int status, r;
1342      RBDDiffIterateReq req = { .offs = offset };
1343      uint64_t features, flags;
1344      uint64_t head = 0;
1345  
1346      assert(offset + bytes <= s->image_size);
1347  
1348      /* default to all sectors allocated */
1349      status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1350      *map = offset;
1351      *file = bs;
1352      *pnum = bytes;
1353  
1354      /* check if RBD image supports fast-diff */
1355      r = rbd_get_features(s->image, &features);
1356      if (r < 0) {
1357          return status;
1358      }
1359      if (!(features & RBD_FEATURE_FAST_DIFF)) {
1360          return status;
1361      }
1362  
1363      /* check if RBD fast-diff result is valid */
1364      r = rbd_get_flags(s->image, &flags);
1365      if (r < 0) {
1366          return status;
1367      }
1368      if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1369          return status;
1370      }
1371  
1372  #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1373      /*
1374       * librbd had a bug until early 2022 that affected all versions of ceph that
1375       * supported fast-diff. This bug results in reporting of incorrect offsets
1376       * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1377       * Work around this bug by rounding down the offset to object boundaries.
1378       * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1379       * However, this workaround only works for non cloned images with default
1380       * striping.
1381       *
1382       * See: https://tracker.ceph.com/issues/53784
1383       */
1384  
1385      /* check if RBD image has non-default striping enabled */
1386      if (features & RBD_FEATURE_STRIPINGV2) {
1387          return status;
1388      }
1389  
1390  #pragma GCC diagnostic push
1391  #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1392      /*
1393       * check if RBD image is a clone (= has a parent).
1394       *
1395       * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1396       * replacement rbd_get_parent is not present in Luminous and Mimic.
1397       */
1398      if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1399          return status;
1400      }
1401  #pragma GCC diagnostic pop
1402  
1403      head = req.offs & (s->object_size - 1);
1404      req.offs -= head;
1405      bytes += head;
1406  #endif
1407  
1408      r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1409                            qemu_rbd_diff_iterate_cb, &req);
1410      if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1411          return status;
1412      }
1413      assert(req.bytes <= bytes);
1414      if (!req.exists) {
1415          if (r == 0) {
1416              /*
1417               * rbd_diff_iterate2 does not invoke callbacks for unallocated
1418               * areas. This here catches the case where no callback was
1419               * invoked at all (req.bytes == 0).
1420               */
1421              assert(req.bytes == 0);
1422              req.bytes = bytes;
1423          }
1424          status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1425      }
1426  
1427      assert(req.bytes > head);
1428      *pnum = req.bytes - head;
1429      return status;
1430  }
1431  
1432  static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1433  {
1434      BDRVRBDState *s = bs->opaque;
1435      int r;
1436  
1437      r = rbd_get_size(s->image, &s->image_size);
1438      if (r < 0) {
1439          return r;
1440      }
1441  
1442      return s->image_size;
1443  }
1444  
1445  static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1446                                               int64_t offset,
1447                                               bool exact,
1448                                               PreallocMode prealloc,
1449                                               BdrvRequestFlags flags,
1450                                               Error **errp)
1451  {
1452      int r;
1453  
1454      if (prealloc != PREALLOC_MODE_OFF) {
1455          error_setg(errp, "Unsupported preallocation mode '%s'",
1456                     PreallocMode_str(prealloc));
1457          return -ENOTSUP;
1458      }
1459  
1460      r = qemu_rbd_resize(bs, offset);
1461      if (r < 0) {
1462          error_setg_errno(errp, -r, "Failed to resize file");
1463          return r;
1464      }
1465  
1466      return 0;
1467  }
1468  
1469  static int qemu_rbd_snap_create(BlockDriverState *bs,
1470                                  QEMUSnapshotInfo *sn_info)
1471  {
1472      BDRVRBDState *s = bs->opaque;
1473      int r;
1474  
1475      if (sn_info->name[0] == '\0') {
1476          return -EINVAL; /* we need a name for rbd snapshots */
1477      }
1478  
1479      /*
1480       * rbd snapshots are using the name as the user controlled unique identifier
1481       * we can't use the rbd snapid for that purpose, as it can't be set
1482       */
1483      if (sn_info->id_str[0] != '\0' &&
1484          strcmp(sn_info->id_str, sn_info->name) != 0) {
1485          return -EINVAL;
1486      }
1487  
1488      if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1489          return -ERANGE;
1490      }
1491  
1492      r = rbd_snap_create(s->image, sn_info->name);
1493      if (r < 0) {
1494          error_report("failed to create snap: %s", strerror(-r));
1495          return r;
1496      }
1497  
1498      return 0;
1499  }
1500  
1501  static int qemu_rbd_snap_remove(BlockDriverState *bs,
1502                                  const char *snapshot_id,
1503                                  const char *snapshot_name,
1504                                  Error **errp)
1505  {
1506      BDRVRBDState *s = bs->opaque;
1507      int r;
1508  
1509      if (!snapshot_name) {
1510          error_setg(errp, "rbd need a valid snapshot name");
1511          return -EINVAL;
1512      }
1513  
1514      /* If snapshot_id is specified, it must be equal to name, see
1515         qemu_rbd_snap_list() */
1516      if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1517          error_setg(errp,
1518                     "rbd do not support snapshot id, it should be NULL or "
1519                     "equal to snapshot name");
1520          return -EINVAL;
1521      }
1522  
1523      r = rbd_snap_remove(s->image, snapshot_name);
1524      if (r < 0) {
1525          error_setg_errno(errp, -r, "Failed to remove the snapshot");
1526      }
1527      return r;
1528  }
1529  
1530  static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1531                                    const char *snapshot_name)
1532  {
1533      BDRVRBDState *s = bs->opaque;
1534  
1535      return rbd_snap_rollback(s->image, snapshot_name);
1536  }
1537  
1538  static int qemu_rbd_snap_list(BlockDriverState *bs,
1539                                QEMUSnapshotInfo **psn_tab)
1540  {
1541      BDRVRBDState *s = bs->opaque;
1542      QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1543      int i, snap_count;
1544      rbd_snap_info_t *snaps;
1545      int max_snaps = RBD_MAX_SNAPS;
1546  
1547      do {
1548          snaps = g_new(rbd_snap_info_t, max_snaps);
1549          snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1550          if (snap_count <= 0) {
1551              g_free(snaps);
1552          }
1553      } while (snap_count == -ERANGE);
1554  
1555      if (snap_count <= 0) {
1556          goto done;
1557      }
1558  
1559      sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1560  
1561      for (i = 0; i < snap_count; i++) {
1562          const char *snap_name = snaps[i].name;
1563  
1564          sn_info = sn_tab + i;
1565          pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1566          pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1567  
1568          sn_info->vm_state_size = snaps[i].size;
1569          sn_info->date_sec = 0;
1570          sn_info->date_nsec = 0;
1571          sn_info->vm_clock_nsec = 0;
1572      }
1573      rbd_snap_list_end(snaps);
1574      g_free(snaps);
1575  
1576   done:
1577      *psn_tab = sn_tab;
1578      return snap_count;
1579  }
1580  
1581  static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1582                                                        Error **errp)
1583  {
1584      BDRVRBDState *s = bs->opaque;
1585      int r = rbd_invalidate_cache(s->image);
1586      if (r < 0) {
1587          error_setg_errno(errp, -r, "Failed to invalidate the cache");
1588      }
1589  }
1590  
1591  static QemuOptsList qemu_rbd_create_opts = {
1592      .name = "rbd-create-opts",
1593      .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1594      .desc = {
1595          {
1596              .name = BLOCK_OPT_SIZE,
1597              .type = QEMU_OPT_SIZE,
1598              .help = "Virtual disk size"
1599          },
1600          {
1601              .name = BLOCK_OPT_CLUSTER_SIZE,
1602              .type = QEMU_OPT_SIZE,
1603              .help = "RBD object size"
1604          },
1605          {
1606              .name = "password-secret",
1607              .type = QEMU_OPT_STRING,
1608              .help = "ID of secret providing the password",
1609          },
1610          {
1611              .name = "encrypt.format",
1612              .type = QEMU_OPT_STRING,
1613              .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1614          },
1615          {
1616              .name = "encrypt.cipher-alg",
1617              .type = QEMU_OPT_STRING,
1618              .help = "Name of encryption cipher algorithm"
1619                      " (allowed values: aes-128, aes-256)",
1620          },
1621          {
1622              .name = "encrypt.key-secret",
1623              .type = QEMU_OPT_STRING,
1624              .help = "ID of secret providing LUKS passphrase",
1625          },
1626          { /* end of list */ }
1627      }
1628  };
1629  
/*
 * NULL-terminated list of option names registered as the driver's
 * strong_runtime_opts.
 * NOTE(review): "server." ends in a dot, presumably so that it matches every
 * option below that prefix - confirm against the generic block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* what to open */
    "pool", "namespace", "image",
    "conf", "snapshot",
    /* how to connect */
    "user", "server.", "password-secret",

    NULL,
};
1642  
1643  static BlockDriver bdrv_rbd = {
1644      .format_name            = "rbd",
1645      .instance_size          = sizeof(BDRVRBDState),
1646      .bdrv_parse_filename    = qemu_rbd_parse_filename,
1647      .bdrv_file_open         = qemu_rbd_open,
1648      .bdrv_close             = qemu_rbd_close,
1649      .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1650      .bdrv_co_create         = qemu_rbd_co_create,
1651      .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1652      .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1653      .bdrv_get_info          = qemu_rbd_getinfo,
1654      .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1655      .create_opts            = &qemu_rbd_create_opts,
1656      .bdrv_getlength         = qemu_rbd_getlength,
1657      .bdrv_co_truncate       = qemu_rbd_co_truncate,
1658      .protocol_name          = "rbd",
1659  
1660      .bdrv_co_preadv         = qemu_rbd_co_preadv,
1661      .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1662      .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1663      .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1664  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1665      .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1666  #endif
1667      .bdrv_co_block_status   = qemu_rbd_co_block_status,
1668  
1669      .bdrv_snapshot_create   = qemu_rbd_snap_create,
1670      .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1671      .bdrv_snapshot_list     = qemu_rbd_snap_list,
1672      .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1673      .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1674  
1675      .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1676  };
1677  
1678  static void bdrv_rbd_init(void)
1679  {
1680      bdrv_register(&bdrv_rbd);
1681  }
1682  
1683  block_init(bdrv_rbd_init);
1684