xref: /openbmc/qemu/block/rbd.c (revision 9992f57978b8badb9d95f0eb601c7379f786f76c)
1  /*
2   * QEMU Block driver for RADOS (Ceph)
3   *
4   * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5   *                         Josh Durgin <josh.durgin@dreamhost.com>
6   *
7   * This work is licensed under the terms of the GNU GPL, version 2.  See
8   * the COPYING file in the top-level directory.
9   *
10   * Contributions after 2012-01-13 are licensed under the terms of the
11   * GNU GPL, version 2 or (at your option) any later version.
12   */
13  
14  #include "qemu/osdep.h"
15  
16  #include <rbd/librbd.h>
17  #include "qapi/error.h"
18  #include "qemu/error-report.h"
19  #include "qemu/module.h"
20  #include "qemu/option.h"
21  #include "block/block_int.h"
22  #include "block/qdict.h"
23  #include "crypto/secret.h"
24  #include "qemu/cutils.h"
25  #include "sysemu/replay.h"
26  #include "qapi/qmp/qstring.h"
27  #include "qapi/qmp/qdict.h"
28  #include "qapi/qmp/qjson.h"
29  #include "qapi/qmp/qlist.h"
30  #include "qapi/qobject-input-visitor.h"
31  #include "qapi/qapi-visit-block-core.h"
32  
33  /*
34   * When specifying the image filename use:
35   *
36   * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
37   *
38   * poolname must be the name of an existing rados pool.
39   *
40   * devicename is the name of the rbd image.
41   *
42   * Each option given is used to configure rados, and may be any valid
43   * Ceph option, "id", or "conf".
44   *
45   * The "id" option indicates what user we should authenticate as to
46   * the Ceph cluster.  If it is excluded we will use the Ceph default
47   * (normally 'admin').
48   *
49   * The "conf" option specifies a Ceph configuration file to read.  If
50   * it is not specified, we will read from the default Ceph locations
51   * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
52   * file, specify conf=/dev/null.
53   *
54   * Configuration values containing :, @, or = can be escaped with a
55   * leading "\".
56   */
57  
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file - confirm it is still provided elsewhere before relying on
 * this macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/* Length in bytes of each of the two LUKS header signatures below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * Byte signature "LUKS" 0xBA 0xBE 0x00 0x01.
 * Presumably the LUKS magic followed by a big-endian version number of 1;
 * see the LUKS on-disk format specification to confirm.
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/*
 * Byte signature "LUKS" 0xBA 0xBE 0x00 0x02.
 * Same layout as above with a trailing 2 (presumably the LUKS2 version).
 */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};
73  
/*
 * Kinds of image request, one enumerator per operation type.
 * The code that dispatches on these values is not in the visible part of
 * the file.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
81  
/*
 * Per-BlockDriverState driver state (used as bs->opaque by qemu_rbd_open()).
 */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle, set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* pool I/O context, set up by qemu_rbd_connect() */
    rbd_image_t image;      /* image handle obtained from rbd_open() */
    char *image_name;       /* g_strdup()ed copy of the "image" option */
    char *snap;             /* g_strdup()ed copy of the "snapshot" option */
    char *namespace;        /* not assigned in the visible part of the file */
    uint64_t image_size;    /* info.size as reported by rbd_stat() */
    uint64_t object_size;   /* info.obj_size as reported by rbd_stat() */
} BDRVRBDState;
92  
/*
 * Bookkeeping for one in-flight request.
 * NOTE(review): the users of this struct are outside the visible part of the
 * file; the field notes below are inferred from names and types only.
 */
typedef struct RBDTask {
    BlockDriverState *bs;   /* block device the request belongs to */
    Coroutine *co;          /* presumably the coroutine awaiting completion */
    bool complete;          /* presumably set once the request has finished */
    int64_t ret;            /* presumably the request's result code */
} RBDTask;
99  
/*
 * State carried through a diff-iterate request.
 * NOTE(review): the users of this struct are outside the visible part of the
 * file; the field notes below are inferred from names and types only.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;          /* presumably a byte offset into the image */
    uint64_t bytes;         /* presumably a byte count */
    bool exists;            /* presumably whether the extent holds data */
} RBDDiffIterateReq;
105  
/*
 * Forward declaration: qemu_rbd_do_create() calls qemu_rbd_connect() before
 * its definition further down in this file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
110  
/*
 * Find the first occurrence of @delim in @src that is not escaped.
 *
 * A backslash followed by any character other than the terminating NUL
 * forms an escape pair: the escaped character is stepped over and can
 * never match.  The current character is compared against @delim before
 * it is considered as an escape introducer.
 *
 * Returns a pointer to the matching character, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        if (*cur == '\\' && cur[1] != '\0') {
            /* skip the backslash; the step below skips the escaped char */
            cur++;
        }
        cur++;
    }

    return NULL;
}
126  
127  
/*
 * Split @src in place at the first unescaped @delim.
 *
 * If a delimiter is found it is overwritten with NUL and *@p is set to the
 * character following it; otherwise *@p is set to NULL and @src is left
 * unmodified.  In both cases @src (the start of the token) is returned.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    return src;
}
141  
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept verbatim.  A backslash that is the very last
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *rd = src;   /* read position */
    char *wr = src;         /* write position, never ahead of rd */

    while (*rd != '\0') {
        if (rd[0] == '\\' && rd[1] != '\0') {
            rd++;
        }
        *wr++ = *rd++;
    }
    *wr = '\0';
}
154  
155  static void qemu_rbd_parse_filename(const char *filename, QDict *options,
156                                      Error **errp)
157  {
158      const char *start;
159      char *p, *buf;
160      QList *keypairs = NULL;
161      char *found_str, *image_name;
162  
163      if (!strstart(filename, "rbd:", &start)) {
164          error_setg(errp, "File name must start with 'rbd:'");
165          return;
166      }
167  
168      buf = g_strdup(start);
169      p = buf;
170  
171      found_str = qemu_rbd_next_tok(p, '/', &p);
172      if (!p) {
173          error_setg(errp, "Pool name is required");
174          goto done;
175      }
176      qemu_rbd_unescape(found_str);
177      qdict_put_str(options, "pool", found_str);
178  
179      if (qemu_rbd_strchr(p, '@')) {
180          image_name = qemu_rbd_next_tok(p, '@', &p);
181  
182          found_str = qemu_rbd_next_tok(p, ':', &p);
183          qemu_rbd_unescape(found_str);
184          qdict_put_str(options, "snapshot", found_str);
185      } else {
186          image_name = qemu_rbd_next_tok(p, ':', &p);
187      }
188      /* Check for namespace in the image_name */
189      if (qemu_rbd_strchr(image_name, '/')) {
190          found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
191          qemu_rbd_unescape(found_str);
192          qdict_put_str(options, "namespace", found_str);
193      } else {
194          qdict_put_str(options, "namespace", "");
195      }
196      qemu_rbd_unescape(image_name);
197      qdict_put_str(options, "image", image_name);
198      if (!p) {
199          goto done;
200      }
201  
202      /* The following are essentially all key/value pairs, and we treat
203       * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
204      while (p) {
205          char *name, *value;
206          name = qemu_rbd_next_tok(p, '=', &p);
207          if (!p) {
208              error_setg(errp, "conf option %s has no value", name);
209              break;
210          }
211  
212          qemu_rbd_unescape(name);
213  
214          value = qemu_rbd_next_tok(p, ':', &p);
215          qemu_rbd_unescape(value);
216  
217          if (!strcmp(name, "conf")) {
218              qdict_put_str(options, "conf", value);
219          } else if (!strcmp(name, "id")) {
220              qdict_put_str(options, "user", value);
221          } else {
222              /*
223               * We pass these internally to qemu_rbd_set_keypairs(), so
224               * we can get away with the simpler list of [ "key1",
225               * "value1", "key2", "value2" ] rather than a raw dict
226               * { "key1": "value1", "key2": "value2" } where we can't
227               * guarantee order, or even a more correct but complex
228               * [ { "key1": "value1" }, { "key2": "value2" } ]
229               */
230              if (!keypairs) {
231                  keypairs = qlist_new();
232              }
233              qlist_append_str(keypairs, name);
234              qlist_append_str(keypairs, value);
235          }
236      }
237  
238      if (keypairs) {
239          qdict_put(options, "=keyvalue-pairs",
240                    qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
241      }
242  
243  done:
244      g_free(buf);
245      qobject_unref(keypairs);
246      return;
247  }
248  
249  static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
250                               Error **errp)
251  {
252      char *key, *acr;
253      int r;
254      GString *accu;
255      RbdAuthModeList *auth;
256  
257      if (opts->key_secret) {
258          key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
259          if (!key) {
260              return -EIO;
261          }
262          r = rados_conf_set(cluster, "key", key);
263          g_free(key);
264          if (r < 0) {
265              error_setg_errno(errp, -r, "Could not set 'key'");
266              return r;
267          }
268      }
269  
270      if (opts->has_auth_client_required) {
271          accu = g_string_new("");
272          for (auth = opts->auth_client_required; auth; auth = auth->next) {
273              if (accu->str[0]) {
274                  g_string_append_c(accu, ';');
275              }
276              g_string_append(accu, RbdAuthMode_str(auth->value));
277          }
278          acr = g_string_free(accu, FALSE);
279          r = rados_conf_set(cluster, "auth_client_required", acr);
280          g_free(acr);
281          if (r < 0) {
282              error_setg_errno(errp, -r,
283                               "Could not set 'auth_client_required'");
284              return r;
285          }
286      }
287  
288      return 0;
289  }
290  
291  static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
292                                   Error **errp)
293  {
294      QList *keypairs;
295      QString *name;
296      QString *value;
297      const char *key;
298      size_t remaining;
299      int ret = 0;
300  
301      if (!keypairs_json) {
302          return ret;
303      }
304      keypairs = qobject_to(QList,
305                            qobject_from_json(keypairs_json, &error_abort));
306      remaining = qlist_size(keypairs) / 2;
307      assert(remaining);
308  
309      while (remaining--) {
310          name = qobject_to(QString, qlist_pop(keypairs));
311          value = qobject_to(QString, qlist_pop(keypairs));
312          assert(name && value);
313          key = qstring_get_str(name);
314  
315          ret = rados_conf_set(cluster, key, qstring_get_str(value));
316          qobject_unref(value);
317          if (ret < 0) {
318              error_setg_errno(errp, -ret, "invalid conf option %s", key);
319              qobject_unref(name);
320              ret = -EINVAL;
321              break;
322          }
323          qobject_unref(name);
324      }
325  
326      qobject_unref(keypairs);
327      return ret;
328  }
329  
330  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
331  static int qemu_rbd_convert_luks_options(
332          RbdEncryptionOptionsLUKSBase *luks_opts,
333          char **passphrase,
334          size_t *passphrase_len,
335          Error **errp)
336  {
337      return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
338                                   passphrase_len, errp);
339  }
340  
341  static int qemu_rbd_convert_luks_create_options(
342          RbdEncryptionCreateOptionsLUKSBase *luks_opts,
343          rbd_encryption_algorithm_t *alg,
344          char **passphrase,
345          size_t *passphrase_len,
346          Error **errp)
347  {
348      int r = 0;
349  
350      r = qemu_rbd_convert_luks_options(
351              qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
352              passphrase, passphrase_len, errp);
353      if (r < 0) {
354          return r;
355      }
356  
357      if (luks_opts->has_cipher_alg) {
358          switch (luks_opts->cipher_alg) {
359              case QCRYPTO_CIPHER_ALG_AES_128: {
360                  *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
361                  break;
362              }
363              case QCRYPTO_CIPHER_ALG_AES_256: {
364                  *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
365                  break;
366              }
367              default: {
368                  r = -ENOTSUP;
369                  error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
370                                   luks_opts->cipher_alg);
371                  return r;
372              }
373          }
374      } else {
375          /* default alg */
376          *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
377      }
378  
379      return 0;
380  }
381  
382  static int qemu_rbd_encryption_format(rbd_image_t image,
383                                        RbdEncryptionCreateOptions *encrypt,
384                                        Error **errp)
385  {
386      int r = 0;
387      g_autofree char *passphrase = NULL;
388      size_t passphrase_len;
389      rbd_encryption_format_t format;
390      rbd_encryption_options_t opts;
391      rbd_encryption_luks1_format_options_t luks_opts;
392      rbd_encryption_luks2_format_options_t luks2_opts;
393      size_t opts_size;
394      uint64_t raw_size, effective_size;
395  
396      r = rbd_get_size(image, &raw_size);
397      if (r < 0) {
398          error_setg_errno(errp, -r, "cannot get raw image size");
399          return r;
400      }
401  
402      switch (encrypt->format) {
403          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
404              memset(&luks_opts, 0, sizeof(luks_opts));
405              format = RBD_ENCRYPTION_FORMAT_LUKS1;
406              opts = &luks_opts;
407              opts_size = sizeof(luks_opts);
408              r = qemu_rbd_convert_luks_create_options(
409                      qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
410                      &luks_opts.alg, &passphrase, &passphrase_len, errp);
411              if (r < 0) {
412                  return r;
413              }
414              luks_opts.passphrase = passphrase;
415              luks_opts.passphrase_size = passphrase_len;
416              break;
417          }
418          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
419              memset(&luks2_opts, 0, sizeof(luks2_opts));
420              format = RBD_ENCRYPTION_FORMAT_LUKS2;
421              opts = &luks2_opts;
422              opts_size = sizeof(luks2_opts);
423              r = qemu_rbd_convert_luks_create_options(
424                      qapi_RbdEncryptionCreateOptionsLUKS2_base(
425                              &encrypt->u.luks2),
426                      &luks2_opts.alg, &passphrase, &passphrase_len, errp);
427              if (r < 0) {
428                  return r;
429              }
430              luks2_opts.passphrase = passphrase;
431              luks2_opts.passphrase_size = passphrase_len;
432              break;
433          }
434          default: {
435              r = -ENOTSUP;
436              error_setg_errno(
437                      errp, -r, "unknown image encryption format: %u",
438                      encrypt->format);
439              return r;
440          }
441      }
442  
443      r = rbd_encryption_format(image, format, opts, opts_size);
444      if (r < 0) {
445          error_setg_errno(errp, -r, "encryption format fail");
446          return r;
447      }
448  
449      r = rbd_get_size(image, &effective_size);
450      if (r < 0) {
451          error_setg_errno(errp, -r, "cannot get effective image size");
452          return r;
453      }
454  
455      r = rbd_resize(image, raw_size + (raw_size - effective_size));
456      if (r < 0) {
457          error_setg_errno(errp, -r, "cannot resize image after format");
458          return r;
459      }
460  
461      return 0;
462  }
463  
464  static int qemu_rbd_encryption_load(rbd_image_t image,
465                                      RbdEncryptionOptions *encrypt,
466                                      Error **errp)
467  {
468      int r = 0;
469      g_autofree char *passphrase = NULL;
470      size_t passphrase_len;
471      rbd_encryption_luks1_format_options_t luks_opts;
472      rbd_encryption_luks2_format_options_t luks2_opts;
473      rbd_encryption_format_t format;
474      rbd_encryption_options_t opts;
475      size_t opts_size;
476  
477      switch (encrypt->format) {
478          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
479              memset(&luks_opts, 0, sizeof(luks_opts));
480              format = RBD_ENCRYPTION_FORMAT_LUKS1;
481              opts = &luks_opts;
482              opts_size = sizeof(luks_opts);
483              r = qemu_rbd_convert_luks_options(
484                      qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
485                      &passphrase, &passphrase_len, errp);
486              if (r < 0) {
487                  return r;
488              }
489              luks_opts.passphrase = passphrase;
490              luks_opts.passphrase_size = passphrase_len;
491              break;
492          }
493          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
494              memset(&luks2_opts, 0, sizeof(luks2_opts));
495              format = RBD_ENCRYPTION_FORMAT_LUKS2;
496              opts = &luks2_opts;
497              opts_size = sizeof(luks2_opts);
498              r = qemu_rbd_convert_luks_options(
499                      qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
500                      &passphrase, &passphrase_len, errp);
501              if (r < 0) {
502                  return r;
503              }
504              luks2_opts.passphrase = passphrase;
505              luks2_opts.passphrase_size = passphrase_len;
506              break;
507          }
508          default: {
509              r = -ENOTSUP;
510              error_setg_errno(
511                      errp, -r, "unknown image encryption format: %u",
512                      encrypt->format);
513              return r;
514          }
515      }
516  
517      r = rbd_encryption_load(image, format, opts, opts_size);
518      if (r < 0) {
519          error_setg_errno(errp, -r, "encryption load fail");
520          return r;
521      }
522  
523      return 0;
524  }
525  #endif
526  
527  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
528  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
529                                const char *keypairs, const char *password_secret,
530                                Error **errp)
531  {
532      BlockdevCreateOptionsRbd *opts = &options->u.rbd;
533      rados_t cluster;
534      rados_ioctx_t io_ctx;
535      int obj_order = 0;
536      int ret;
537  
538      assert(options->driver == BLOCKDEV_DRIVER_RBD);
539      if (opts->location->has_snapshot) {
540          error_setg(errp, "Can't use snapshot name for image creation");
541          return -EINVAL;
542      }
543  
544  #ifndef LIBRBD_SUPPORTS_ENCRYPTION
545      if (opts->has_encrypt) {
546          error_setg(errp, "RBD library does not support image encryption");
547          return -ENOTSUP;
548      }
549  #endif
550  
551      if (opts->has_cluster_size) {
552          int64_t objsize = opts->cluster_size;
553          if ((objsize - 1) & objsize) {    /* not a power of 2? */
554              error_setg(errp, "obj size needs to be power of 2");
555              return -EINVAL;
556          }
557          if (objsize < 4096) {
558              error_setg(errp, "obj size too small");
559              return -EINVAL;
560          }
561          obj_order = ctz32(objsize);
562      }
563  
564      ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
565                             password_secret, errp);
566      if (ret < 0) {
567          return ret;
568      }
569  
570      ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
571      if (ret < 0) {
572          error_setg_errno(errp, -ret, "error rbd create");
573          goto out;
574      }
575  
576  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
577      if (opts->has_encrypt) {
578          rbd_image_t image;
579  
580          ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
581          if (ret < 0) {
582              error_setg_errno(errp, -ret,
583                               "error opening image '%s' for encryption format",
584                               opts->location->image);
585              goto out;
586          }
587  
588          ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
589          rbd_close(image);
590          if (ret < 0) {
591              /* encryption format fail, try removing the image */
592              rbd_remove(io_ctx, opts->location->image);
593              goto out;
594          }
595      }
596  #endif
597  
598      ret = 0;
599  out:
600      rados_ioctx_destroy(io_ctx);
601      rados_shutdown(cluster);
602      return ret;
603  }
604  
/*
 * Create an image from QAPI create options only: forwards to
 * qemu_rbd_do_create() with no legacy key/value pairs and no legacy
 * password secret.
 */
static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
{
    return qemu_rbd_do_create(options, NULL, NULL, errp);
}
609  
610  static int qemu_rbd_extract_encryption_create_options(
611          QemuOpts *opts,
612          RbdEncryptionCreateOptions **spec,
613          Error **errp)
614  {
615      QDict *opts_qdict;
616      QDict *encrypt_qdict;
617      Visitor *v;
618      int ret = 0;
619  
620      opts_qdict = qemu_opts_to_qdict(opts, NULL);
621      qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
622      qobject_unref(opts_qdict);
623      if (!qdict_size(encrypt_qdict)) {
624          *spec = NULL;
625          goto exit;
626      }
627  
628      /* Convert options into a QAPI object */
629      v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
630      if (!v) {
631          ret = -EINVAL;
632          goto exit;
633      }
634  
635      visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
636      visit_free(v);
637      if (!*spec) {
638          ret = -EINVAL;
639          goto exit;
640      }
641  
642  exit:
643      qobject_unref(encrypt_qdict);
644      return ret;
645  }
646  
647  static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
648                                                  const char *filename,
649                                                  QemuOpts *opts,
650                                                  Error **errp)
651  {
652      BlockdevCreateOptions *create_options;
653      BlockdevCreateOptionsRbd *rbd_opts;
654      BlockdevOptionsRbd *loc;
655      RbdEncryptionCreateOptions *encrypt = NULL;
656      Error *local_err = NULL;
657      const char *keypairs, *password_secret;
658      QDict *options = NULL;
659      int ret = 0;
660  
661      create_options = g_new0(BlockdevCreateOptions, 1);
662      create_options->driver = BLOCKDEV_DRIVER_RBD;
663      rbd_opts = &create_options->u.rbd;
664  
665      rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
666  
667      password_secret = qemu_opt_get(opts, "password-secret");
668  
669      /* Read out options */
670      rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
671                                BDRV_SECTOR_SIZE);
672      rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
673                                                     BLOCK_OPT_CLUSTER_SIZE, 0);
674      rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
675  
676      options = qdict_new();
677      qemu_rbd_parse_filename(filename, options, &local_err);
678      if (local_err) {
679          ret = -EINVAL;
680          error_propagate(errp, local_err);
681          goto exit;
682      }
683  
684      ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
685      if (ret < 0) {
686          goto exit;
687      }
688      rbd_opts->encrypt     = encrypt;
689      rbd_opts->has_encrypt = !!encrypt;
690  
691      /*
692       * Caution: while qdict_get_try_str() is fine, getting non-string
693       * types would require more care.  When @options come from -blockdev
694       * or blockdev_add, its members are typed according to the QAPI
695       * schema, but when they come from -drive, they're all QString.
696       */
697      loc = rbd_opts->location;
698      loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
699      loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
700      loc->has_conf    = !!loc->conf;
701      loc->user        = g_strdup(qdict_get_try_str(options, "user"));
702      loc->has_user    = !!loc->user;
703      loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
704      loc->has_q_namespace = !!loc->q_namespace;
705      loc->image       = g_strdup(qdict_get_try_str(options, "image"));
706      keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
707  
708      ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
709      if (ret < 0) {
710          goto exit;
711      }
712  
713  exit:
714      qobject_unref(options);
715      qapi_free_BlockdevCreateOptions(create_options);
716      return ret;
717  }
718  
719  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
720  {
721      const char **vals;
722      const char *host, *port;
723      char *rados_str;
724      InetSocketAddressBaseList *p;
725      int i, cnt;
726  
727      if (!opts->has_server) {
728          return NULL;
729      }
730  
731      for (cnt = 0, p = opts->server; p; p = p->next) {
732          cnt++;
733      }
734  
735      vals = g_new(const char *, cnt + 1);
736  
737      for (i = 0, p = opts->server; p; p = p->next, i++) {
738          host = p->value->host;
739          port = p->value->port;
740  
741          if (strchr(host, ':')) {
742              vals[i] = g_strdup_printf("[%s]:%s", host, port);
743          } else {
744              vals[i] = g_strdup_printf("%s:%s", host, port);
745          }
746      }
747      vals[i] = NULL;
748  
749      rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
750      g_strfreev((char **)vals);
751      return rados_str;
752  }
753  
754  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
755                              BlockdevOptionsRbd *opts, bool cache,
756                              const char *keypairs, const char *secretid,
757                              Error **errp)
758  {
759      char *mon_host = NULL;
760      Error *local_err = NULL;
761      int r;
762  
763      if (secretid) {
764          if (opts->key_secret) {
765              error_setg(errp,
766                         "Legacy 'password-secret' clashes with 'key-secret'");
767              return -EINVAL;
768          }
769          opts->key_secret = g_strdup(secretid);
770          opts->has_key_secret = true;
771      }
772  
773      mon_host = qemu_rbd_mon_host(opts, &local_err);
774      if (local_err) {
775          error_propagate(errp, local_err);
776          r = -EINVAL;
777          goto out;
778      }
779  
780      r = rados_create(cluster, opts->user);
781      if (r < 0) {
782          error_setg_errno(errp, -r, "error initializing");
783          goto out;
784      }
785  
786      /* try default location when conf=NULL, but ignore failure */
787      r = rados_conf_read_file(*cluster, opts->conf);
788      if (opts->has_conf && r < 0) {
789          error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
790          goto failed_shutdown;
791      }
792  
793      r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
794      if (r < 0) {
795          goto failed_shutdown;
796      }
797  
798      if (mon_host) {
799          r = rados_conf_set(*cluster, "mon_host", mon_host);
800          if (r < 0) {
801              goto failed_shutdown;
802          }
803      }
804  
805      r = qemu_rbd_set_auth(*cluster, opts, errp);
806      if (r < 0) {
807          goto failed_shutdown;
808      }
809  
810      /*
811       * Fallback to more conservative semantics if setting cache
812       * options fails. Ignore errors from setting rbd_cache because the
813       * only possible error is that the option does not exist, and
814       * librbd defaults to no caching. If write through caching cannot
815       * be set up, fall back to no caching.
816       */
817      if (cache) {
818          rados_conf_set(*cluster, "rbd_cache", "true");
819      } else {
820          rados_conf_set(*cluster, "rbd_cache", "false");
821      }
822  
823      r = rados_connect(*cluster);
824      if (r < 0) {
825          error_setg_errno(errp, -r, "error connecting");
826          goto failed_shutdown;
827      }
828  
829      r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
830      if (r < 0) {
831          error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
832          goto failed_shutdown;
833      }
834      /*
835       * Set the namespace after opening the io context on the pool,
836       * if nspace == NULL or if nspace == "", it is just as we did nothing
837       */
838      rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
839  
840      r = 0;
841      goto out;
842  
843  failed_shutdown:
844      rados_shutdown(*cluster);
845  out:
846      g_free(mon_host);
847      return r;
848  }
849  
850  static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
851                                      Error **errp)
852  {
853      Visitor *v;
854  
855      /* Convert the remaining options into a QAPI object */
856      v = qobject_input_visitor_new_flat_confused(options, errp);
857      if (!v) {
858          return -EINVAL;
859      }
860  
861      visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
862      visit_free(v);
863      if (!opts) {
864          return -EINVAL;
865      }
866  
867      return 0;
868  }
869  
870  static int qemu_rbd_attempt_legacy_options(QDict *options,
871                                             BlockdevOptionsRbd **opts,
872                                             char **keypairs)
873  {
874      char *filename;
875      int r;
876  
877      filename = g_strdup(qdict_get_try_str(options, "filename"));
878      if (!filename) {
879          return -EINVAL;
880      }
881      qdict_del(options, "filename");
882  
883      qemu_rbd_parse_filename(filename, options, NULL);
884  
885      /* keypairs freed by caller */
886      *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
887      if (*keypairs) {
888          qdict_del(options, "=keyvalue-pairs");
889      }
890  
891      r = qemu_rbd_convert_options(options, opts, NULL);
892  
893      g_free(filename);
894      return r;
895  }
896  
897  static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
898                           Error **errp)
899  {
900      BDRVRBDState *s = bs->opaque;
901      BlockdevOptionsRbd *opts = NULL;
902      const QDictEntry *e;
903      Error *local_err = NULL;
904      char *keypairs, *secretid;
905      rbd_image_info_t info;
906      int r;
907  
908      keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
909      if (keypairs) {
910          qdict_del(options, "=keyvalue-pairs");
911      }
912  
913      secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
914      if (secretid) {
915          qdict_del(options, "password-secret");
916      }
917  
918      r = qemu_rbd_convert_options(options, &opts, &local_err);
919      if (local_err) {
920          /* If keypairs are present, that means some options are present in
921           * the modern option format.  Don't attempt to parse legacy option
922           * formats, as we won't support mixed usage. */
923          if (keypairs) {
924              error_propagate(errp, local_err);
925              goto out;
926          }
927  
928          /* If the initial attempt to convert and process the options failed,
929           * we may be attempting to open an image file that has the rbd options
930           * specified in the older format consisting of all key/value pairs
931           * encoded in the filename.  Go ahead and attempt to parse the
932           * filename, and see if we can pull out the required options. */
933          r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
934          if (r < 0) {
935              /* Propagate the original error, not the legacy parsing fallback
936               * error, as the latter was just a best-effort attempt. */
937              error_propagate(errp, local_err);
938              goto out;
939          }
940          /* Take care whenever deciding to actually deprecate; once this ability
941           * is removed, we will not be able to open any images with legacy-styled
942           * backing image strings. */
943          warn_report("RBD options encoded in the filename as keyvalue pairs "
944                      "is deprecated");
945      }
946  
947      /* Remove the processed options from the QDict (the visitor processes
948       * _all_ options in the QDict) */
949      while ((e = qdict_first(options))) {
950          qdict_del(options, e->key);
951      }
952  
953      r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
954                           !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
955      if (r < 0) {
956          goto out;
957      }
958  
959      s->snap = g_strdup(opts->snapshot);
960      s->image_name = g_strdup(opts->image);
961  
962      /* rbd_open is always r/w */
963      r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
964      if (r < 0) {
965          error_setg_errno(errp, -r, "error reading header from %s",
966                           s->image_name);
967          goto failed_open;
968      }
969  
970      if (opts->has_encrypt) {
971  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
972          r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
973          if (r < 0) {
974              goto failed_post_open;
975          }
976  #else
977          r = -ENOTSUP;
978          error_setg(errp, "RBD library does not support image encryption");
979          goto failed_post_open;
980  #endif
981      }
982  
983      r = rbd_stat(s->image, &info, sizeof(info));
984      if (r < 0) {
985          error_setg_errno(errp, -r, "error getting image info from %s",
986                           s->image_name);
987          goto failed_post_open;
988      }
989      s->image_size = info.size;
990      s->object_size = info.obj_size;
991  
992      /* If we are using an rbd snapshot, we must be r/o, otherwise
993       * leave as-is */
994      if (s->snap != NULL) {
995          r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
996          if (r < 0) {
997              goto failed_post_open;
998          }
999      }
1000  
1001  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1002      bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1003  #endif
1004  
1005      /* When extending regular files, we get zeros from the OS */
1006      bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1007  
1008      r = 0;
1009      goto out;
1010  
1011  failed_post_open:
1012      rbd_close(s->image);
1013  failed_open:
1014      rados_ioctx_destroy(s->io_ctx);
1015      g_free(s->snap);
1016      g_free(s->image_name);
1017      rados_shutdown(s->cluster);
1018  out:
1019      qapi_free_BlockdevOptionsRbd(opts);
1020      g_free(keypairs);
1021      g_free(secretid);
1022      return r;
1023  }
1024  
1025  
1026  /* Since RBD is currently always opened R/W via the API,
1027   * we just need to check if we are using a snapshot or not, in
1028   * order to determine if we will allow it to be R/W */
1029  static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1030                                     BlockReopenQueue *queue, Error **errp)
1031  {
1032      BDRVRBDState *s = state->bs->opaque;
1033      int ret = 0;
1034  
1035      if (s->snap && state->flags & BDRV_O_RDWR) {
1036          error_setg(errp,
1037                     "Cannot change node '%s' to r/w when using RBD snapshot",
1038                     bdrv_get_device_or_node_name(state->bs));
1039          ret = -EINVAL;
1040      }
1041  
1042      return ret;
1043  }
1044  
1045  static void qemu_rbd_close(BlockDriverState *bs)
1046  {
1047      BDRVRBDState *s = bs->opaque;
1048  
1049      rbd_close(s->image);
1050      rados_ioctx_destroy(s->io_ctx);
1051      g_free(s->snap);
1052      g_free(s->image_name);
1053      rados_shutdown(s->cluster);
1054  }
1055  
1056  /* Resize the RBD image and update the 'image_size' with the current size */
1057  static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1058  {
1059      BDRVRBDState *s = bs->opaque;
1060      int r;
1061  
1062      r = rbd_resize(s->image, size);
1063      if (r < 0) {
1064          return r;
1065      }
1066  
1067      s->image_size = size;
1068  
1069      return 0;
1070  }
1071  
1072  static void qemu_rbd_finish_bh(void *opaque)
1073  {
1074      RBDTask *task = opaque;
1075      task->complete = true;
1076      aio_co_wake(task->co);
1077  }
1078  
1079  /*
1080   * This is the completion callback function for all rbd aio calls
1081   * started from qemu_rbd_start_co().
1082   *
1083   * Note: this function is being called from a non qemu thread so
1084   * we need to be careful about what we do here. Generally we only
1085   * schedule a BH, and do the rest of the io completion handling
1086   * from qemu_rbd_finish_bh() which runs in a qemu context.
1087   */
1088  static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1089  {
1090      task->ret = rbd_aio_get_return_value(c);
1091      rbd_aio_release(c);
1092      aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1093                              qemu_rbd_finish_bh, task);
1094  }
1095  
1096  static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1097                                            uint64_t offset,
1098                                            uint64_t bytes,
1099                                            QEMUIOVector *qiov,
1100                                            int flags,
1101                                            RBDAIOCmd cmd)
1102  {
1103      BDRVRBDState *s = bs->opaque;
1104      RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1105      rbd_completion_t c;
1106      int r;
1107  
1108      assert(!qiov || qiov->size == bytes);
1109  
1110      if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1111          /*
1112           * RBD APIs don't allow us to write more than actual size, so in order
1113           * to support growing images, we resize the image before write
1114           * operations that exceed the current size.
1115           */
1116          if (offset + bytes > s->image_size) {
1117              int r = qemu_rbd_resize(bs, offset + bytes);
1118              if (r < 0) {
1119                  return r;
1120              }
1121          }
1122      }
1123  
1124      r = rbd_aio_create_completion(&task,
1125                                    (rbd_callback_t) qemu_rbd_completion_cb, &c);
1126      if (r < 0) {
1127          return r;
1128      }
1129  
1130      switch (cmd) {
1131      case RBD_AIO_READ:
1132          r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1133          break;
1134      case RBD_AIO_WRITE:
1135          r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1136          break;
1137      case RBD_AIO_DISCARD:
1138          r = rbd_aio_discard(s->image, offset, bytes, c);
1139          break;
1140      case RBD_AIO_FLUSH:
1141          r = rbd_aio_flush(s->image, c);
1142          break;
1143  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1144      case RBD_AIO_WRITE_ZEROES: {
1145          int zero_flags = 0;
1146  #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1147          if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1148              zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1149          }
1150  #endif
1151          r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1152          break;
1153      }
1154  #endif
1155      default:
1156          r = -EINVAL;
1157      }
1158  
1159      if (r < 0) {
1160          error_report("rbd request failed early: cmd %d offset %" PRIu64
1161                       " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1162                       bytes, flags, r, strerror(-r));
1163          rbd_aio_release(c);
1164          return r;
1165      }
1166  
1167      while (!task.complete) {
1168          qemu_coroutine_yield();
1169      }
1170  
1171      if (task.ret < 0) {
1172          error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1173                       PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1174                       bytes, flags, task.ret, strerror(-task.ret));
1175          return task.ret;
1176      }
1177  
1178      /* zero pad short reads */
1179      if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1180          qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1181      }
1182  
1183      return 0;
1184  }
1185  
1186  static int
1187  coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1188                                  int64_t bytes, QEMUIOVector *qiov,
1189                                  BdrvRequestFlags flags)
1190  {
1191      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1192  }
1193  
1194  static int
1195  coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1196                                   int64_t bytes, QEMUIOVector *qiov,
1197                                   BdrvRequestFlags flags)
1198  {
1199      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1200  }
1201  
1202  static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1203  {
1204      return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1205  }
1206  
1207  static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1208                                               int64_t offset, int64_t bytes)
1209  {
1210      return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1211  }
1212  
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes over @bytes starting at @offset: forwards to
 * qemu_rbd_start_co() as an RBD_AIO_WRITE_ZEROES, passing @flags through.
 * Only built when LIBRBD_SUPPORTS_WRITE_ZEROES is defined.
 */
static int coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs,
                                                  int64_t offset,
                                                  int64_t bytes,
                                                  BdrvRequestFlags flags)
{
    int ret = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                                RBD_AIO_WRITE_ZEROES);

    return ret;
}
#endif
1222  
1223  static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1224  {
1225      BDRVRBDState *s = bs->opaque;
1226      bdi->cluster_size = s->object_size;
1227      return 0;
1228  }
1229  
1230  static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1231                                                       Error **errp)
1232  {
1233      BDRVRBDState *s = bs->opaque;
1234      ImageInfoSpecific *spec_info;
1235      char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1236      int r;
1237  
1238      if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1239          r = rbd_read(s->image, 0,
1240                       RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1241          if (r < 0) {
1242              error_setg_errno(errp, -r, "cannot read image start for probe");
1243              return NULL;
1244          }
1245      }
1246  
1247      spec_info = g_new(ImageInfoSpecific, 1);
1248      *spec_info = (ImageInfoSpecific){
1249          .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1250          .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1251      };
1252  
1253      if (memcmp(buf, rbd_luks_header_verification,
1254                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1255          spec_info->u.rbd.data->encryption_format =
1256                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1257          spec_info->u.rbd.data->has_encryption_format = true;
1258      } else if (memcmp(buf, rbd_luks2_header_verification,
1259                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1260          spec_info->u.rbd.data->encryption_format =
1261                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1262          spec_info->u.rbd.data->has_encryption_format = true;
1263      } else {
1264          spec_info->u.rbd.data->has_encryption_format = false;
1265      }
1266  
1267      return spec_info;
1268  }
1269  
/*
 * rbd_diff_iterate2 allows the callback routine to interrupt the execution
 * by returning a negative value. Choose a value that does not conflict with
 * an existing exit code and return it if we want to prematurely stop the
 * execution because we detected a change in the allocation status.
 *
 * The value is parenthesized so that the macro expands safely inside any
 * surrounding expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1277  
1278  static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1279                                      int exists, void *opaque)
1280  {
1281      RBDDiffIterateReq *req = opaque;
1282  
1283      assert(req->offs + req->bytes <= offs);
1284  
1285      /* treat a hole like an unallocated area and bail out */
1286      if (!exists) {
1287          return 0;
1288      }
1289  
1290      if (!req->exists && offs > req->offs) {
1291          /*
1292           * we started in an unallocated area and hit the first allocated
1293           * block. req->bytes must be set to the length of the unallocated area
1294           * before the allocated area. stop further processing.
1295           */
1296          req->bytes = offs - req->offs;
1297          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1298      }
1299  
1300      if (req->exists && offs > req->offs + req->bytes) {
1301          /*
1302           * we started in an allocated area and jumped over an unallocated area,
1303           * req->bytes contains the length of the allocated area before the
1304           * unallocated area. stop further processing.
1305           */
1306          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1307      }
1308  
1309      req->bytes += len;
1310      req->exists = true;
1311  
1312      return 0;
1313  }
1314  
1315  static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1316                                                   bool want_zero, int64_t offset,
1317                                                   int64_t bytes, int64_t *pnum,
1318                                                   int64_t *map,
1319                                                   BlockDriverState **file)
1320  {
1321      BDRVRBDState *s = bs->opaque;
1322      int status, r;
1323      RBDDiffIterateReq req = { .offs = offset };
1324      uint64_t features, flags;
1325      uint64_t head = 0;
1326  
1327      assert(offset + bytes <= s->image_size);
1328  
1329      /* default to all sectors allocated */
1330      status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1331      *map = offset;
1332      *file = bs;
1333      *pnum = bytes;
1334  
1335      /* check if RBD image supports fast-diff */
1336      r = rbd_get_features(s->image, &features);
1337      if (r < 0) {
1338          return status;
1339      }
1340      if (!(features & RBD_FEATURE_FAST_DIFF)) {
1341          return status;
1342      }
1343  
1344      /* check if RBD fast-diff result is valid */
1345      r = rbd_get_flags(s->image, &flags);
1346      if (r < 0) {
1347          return status;
1348      }
1349      if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1350          return status;
1351      }
1352  
1353  #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1354      /*
1355       * librbd had a bug until early 2022 that affected all versions of ceph that
1356       * supported fast-diff. This bug results in reporting of incorrect offsets
1357       * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1358       * Work around this bug by rounding down the offset to object boundaries.
1359       * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1360       * However, this workaround only works for non cloned images with default
1361       * striping.
1362       *
1363       * See: https://tracker.ceph.com/issues/53784
1364       */
1365  
1366      /* check if RBD image has non-default striping enabled */
1367      if (features & RBD_FEATURE_STRIPINGV2) {
1368          return status;
1369      }
1370  
1371  #pragma GCC diagnostic push
1372  #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1373      /*
1374       * check if RBD image is a clone (= has a parent).
1375       *
1376       * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1377       * replacement rbd_get_parent is not present in Luminous and Mimic.
1378       */
1379      if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1380          return status;
1381      }
1382  #pragma GCC diagnostic pop
1383  
1384      head = req.offs & (s->object_size - 1);
1385      req.offs -= head;
1386      bytes += head;
1387  #endif
1388  
1389      r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1390                            qemu_rbd_diff_iterate_cb, &req);
1391      if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1392          return status;
1393      }
1394      assert(req.bytes <= bytes);
1395      if (!req.exists) {
1396          if (r == 0) {
1397              /*
1398               * rbd_diff_iterate2 does not invoke callbacks for unallocated
1399               * areas. This here catches the case where no callback was
1400               * invoked at all (req.bytes == 0).
1401               */
1402              assert(req.bytes == 0);
1403              req.bytes = bytes;
1404          }
1405          status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1406      }
1407  
1408      assert(req.bytes > head);
1409      *pnum = req.bytes - head;
1410      return status;
1411  }
1412  
1413  static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1414  {
1415      BDRVRBDState *s = bs->opaque;
1416      int r;
1417  
1418      r = rbd_get_size(s->image, &s->image_size);
1419      if (r < 0) {
1420          return r;
1421      }
1422  
1423      return s->image_size;
1424  }
1425  
1426  static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1427                                               int64_t offset,
1428                                               bool exact,
1429                                               PreallocMode prealloc,
1430                                               BdrvRequestFlags flags,
1431                                               Error **errp)
1432  {
1433      int r;
1434  
1435      if (prealloc != PREALLOC_MODE_OFF) {
1436          error_setg(errp, "Unsupported preallocation mode '%s'",
1437                     PreallocMode_str(prealloc));
1438          return -ENOTSUP;
1439      }
1440  
1441      r = qemu_rbd_resize(bs, offset);
1442      if (r < 0) {
1443          error_setg_errno(errp, -r, "Failed to resize file");
1444          return r;
1445      }
1446  
1447      return 0;
1448  }
1449  
1450  static int qemu_rbd_snap_create(BlockDriverState *bs,
1451                                  QEMUSnapshotInfo *sn_info)
1452  {
1453      BDRVRBDState *s = bs->opaque;
1454      int r;
1455  
1456      if (sn_info->name[0] == '\0') {
1457          return -EINVAL; /* we need a name for rbd snapshots */
1458      }
1459  
1460      /*
1461       * rbd snapshots are using the name as the user controlled unique identifier
1462       * we can't use the rbd snapid for that purpose, as it can't be set
1463       */
1464      if (sn_info->id_str[0] != '\0' &&
1465          strcmp(sn_info->id_str, sn_info->name) != 0) {
1466          return -EINVAL;
1467      }
1468  
1469      if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1470          return -ERANGE;
1471      }
1472  
1473      r = rbd_snap_create(s->image, sn_info->name);
1474      if (r < 0) {
1475          error_report("failed to create snap: %s", strerror(-r));
1476          return r;
1477      }
1478  
1479      return 0;
1480  }
1481  
1482  static int qemu_rbd_snap_remove(BlockDriverState *bs,
1483                                  const char *snapshot_id,
1484                                  const char *snapshot_name,
1485                                  Error **errp)
1486  {
1487      BDRVRBDState *s = bs->opaque;
1488      int r;
1489  
1490      if (!snapshot_name) {
1491          error_setg(errp, "rbd need a valid snapshot name");
1492          return -EINVAL;
1493      }
1494  
1495      /* If snapshot_id is specified, it must be equal to name, see
1496         qemu_rbd_snap_list() */
1497      if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1498          error_setg(errp,
1499                     "rbd do not support snapshot id, it should be NULL or "
1500                     "equal to snapshot name");
1501          return -EINVAL;
1502      }
1503  
1504      r = rbd_snap_remove(s->image, snapshot_name);
1505      if (r < 0) {
1506          error_setg_errno(errp, -r, "Failed to remove the snapshot");
1507      }
1508      return r;
1509  }
1510  
1511  static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1512                                    const char *snapshot_name)
1513  {
1514      BDRVRBDState *s = bs->opaque;
1515  
1516      return rbd_snap_rollback(s->image, snapshot_name);
1517  }
1518  
1519  static int qemu_rbd_snap_list(BlockDriverState *bs,
1520                                QEMUSnapshotInfo **psn_tab)
1521  {
1522      BDRVRBDState *s = bs->opaque;
1523      QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1524      int i, snap_count;
1525      rbd_snap_info_t *snaps;
1526      int max_snaps = RBD_MAX_SNAPS;
1527  
1528      do {
1529          snaps = g_new(rbd_snap_info_t, max_snaps);
1530          snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1531          if (snap_count <= 0) {
1532              g_free(snaps);
1533          }
1534      } while (snap_count == -ERANGE);
1535  
1536      if (snap_count <= 0) {
1537          goto done;
1538      }
1539  
1540      sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1541  
1542      for (i = 0; i < snap_count; i++) {
1543          const char *snap_name = snaps[i].name;
1544  
1545          sn_info = sn_tab + i;
1546          pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1547          pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1548  
1549          sn_info->vm_state_size = snaps[i].size;
1550          sn_info->date_sec = 0;
1551          sn_info->date_nsec = 0;
1552          sn_info->vm_clock_nsec = 0;
1553      }
1554      rbd_snap_list_end(snaps);
1555      g_free(snaps);
1556  
1557   done:
1558      *psn_tab = sn_tab;
1559      return snap_count;
1560  }
1561  
1562  static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1563                                                        Error **errp)
1564  {
1565      BDRVRBDState *s = bs->opaque;
1566      int r = rbd_invalidate_cache(s->image);
1567      if (r < 0) {
1568          error_setg_errno(errp, -r, "Failed to invalidate the cache");
1569      }
1570  }
1571  
1572  static QemuOptsList qemu_rbd_create_opts = {
1573      .name = "rbd-create-opts",
1574      .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1575      .desc = {
1576          {
1577              .name = BLOCK_OPT_SIZE,
1578              .type = QEMU_OPT_SIZE,
1579              .help = "Virtual disk size"
1580          },
1581          {
1582              .name = BLOCK_OPT_CLUSTER_SIZE,
1583              .type = QEMU_OPT_SIZE,
1584              .help = "RBD object size"
1585          },
1586          {
1587              .name = "password-secret",
1588              .type = QEMU_OPT_STRING,
1589              .help = "ID of secret providing the password",
1590          },
1591          {
1592              .name = "encrypt.format",
1593              .type = QEMU_OPT_STRING,
1594              .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1595          },
1596          {
1597              .name = "encrypt.cipher-alg",
1598              .type = QEMU_OPT_STRING,
1599              .help = "Name of encryption cipher algorithm"
1600                      " (allowed values: aes-128, aes-256)",
1601          },
1602          {
1603              .name = "encrypt.key-secret",
1604              .type = QEMU_OPT_STRING,
1605              .help = "ID of secret providing LUKS passphrase",
1606          },
1607          { /* end of list */ }
1608      }
1609  };
1610  
/*
 * NULL-terminated list of the driver's strong runtime option names,
 * referenced from bdrv_rbd.strong_runtime_opts.  "server." presumably
 * acts as a prefix covering all server.* keys - confirm against the
 * block layer's handling of this list.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    /* what is opened */
    "pool",
    "namespace",
    "image",
    "conf",
    "snapshot",
    /* how to reach it and authenticate */
    "user",
    "server.",
    "password-secret",

    NULL
};
1623  
1624  static BlockDriver bdrv_rbd = {
1625      .format_name            = "rbd",
1626      .instance_size          = sizeof(BDRVRBDState),
1627      .bdrv_parse_filename    = qemu_rbd_parse_filename,
1628      .bdrv_file_open         = qemu_rbd_open,
1629      .bdrv_close             = qemu_rbd_close,
1630      .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1631      .bdrv_co_create         = qemu_rbd_co_create,
1632      .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1633      .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1634      .bdrv_get_info          = qemu_rbd_getinfo,
1635      .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1636      .create_opts            = &qemu_rbd_create_opts,
1637      .bdrv_getlength         = qemu_rbd_getlength,
1638      .bdrv_co_truncate       = qemu_rbd_co_truncate,
1639      .protocol_name          = "rbd",
1640  
1641      .bdrv_co_preadv         = qemu_rbd_co_preadv,
1642      .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1643      .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1644      .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1645  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1646      .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1647  #endif
1648      .bdrv_co_block_status   = qemu_rbd_co_block_status,
1649  
1650      .bdrv_snapshot_create   = qemu_rbd_snap_create,
1651      .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1652      .bdrv_snapshot_list     = qemu_rbd_snap_list,
1653      .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1654      .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1655  
1656      .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1657  };
1658  
1659  static void bdrv_rbd_init(void)
1660  {
1661      bdrv_register(&bdrv_rbd);
1662  }
1663  
1664  block_init(bdrv_rbd_init);
1665