xref: /openbmc/qemu/block/rbd.c (revision e3d0814368d00e7985c31edf5d0cfce45972d4be)
1  /*
2   * QEMU Block driver for RADOS (Ceph)
3   *
4   * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5   *                         Josh Durgin <josh.durgin@dreamhost.com>
6   *
7   * This work is licensed under the terms of the GNU GPL, version 2.  See
8   * the COPYING file in the top-level directory.
9   *
10   * Contributions after 2012-01-13 are licensed under the terms of the
11   * GNU GPL, version 2 or (at your option) any later version.
12   */
13  
14  #include "qemu/osdep.h"
15  
16  #include <rbd/librbd.h>
17  #include "qapi/error.h"
18  #include "qemu/error-report.h"
19  #include "qemu/module.h"
20  #include "qemu/option.h"
21  #include "block/block-io.h"
22  #include "block/block_int.h"
23  #include "block/qdict.h"
24  #include "crypto/secret.h"
25  #include "qemu/cutils.h"
26  #include "sysemu/replay.h"
27  #include "qapi/qmp/qstring.h"
28  #include "qapi/qmp/qdict.h"
29  #include "qapi/qmp/qjson.h"
30  #include "qapi/qmp/qlist.h"
31  #include "qapi/qobject-input-visitor.h"
32  #include "qapi/qapi-visit-block-core.h"
33  
34  /*
35   * When specifying the image filename use:
36   *
 * rbd:poolname/[namespace/]devicename[@snapshotname]
 *     [:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; when it is omitted an empty namespace is used.
 *
 * devicename is the name of the rbd image.
42   *
43   * Each option given is used to configure rados, and may be any valid
44   * Ceph option, "id", or "conf".
45   *
46   * The "id" option indicates what user we should authenticate as to
47   * the Ceph cluster.  If it is excluded we will use the Ceph default
48   * (normally 'admin').
49   *
50   * The "conf" option specifies a Ceph configuration file to read.  If
51   * it is not specified, we will read from the default Ceph locations
52   * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53   * file, specify conf=/dev/null.
54   *
55   * Configuration values containing :, @, or = can be escaped with a
56   * leading "\".
57   */
58  
/*
 * Byte size of an object of order OBJ_DEFAULT_OBJ_ORDER (2^order).
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file - confirm where it comes from.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* NOTE(review): presumably an upper bound on snapshots handled - confirm */
#define RBD_MAX_SNAPS 100

/* Length in bytes of each of the header verification patterns below */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * 8-byte patterns, presumably compared against the start of an image to
 * recognize its encryption header (the users are outside the visible part
 * of this file - TODO confirm).  Each consists of a 4-character tag
 * ("LUKS" or "RBDL"), the bytes 0xBA 0xBE, and two trailing bytes that
 * differ only in the last one (1 or 2).
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
84  
/*
 * Kinds of request distinguished by this driver.
 * NOTE(review): the code consuming these values is outside the visible
 * part of this file; the meanings are taken from the enumerator names.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES
} RBDAIOCmd;
92  
/*
 * Driver state for an RBD block device.
 * NOTE(review): the code that fills in and uses these fields is outside
 * the visible part of this file; the field notes below are derived from
 * the field types and names only.
 */
typedef struct BDRVRBDState {
    rados_t cluster;            /* cluster handle */
    rados_ioctx_t io_ctx;       /* I/O context handle */
    rbd_image_t image;          /* image handle */
    char *image_name;
    char *snap;                 /* presumably the snapshot name - confirm */
    char *namespace;
    uint64_t image_size;        /* presumably in bytes - confirm */
    uint64_t object_size;       /* presumably in bytes - confirm */
} BDRVRBDState;
103  
/*
 * Bookkeeping for one request: the block device it belongs to, a
 * coroutine, a completion flag and a result value.
 * NOTE(review): presumably @co is the coroutine waiting for the request
 * and @ret its result - the users are outside the visible part of this
 * file, confirm there.
 */
typedef struct RBDTask {
    BlockDriverState *bs;
    Coroutine *co;
    bool complete;
    int64_t ret;
} RBDTask;
110  
/*
 * An offset/length pair plus an "exists" flag.
 * NOTE(review): presumably carries state for a diff-iterate callback; the
 * users are outside the visible part of this file - confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
116  
/*
 * Forward declaration: qemu_rbd_do_create() calls this before the
 * definition appears further down in the file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
121  
/*
 * Find the first unescaped occurrence of @delim in @src.
 *
 * A backslash escapes the character following it, so an escaped delimiter
 * is not reported.  Returns a pointer to the delimiter inside @src, or
 * NULL when the string holds no unescaped @delim.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        /* Step over an escaped character; a trailing '\' escapes nothing */
        if (*cur == '\\' && cur[1] != '\0') {
            cur++;
        }
        cur++;
    }

    return NULL;
}
137  
138  
139  static char *qemu_rbd_next_tok(char *src, char delim, char **p)
140  {
141      char *end;
142  
143      *p = NULL;
144  
145      end = qemu_rbd_strchr(src, delim);
146      if (end) {
147          *p = end + 1;
148          *end = '\0';
149      }
150      return src;
151  }
152  
/*
 * Remove backslash escapes from @src in place.
 *
 * Every backslash that is followed by another character is dropped and
 * the following character is kept literally.  A backslash at the very end
 * of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
165  
166  static void qemu_rbd_parse_filename(const char *filename, QDict *options,
167                                      Error **errp)
168  {
169      const char *start;
170      char *p, *buf;
171      QList *keypairs = NULL;
172      char *found_str, *image_name;
173  
174      if (!strstart(filename, "rbd:", &start)) {
175          error_setg(errp, "File name must start with 'rbd:'");
176          return;
177      }
178  
179      buf = g_strdup(start);
180      p = buf;
181  
182      found_str = qemu_rbd_next_tok(p, '/', &p);
183      if (!p) {
184          error_setg(errp, "Pool name is required");
185          goto done;
186      }
187      qemu_rbd_unescape(found_str);
188      qdict_put_str(options, "pool", found_str);
189  
190      if (qemu_rbd_strchr(p, '@')) {
191          image_name = qemu_rbd_next_tok(p, '@', &p);
192  
193          found_str = qemu_rbd_next_tok(p, ':', &p);
194          qemu_rbd_unescape(found_str);
195          qdict_put_str(options, "snapshot", found_str);
196      } else {
197          image_name = qemu_rbd_next_tok(p, ':', &p);
198      }
199      /* Check for namespace in the image_name */
200      if (qemu_rbd_strchr(image_name, '/')) {
201          found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
202          qemu_rbd_unescape(found_str);
203          qdict_put_str(options, "namespace", found_str);
204      } else {
205          qdict_put_str(options, "namespace", "");
206      }
207      qemu_rbd_unescape(image_name);
208      qdict_put_str(options, "image", image_name);
209      if (!p) {
210          goto done;
211      }
212  
213      /* The following are essentially all key/value pairs, and we treat
214       * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
215      while (p) {
216          char *name, *value;
217          name = qemu_rbd_next_tok(p, '=', &p);
218          if (!p) {
219              error_setg(errp, "conf option %s has no value", name);
220              break;
221          }
222  
223          qemu_rbd_unescape(name);
224  
225          value = qemu_rbd_next_tok(p, ':', &p);
226          qemu_rbd_unescape(value);
227  
228          if (!strcmp(name, "conf")) {
229              qdict_put_str(options, "conf", value);
230          } else if (!strcmp(name, "id")) {
231              qdict_put_str(options, "user", value);
232          } else {
233              /*
234               * We pass these internally to qemu_rbd_set_keypairs(), so
235               * we can get away with the simpler list of [ "key1",
236               * "value1", "key2", "value2" ] rather than a raw dict
237               * { "key1": "value1", "key2": "value2" } where we can't
238               * guarantee order, or even a more correct but complex
239               * [ { "key1": "value1" }, { "key2": "value2" } ]
240               */
241              if (!keypairs) {
242                  keypairs = qlist_new();
243              }
244              qlist_append_str(keypairs, name);
245              qlist_append_str(keypairs, value);
246          }
247      }
248  
249      if (keypairs) {
250          qdict_put(options, "=keyvalue-pairs",
251                    qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
252      }
253  
254  done:
255      g_free(buf);
256      qobject_unref(keypairs);
257      return;
258  }
259  
260  static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
261                               Error **errp)
262  {
263      char *key, *acr;
264      int r;
265      GString *accu;
266      RbdAuthModeList *auth;
267  
268      if (opts->key_secret) {
269          key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
270          if (!key) {
271              return -EIO;
272          }
273          r = rados_conf_set(cluster, "key", key);
274          g_free(key);
275          if (r < 0) {
276              error_setg_errno(errp, -r, "Could not set 'key'");
277              return r;
278          }
279      }
280  
281      if (opts->has_auth_client_required) {
282          accu = g_string_new("");
283          for (auth = opts->auth_client_required; auth; auth = auth->next) {
284              if (accu->str[0]) {
285                  g_string_append_c(accu, ';');
286              }
287              g_string_append(accu, RbdAuthMode_str(auth->value));
288          }
289          acr = g_string_free(accu, FALSE);
290          r = rados_conf_set(cluster, "auth_client_required", acr);
291          g_free(acr);
292          if (r < 0) {
293              error_setg_errno(errp, -r,
294                               "Could not set 'auth_client_required'");
295              return r;
296          }
297      }
298  
299      return 0;
300  }
301  
302  static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
303                                   Error **errp)
304  {
305      QList *keypairs;
306      QString *name;
307      QString *value;
308      const char *key;
309      size_t remaining;
310      int ret = 0;
311  
312      if (!keypairs_json) {
313          return ret;
314      }
315      keypairs = qobject_to(QList,
316                            qobject_from_json(keypairs_json, &error_abort));
317      remaining = qlist_size(keypairs) / 2;
318      assert(remaining);
319  
320      while (remaining--) {
321          name = qobject_to(QString, qlist_pop(keypairs));
322          value = qobject_to(QString, qlist_pop(keypairs));
323          assert(name && value);
324          key = qstring_get_str(name);
325  
326          ret = rados_conf_set(cluster, key, qstring_get_str(value));
327          qobject_unref(value);
328          if (ret < 0) {
329              error_setg_errno(errp, -ret, "invalid conf option %s", key);
330              qobject_unref(name);
331              ret = -EINVAL;
332              break;
333          }
334          qobject_unref(name);
335      }
336  
337      qobject_unref(keypairs);
338      return ret;
339  }
340  
341  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
342  static int qemu_rbd_convert_luks_options(
343          RbdEncryptionOptionsLUKSBase *luks_opts,
344          char **passphrase,
345          size_t *passphrase_len,
346          Error **errp)
347  {
348      return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
349                                   passphrase_len, errp);
350  }
351  
352  static int qemu_rbd_convert_luks_create_options(
353          RbdEncryptionCreateOptionsLUKSBase *luks_opts,
354          rbd_encryption_algorithm_t *alg,
355          char **passphrase,
356          size_t *passphrase_len,
357          Error **errp)
358  {
359      int r = 0;
360  
361      r = qemu_rbd_convert_luks_options(
362              qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
363              passphrase, passphrase_len, errp);
364      if (r < 0) {
365          return r;
366      }
367  
368      if (luks_opts->has_cipher_alg) {
369          switch (luks_opts->cipher_alg) {
370              case QCRYPTO_CIPHER_ALGO_AES_128: {
371                  *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
372                  break;
373              }
374              case QCRYPTO_CIPHER_ALGO_AES_256: {
375                  *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
376                  break;
377              }
378              default: {
379                  r = -ENOTSUP;
380                  error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
381                                   luks_opts->cipher_alg);
382                  return r;
383              }
384          }
385      } else {
386          /* default alg */
387          *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
388      }
389  
390      return 0;
391  }
392  
393  static int qemu_rbd_encryption_format(rbd_image_t image,
394                                        RbdEncryptionCreateOptions *encrypt,
395                                        Error **errp)
396  {
397      int r = 0;
398      g_autofree char *passphrase = NULL;
399      rbd_encryption_format_t format;
400      rbd_encryption_options_t opts;
401      rbd_encryption_luks1_format_options_t luks_opts;
402      rbd_encryption_luks2_format_options_t luks2_opts;
403      size_t opts_size;
404      uint64_t raw_size, effective_size;
405  
406      r = rbd_get_size(image, &raw_size);
407      if (r < 0) {
408          error_setg_errno(errp, -r, "cannot get raw image size");
409          return r;
410      }
411  
412      switch (encrypt->format) {
413          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
414              memset(&luks_opts, 0, sizeof(luks_opts));
415              format = RBD_ENCRYPTION_FORMAT_LUKS1;
416              opts = &luks_opts;
417              opts_size = sizeof(luks_opts);
418              r = qemu_rbd_convert_luks_create_options(
419                      qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
420                      &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
421                      errp);
422              if (r < 0) {
423                  return r;
424              }
425              luks_opts.passphrase = passphrase;
426              break;
427          }
428          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
429              memset(&luks2_opts, 0, sizeof(luks2_opts));
430              format = RBD_ENCRYPTION_FORMAT_LUKS2;
431              opts = &luks2_opts;
432              opts_size = sizeof(luks2_opts);
433              r = qemu_rbd_convert_luks_create_options(
434                      qapi_RbdEncryptionCreateOptionsLUKS2_base(
435                              &encrypt->u.luks2),
436                      &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
437                      errp);
438              if (r < 0) {
439                  return r;
440              }
441              luks2_opts.passphrase = passphrase;
442              break;
443          }
444          default: {
445              r = -ENOTSUP;
446              error_setg_errno(
447                      errp, -r, "unknown image encryption format: %u",
448                      encrypt->format);
449              return r;
450          }
451      }
452  
453      r = rbd_encryption_format(image, format, opts, opts_size);
454      if (r < 0) {
455          error_setg_errno(errp, -r, "encryption format fail");
456          return r;
457      }
458  
459      r = rbd_get_size(image, &effective_size);
460      if (r < 0) {
461          error_setg_errno(errp, -r, "cannot get effective image size");
462          return r;
463      }
464  
465      r = rbd_resize(image, raw_size + (raw_size - effective_size));
466      if (r < 0) {
467          error_setg_errno(errp, -r, "cannot resize image after format");
468          return r;
469      }
470  
471      return 0;
472  }
473  
474  static int qemu_rbd_encryption_load(rbd_image_t image,
475                                      RbdEncryptionOptions *encrypt,
476                                      Error **errp)
477  {
478      int r = 0;
479      g_autofree char *passphrase = NULL;
480      rbd_encryption_luks1_format_options_t luks_opts;
481      rbd_encryption_luks2_format_options_t luks2_opts;
482  #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
483      rbd_encryption_luks_format_options_t luks_any_opts;
484  #endif
485      rbd_encryption_format_t format;
486      rbd_encryption_options_t opts;
487      size_t opts_size;
488  
489      switch (encrypt->format) {
490          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
491              memset(&luks_opts, 0, sizeof(luks_opts));
492              format = RBD_ENCRYPTION_FORMAT_LUKS1;
493              opts = &luks_opts;
494              opts_size = sizeof(luks_opts);
495              r = qemu_rbd_convert_luks_options(
496                      qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
497                      &passphrase, &luks_opts.passphrase_size, errp);
498              if (r < 0) {
499                  return r;
500              }
501              luks_opts.passphrase = passphrase;
502              break;
503          }
504          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
505              memset(&luks2_opts, 0, sizeof(luks2_opts));
506              format = RBD_ENCRYPTION_FORMAT_LUKS2;
507              opts = &luks2_opts;
508              opts_size = sizeof(luks2_opts);
509              r = qemu_rbd_convert_luks_options(
510                      qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
511                      &passphrase, &luks2_opts.passphrase_size, errp);
512              if (r < 0) {
513                  return r;
514              }
515              luks2_opts.passphrase = passphrase;
516              break;
517          }
518  #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
519          case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
520              memset(&luks_any_opts, 0, sizeof(luks_any_opts));
521              format = RBD_ENCRYPTION_FORMAT_LUKS;
522              opts = &luks_any_opts;
523              opts_size = sizeof(luks_any_opts);
524              r = qemu_rbd_convert_luks_options(
525                      qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
526                      &passphrase, &luks_any_opts.passphrase_size, errp);
527              if (r < 0) {
528                  return r;
529              }
530              luks_any_opts.passphrase = passphrase;
531              break;
532          }
533  #endif
534          default: {
535              r = -ENOTSUP;
536              error_setg_errno(
537                      errp, -r, "unknown image encryption format: %u",
538                      encrypt->format);
539              return r;
540          }
541      }
542  
543      r = rbd_encryption_load(image, format, opts, opts_size);
544      if (r < 0) {
545          error_setg_errno(errp, -r, "encryption load fail");
546          return r;
547      }
548  
549      return 0;
550  }
551  
552  #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
553  static int qemu_rbd_encryption_load2(rbd_image_t image,
554                                       RbdEncryptionOptions *encrypt,
555                                       Error **errp)
556  {
557      int r = 0;
558      int encrypt_count = 1;
559      int i;
560      RbdEncryptionOptions *curr_encrypt;
561      rbd_encryption_spec_t *specs;
562      rbd_encryption_luks1_format_options_t *luks_opts;
563      rbd_encryption_luks2_format_options_t *luks2_opts;
564      rbd_encryption_luks_format_options_t *luks_any_opts;
565  
566      /* count encryption options */
567      for (curr_encrypt = encrypt->parent; curr_encrypt;
568           curr_encrypt = curr_encrypt->parent) {
569          ++encrypt_count;
570      }
571  
572      specs = g_new0(rbd_encryption_spec_t, encrypt_count);
573  
574      curr_encrypt = encrypt;
575      for (i = 0; i < encrypt_count; ++i) {
576          switch (curr_encrypt->format) {
577              case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
578                  specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
579  
580                  luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
581                  specs[i].opts = luks_opts;
582                  specs[i].opts_size = sizeof(*luks_opts);
583  
584                  r = qemu_rbd_convert_luks_options(
585                          qapi_RbdEncryptionOptionsLUKS_base(
586                                  &curr_encrypt->u.luks),
587                          (char **)&luks_opts->passphrase,
588                          &luks_opts->passphrase_size,
589                          errp);
590                  break;
591              }
592              case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
593                  specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
594  
595                  luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
596                  specs[i].opts = luks2_opts;
597                  specs[i].opts_size = sizeof(*luks2_opts);
598  
599                  r = qemu_rbd_convert_luks_options(
600                          qapi_RbdEncryptionOptionsLUKS2_base(
601                                  &curr_encrypt->u.luks2),
602                          (char **)&luks2_opts->passphrase,
603                          &luks2_opts->passphrase_size,
604                          errp);
605                  break;
606              }
607              case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
608                  specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
609  
610                  luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
611                  specs[i].opts = luks_any_opts;
612                  specs[i].opts_size = sizeof(*luks_any_opts);
613  
614                  r = qemu_rbd_convert_luks_options(
615                          qapi_RbdEncryptionOptionsLUKSAny_base(
616                                  &curr_encrypt->u.luks_any),
617                          (char **)&luks_any_opts->passphrase,
618                          &luks_any_opts->passphrase_size,
619                          errp);
620                  break;
621              }
622              default: {
623                  r = -ENOTSUP;
624                  error_setg_errno(
625                          errp, -r, "unknown image encryption format: %u",
626                          curr_encrypt->format);
627              }
628          }
629  
630          if (r < 0) {
631              goto exit;
632          }
633  
634          curr_encrypt = curr_encrypt->parent;
635      }
636  
637      r = rbd_encryption_load2(image, specs, encrypt_count);
638      if (r < 0) {
639          error_setg_errno(errp, -r, "layered encryption load fail");
640          goto exit;
641      }
642  
643  exit:
644      for (i = 0; i < encrypt_count; ++i) {
645          if (!specs[i].opts) {
646              break;
647          }
648  
649          switch (specs[i].format) {
650              case RBD_ENCRYPTION_FORMAT_LUKS1: {
651                  luks_opts = specs[i].opts;
652                  g_free((void *)luks_opts->passphrase);
653                  break;
654              }
655              case RBD_ENCRYPTION_FORMAT_LUKS2: {
656                  luks2_opts = specs[i].opts;
657                  g_free((void *)luks2_opts->passphrase);
658                  break;
659              }
660              case RBD_ENCRYPTION_FORMAT_LUKS: {
661                  luks_any_opts = specs[i].opts;
662                  g_free((void *)luks_any_opts->passphrase);
663                  break;
664              }
665          }
666  
667          g_free(specs[i].opts);
668      }
669      g_free(specs);
670      return r;
671  }
672  #endif
673  #endif
674  
675  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
676  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
677                                const char *keypairs, const char *password_secret,
678                                Error **errp)
679  {
680      BlockdevCreateOptionsRbd *opts = &options->u.rbd;
681      rados_t cluster;
682      rados_ioctx_t io_ctx;
683      int obj_order = 0;
684      int ret;
685  
686      assert(options->driver == BLOCKDEV_DRIVER_RBD);
687      if (opts->location->snapshot) {
688          error_setg(errp, "Can't use snapshot name for image creation");
689          return -EINVAL;
690      }
691  
692  #ifndef LIBRBD_SUPPORTS_ENCRYPTION
693      if (opts->encrypt) {
694          error_setg(errp, "RBD library does not support image encryption");
695          return -ENOTSUP;
696      }
697  #endif
698  
699      if (opts->has_cluster_size) {
700          int64_t objsize = opts->cluster_size;
701          if ((objsize - 1) & objsize) {    /* not a power of 2? */
702              error_setg(errp, "obj size needs to be power of 2");
703              return -EINVAL;
704          }
705          if (objsize < 4096) {
706              error_setg(errp, "obj size too small");
707              return -EINVAL;
708          }
709          obj_order = ctz32(objsize);
710      }
711  
712      ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
713                             password_secret, errp);
714      if (ret < 0) {
715          return ret;
716      }
717  
718      ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
719      if (ret < 0) {
720          error_setg_errno(errp, -ret, "error rbd create");
721          goto out;
722      }
723  
724  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
725      if (opts->encrypt) {
726          rbd_image_t image;
727  
728          ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
729          if (ret < 0) {
730              error_setg_errno(errp, -ret,
731                               "error opening image '%s' for encryption format",
732                               opts->location->image);
733              goto out;
734          }
735  
736          ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
737          rbd_close(image);
738          if (ret < 0) {
739              /* encryption format fail, try removing the image */
740              rbd_remove(io_ctx, opts->location->image);
741              goto out;
742          }
743      }
744  #endif
745  
746      ret = 0;
747  out:
748      rados_ioctx_destroy(io_ctx);
749      rados_shutdown(cluster);
750      return ret;
751  }
752  
/*
 * Create an image from @options by delegating to qemu_rbd_do_create()
 * with neither extra key/value pairs nor a legacy password secret.
 * Returns whatever qemu_rbd_do_create() returns.
 */
static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
{
    return qemu_rbd_do_create(options, NULL, NULL, errp);
}
757  
758  static int qemu_rbd_extract_encryption_create_options(
759          QemuOpts *opts,
760          RbdEncryptionCreateOptions **spec,
761          Error **errp)
762  {
763      QDict *opts_qdict;
764      QDict *encrypt_qdict;
765      Visitor *v;
766      int ret = 0;
767  
768      opts_qdict = qemu_opts_to_qdict(opts, NULL);
769      qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
770      qobject_unref(opts_qdict);
771      if (!qdict_size(encrypt_qdict)) {
772          *spec = NULL;
773          goto exit;
774      }
775  
776      /* Convert options into a QAPI object */
777      v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
778      if (!v) {
779          ret = -EINVAL;
780          goto exit;
781      }
782  
783      visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
784      visit_free(v);
785      if (!*spec) {
786          ret = -EINVAL;
787          goto exit;
788      }
789  
790  exit:
791      qobject_unref(encrypt_qdict);
792      return ret;
793  }
794  
795  static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
796                                                  const char *filename,
797                                                  QemuOpts *opts,
798                                                  Error **errp)
799  {
800      BlockdevCreateOptions *create_options;
801      BlockdevCreateOptionsRbd *rbd_opts;
802      BlockdevOptionsRbd *loc;
803      RbdEncryptionCreateOptions *encrypt = NULL;
804      Error *local_err = NULL;
805      const char *keypairs, *password_secret;
806      QDict *options = NULL;
807      int ret = 0;
808  
809      create_options = g_new0(BlockdevCreateOptions, 1);
810      create_options->driver = BLOCKDEV_DRIVER_RBD;
811      rbd_opts = &create_options->u.rbd;
812  
813      rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
814  
815      password_secret = qemu_opt_get(opts, "password-secret");
816  
817      /* Read out options */
818      rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
819                                BDRV_SECTOR_SIZE);
820      rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
821                                                     BLOCK_OPT_CLUSTER_SIZE, 0);
822      rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
823  
824      options = qdict_new();
825      qemu_rbd_parse_filename(filename, options, &local_err);
826      if (local_err) {
827          ret = -EINVAL;
828          error_propagate(errp, local_err);
829          goto exit;
830      }
831  
832      ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
833      if (ret < 0) {
834          goto exit;
835      }
836      rbd_opts->encrypt     = encrypt;
837  
838      /*
839       * Caution: while qdict_get_try_str() is fine, getting non-string
840       * types would require more care.  When @options come from -blockdev
841       * or blockdev_add, its members are typed according to the QAPI
842       * schema, but when they come from -drive, they're all QString.
843       */
844      loc = rbd_opts->location;
845      loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
846      loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
847      loc->user        = g_strdup(qdict_get_try_str(options, "user"));
848      loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
849      loc->image       = g_strdup(qdict_get_try_str(options, "image"));
850      keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
851  
852      ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
853      if (ret < 0) {
854          goto exit;
855      }
856  
857  exit:
858      qobject_unref(options);
859      qapi_free_BlockdevCreateOptions(create_options);
860      return ret;
861  }
862  
863  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
864  {
865      const char **vals;
866      const char *host, *port;
867      char *rados_str;
868      InetSocketAddressBaseList *p;
869      int i, cnt;
870  
871      if (!opts->has_server) {
872          return NULL;
873      }
874  
875      for (cnt = 0, p = opts->server; p; p = p->next) {
876          cnt++;
877      }
878  
879      vals = g_new(const char *, cnt + 1);
880  
881      for (i = 0, p = opts->server; p; p = p->next, i++) {
882          host = p->value->host;
883          port = p->value->port;
884  
885          if (strchr(host, ':')) {
886              vals[i] = g_strdup_printf("[%s]:%s", host, port);
887          } else {
888              vals[i] = g_strdup_printf("%s:%s", host, port);
889          }
890      }
891      vals[i] = NULL;
892  
893      rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
894      g_strfreev((char **)vals);
895      return rados_str;
896  }
897  
898  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
899                              BlockdevOptionsRbd *opts, bool cache,
900                              const char *keypairs, const char *secretid,
901                              Error **errp)
902  {
903      char *mon_host = NULL;
904      Error *local_err = NULL;
905      int r;
906  
907      if (secretid) {
908          if (opts->key_secret) {
909              error_setg(errp,
910                         "Legacy 'password-secret' clashes with 'key-secret'");
911              return -EINVAL;
912          }
913          opts->key_secret = g_strdup(secretid);
914      }
915  
916      mon_host = qemu_rbd_mon_host(opts, &local_err);
917      if (local_err) {
918          error_propagate(errp, local_err);
919          r = -EINVAL;
920          goto out;
921      }
922  
923      r = rados_create(cluster, opts->user);
924      if (r < 0) {
925          error_setg_errno(errp, -r, "error initializing");
926          goto out;
927      }
928  
929      /* try default location when conf=NULL, but ignore failure */
930      r = rados_conf_read_file(*cluster, opts->conf);
931      if (opts->conf && r < 0) {
932          error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
933          goto failed_shutdown;
934      }
935  
936      r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
937      if (r < 0) {
938          goto failed_shutdown;
939      }
940  
941      if (mon_host) {
942          r = rados_conf_set(*cluster, "mon_host", mon_host);
943          if (r < 0) {
944              goto failed_shutdown;
945          }
946      }
947  
948      r = qemu_rbd_set_auth(*cluster, opts, errp);
949      if (r < 0) {
950          goto failed_shutdown;
951      }
952  
953      /*
954       * Fallback to more conservative semantics if setting cache
955       * options fails. Ignore errors from setting rbd_cache because the
956       * only possible error is that the option does not exist, and
957       * librbd defaults to no caching. If write through caching cannot
958       * be set up, fall back to no caching.
959       */
960      if (cache) {
961          rados_conf_set(*cluster, "rbd_cache", "true");
962      } else {
963          rados_conf_set(*cluster, "rbd_cache", "false");
964      }
965  
966      r = rados_connect(*cluster);
967      if (r < 0) {
968          error_setg_errno(errp, -r, "error connecting");
969          goto failed_shutdown;
970      }
971  
972      r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
973      if (r < 0) {
974          error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
975          goto failed_shutdown;
976      }
977  
978  #ifdef HAVE_RBD_NAMESPACE_EXISTS
979      if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
980          bool exists;
981  
982          r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
983          if (r < 0) {
984              error_setg_errno(errp, -r, "error checking namespace");
985              goto failed_ioctx_destroy;
986          }
987  
988          if (!exists) {
989              error_setg(errp, "namespace '%s' does not exist",
990                         opts->q_namespace);
991              r = -ENOENT;
992              goto failed_ioctx_destroy;
993          }
994      }
995  #endif
996  
997      /*
998       * Set the namespace after opening the io context on the pool,
999       * if nspace == NULL or if nspace == "", it is just as we did nothing
1000       */
1001      rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);
1002  
1003      r = 0;
1004      goto out;
1005  
1006  #ifdef HAVE_RBD_NAMESPACE_EXISTS
1007  failed_ioctx_destroy:
1008      rados_ioctx_destroy(*io_ctx);
1009  #endif
1010  failed_shutdown:
1011      rados_shutdown(*cluster);
1012  out:
1013      g_free(mon_host);
1014      return r;
1015  }
1016  
1017  static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
1018                                      Error **errp)
1019  {
1020      Visitor *v;
1021  
1022      /* Convert the remaining options into a QAPI object */
1023      v = qobject_input_visitor_new_flat_confused(options, errp);
1024      if (!v) {
1025          return -EINVAL;
1026      }
1027  
1028      visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
1029      visit_free(v);
1030      if (!opts) {
1031          return -EINVAL;
1032      }
1033  
1034      return 0;
1035  }
1036  
1037  static int qemu_rbd_attempt_legacy_options(QDict *options,
1038                                             BlockdevOptionsRbd **opts,
1039                                             char **keypairs)
1040  {
1041      char *filename;
1042      int r;
1043  
1044      filename = g_strdup(qdict_get_try_str(options, "filename"));
1045      if (!filename) {
1046          return -EINVAL;
1047      }
1048      qdict_del(options, "filename");
1049  
1050      qemu_rbd_parse_filename(filename, options, NULL);
1051  
1052      /* keypairs freed by caller */
1053      *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1054      if (*keypairs) {
1055          qdict_del(options, "=keyvalue-pairs");
1056      }
1057  
1058      r = qemu_rbd_convert_options(options, opts, NULL);
1059  
1060      g_free(filename);
1061      return r;
1062  }
1063  
1064  static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
1065                           Error **errp)
1066  {
1067      BDRVRBDState *s = bs->opaque;
1068      BlockdevOptionsRbd *opts = NULL;
1069      const QDictEntry *e;
1070      Error *local_err = NULL;
1071      char *keypairs, *secretid;
1072      rbd_image_info_t info;
1073      int r;
1074  
1075      keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
1076      if (keypairs) {
1077          qdict_del(options, "=keyvalue-pairs");
1078      }
1079  
1080      secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
1081      if (secretid) {
1082          qdict_del(options, "password-secret");
1083      }
1084  
1085      r = qemu_rbd_convert_options(options, &opts, &local_err);
1086      if (local_err) {
1087          /* If keypairs are present, that means some options are present in
1088           * the modern option format.  Don't attempt to parse legacy option
1089           * formats, as we won't support mixed usage. */
1090          if (keypairs) {
1091              error_propagate(errp, local_err);
1092              goto out;
1093          }
1094  
1095          /* If the initial attempt to convert and process the options failed,
1096           * we may be attempting to open an image file that has the rbd options
1097           * specified in the older format consisting of all key/value pairs
1098           * encoded in the filename.  Go ahead and attempt to parse the
1099           * filename, and see if we can pull out the required options. */
1100          r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
1101          if (r < 0) {
1102              /* Propagate the original error, not the legacy parsing fallback
1103               * error, as the latter was just a best-effort attempt. */
1104              error_propagate(errp, local_err);
1105              goto out;
1106          }
1107          /* Take care whenever deciding to actually deprecate; once this ability
1108           * is removed, we will not be able to open any images with legacy-styled
1109           * backing image strings. */
1110          warn_report("RBD options encoded in the filename as keyvalue pairs "
1111                      "is deprecated");
1112      }
1113  
1114      /* Remove the processed options from the QDict (the visitor processes
1115       * _all_ options in the QDict) */
1116      while ((e = qdict_first(options))) {
1117          qdict_del(options, e->key);
1118      }
1119  
1120      r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
1121                           !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
1122      if (r < 0) {
1123          goto out;
1124      }
1125  
1126      s->snap = g_strdup(opts->snapshot);
1127      s->image_name = g_strdup(opts->image);
1128  
1129      /* rbd_open is always r/w */
1130      r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
1131      if (r < 0) {
1132          error_setg_errno(errp, -r, "error reading header from %s",
1133                           s->image_name);
1134          goto failed_open;
1135      }
1136  
1137      if (opts->encrypt) {
1138  #ifdef LIBRBD_SUPPORTS_ENCRYPTION
1139          if (opts->encrypt->parent) {
1140  #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
1141              r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
1142  #else
1143              r = -ENOTSUP;
1144              error_setg(errp, "RBD library does not support layered encryption");
1145  #endif
1146          } else {
1147              r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
1148          }
1149          if (r < 0) {
1150              goto failed_post_open;
1151          }
1152  #else
1153          r = -ENOTSUP;
1154          error_setg(errp, "RBD library does not support image encryption");
1155          goto failed_post_open;
1156  #endif
1157      }
1158  
1159      r = rbd_stat(s->image, &info, sizeof(info));
1160      if (r < 0) {
1161          error_setg_errno(errp, -r, "error getting image info from %s",
1162                           s->image_name);
1163          goto failed_post_open;
1164      }
1165      s->image_size = info.size;
1166      s->object_size = info.obj_size;
1167  
1168      /* If we are using an rbd snapshot, we must be r/o, otherwise
1169       * leave as-is */
1170      if (s->snap != NULL) {
1171          bdrv_graph_rdlock_main_loop();
1172          r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
1173          bdrv_graph_rdunlock_main_loop();
1174          if (r < 0) {
1175              goto failed_post_open;
1176          }
1177      }
1178  
1179  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1180      bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
1181  #endif
1182  
1183      /* When extending regular files, we get zeros from the OS */
1184      bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;
1185  
1186      r = 0;
1187      goto out;
1188  
1189  failed_post_open:
1190      rbd_close(s->image);
1191  failed_open:
1192      rados_ioctx_destroy(s->io_ctx);
1193      g_free(s->snap);
1194      g_free(s->image_name);
1195      rados_shutdown(s->cluster);
1196  out:
1197      qapi_free_BlockdevOptionsRbd(opts);
1198      g_free(keypairs);
1199      g_free(secretid);
1200      return r;
1201  }
1202  
1203  
1204  /* Since RBD is currently always opened R/W via the API,
1205   * we just need to check if we are using a snapshot or not, in
1206   * order to determine if we will allow it to be R/W */
1207  static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1208                                     BlockReopenQueue *queue, Error **errp)
1209  {
1210      BDRVRBDState *s = state->bs->opaque;
1211      int ret = 0;
1212  
1213      GRAPH_RDLOCK_GUARD_MAINLOOP();
1214  
1215      if (s->snap && state->flags & BDRV_O_RDWR) {
1216          error_setg(errp,
1217                     "Cannot change node '%s' to r/w when using RBD snapshot",
1218                     bdrv_get_device_or_node_name(state->bs));
1219          ret = -EINVAL;
1220      }
1221  
1222      return ret;
1223  }
1224  
1225  static void qemu_rbd_close(BlockDriverState *bs)
1226  {
1227      BDRVRBDState *s = bs->opaque;
1228  
1229      rbd_close(s->image);
1230      rados_ioctx_destroy(s->io_ctx);
1231      g_free(s->snap);
1232      g_free(s->image_name);
1233      rados_shutdown(s->cluster);
1234  }
1235  
1236  /* Resize the RBD image and update the 'image_size' with the current size */
1237  static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1238  {
1239      BDRVRBDState *s = bs->opaque;
1240      int r;
1241  
1242      r = rbd_resize(s->image, size);
1243      if (r < 0) {
1244          return r;
1245      }
1246  
1247      s->image_size = size;
1248  
1249      return 0;
1250  }
1251  
1252  static void qemu_rbd_finish_bh(void *opaque)
1253  {
1254      RBDTask *task = opaque;
1255      task->complete = true;
1256      aio_co_wake(task->co);
1257  }
1258  
1259  /*
1260   * This is the completion callback function for all rbd aio calls
1261   * started from qemu_rbd_start_co().
1262   *
1263   * Note: this function is being called from a non qemu thread so
1264   * we need to be careful about what we do here. Generally we only
1265   * schedule a BH, and do the rest of the io completion handling
1266   * from qemu_rbd_finish_bh() which runs in a qemu context.
1267   */
1268  static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1269  {
1270      task->ret = rbd_aio_get_return_value(c);
1271      rbd_aio_release(c);
1272      aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1273                              qemu_rbd_finish_bh, task);
1274  }
1275  
1276  static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1277                                            uint64_t offset,
1278                                            uint64_t bytes,
1279                                            QEMUIOVector *qiov,
1280                                            int flags,
1281                                            RBDAIOCmd cmd)
1282  {
1283      BDRVRBDState *s = bs->opaque;
1284      RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1285      rbd_completion_t c;
1286      int r;
1287  
1288      assert(!qiov || qiov->size == bytes);
1289  
1290      if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1291          /*
1292           * RBD APIs don't allow us to write more than actual size, so in order
1293           * to support growing images, we resize the image before write
1294           * operations that exceed the current size.
1295           */
1296          if (offset + bytes > s->image_size) {
1297              r = qemu_rbd_resize(bs, offset + bytes);
1298              if (r < 0) {
1299                  return r;
1300              }
1301          }
1302      }
1303  
1304      r = rbd_aio_create_completion(&task,
1305                                    (rbd_callback_t) qemu_rbd_completion_cb, &c);
1306      if (r < 0) {
1307          return r;
1308      }
1309  
1310      switch (cmd) {
1311      case RBD_AIO_READ:
1312          r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1313          break;
1314      case RBD_AIO_WRITE:
1315          r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1316          break;
1317      case RBD_AIO_DISCARD:
1318          r = rbd_aio_discard(s->image, offset, bytes, c);
1319          break;
1320      case RBD_AIO_FLUSH:
1321          r = rbd_aio_flush(s->image, c);
1322          break;
1323  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1324      case RBD_AIO_WRITE_ZEROES: {
1325          int zero_flags = 0;
1326  #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1327          if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1328              zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1329          }
1330  #endif
1331          r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1332          break;
1333      }
1334  #endif
1335      default:
1336          r = -EINVAL;
1337      }
1338  
1339      if (r < 0) {
1340          error_report("rbd request failed early: cmd %d offset %" PRIu64
1341                       " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1342                       bytes, flags, r, strerror(-r));
1343          rbd_aio_release(c);
1344          return r;
1345      }
1346  
1347      while (!task.complete) {
1348          qemu_coroutine_yield();
1349      }
1350  
1351      if (task.ret < 0) {
1352          error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1353                       PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1354                       bytes, flags, task.ret, strerror(-task.ret));
1355          return task.ret;
1356      }
1357  
1358      /* zero pad short reads */
1359      if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1360          qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1361      }
1362  
1363      return 0;
1364  }
1365  
1366  static int
1367  coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1368                                  int64_t bytes, QEMUIOVector *qiov,
1369                                  BdrvRequestFlags flags)
1370  {
1371      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1372  }
1373  
1374  static int
1375  coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1376                                   int64_t bytes, QEMUIOVector *qiov,
1377                                   BdrvRequestFlags flags)
1378  {
1379      return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1380  }
1381  
1382  static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1383  {
1384      return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1385  }
1386  
1387  static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1388                                               int64_t offset, int64_t bytes)
1389  {
1390      return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1391  }
1392  
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Write zeroes to [offset, offset + bytes): an RBD_AIO_WRITE_ZEROES request
 * without an iovec; @flags is passed through to qemu_rbd_start_co().
 */
static int
coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, BdrvRequestFlags flags)
{
    int ret = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                                RBD_AIO_WRITE_ZEROES);

    return ret;
}
#endif
1402  
1403  static int coroutine_fn
1404  qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1405  {
1406      BDRVRBDState *s = bs->opaque;
1407      bdi->cluster_size = s->object_size;
1408      return 0;
1409  }
1410  
1411  static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1412                                                       Error **errp)
1413  {
1414      BDRVRBDState *s = bs->opaque;
1415      ImageInfoSpecific *spec_info;
1416      char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1417      int r;
1418  
1419      if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1420          r = rbd_read(s->image, 0,
1421                       RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1422          if (r < 0) {
1423              error_setg_errno(errp, -r, "cannot read image start for probe");
1424              return NULL;
1425          }
1426      }
1427  
1428      spec_info = g_new(ImageInfoSpecific, 1);
1429      *spec_info = (ImageInfoSpecific){
1430          .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1431          .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1432      };
1433  
1434      if (memcmp(buf, rbd_luks_header_verification,
1435                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1436          spec_info->u.rbd.data->encryption_format =
1437                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1438          spec_info->u.rbd.data->has_encryption_format = true;
1439      } else if (memcmp(buf, rbd_luks2_header_verification,
1440                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1441          spec_info->u.rbd.data->encryption_format =
1442                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1443          spec_info->u.rbd.data->has_encryption_format = true;
1444      } else if (memcmp(buf, rbd_layered_luks_header_verification,
1445                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1446          spec_info->u.rbd.data->encryption_format =
1447                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1448          spec_info->u.rbd.data->has_encryption_format = true;
1449      } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1450                 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1451          spec_info->u.rbd.data->encryption_format =
1452                  RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1453          spec_info->u.rbd.data->has_encryption_format = true;
1454      } else {
1455          spec_info->u.rbd.data->has_encryption_format = false;
1456      }
1457  
1458      return spec_info;
1459  }
1460  
/*
 * rbd_diff_iterate2 allows the execution to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to stop the
 * execution prematurely because we detected a change in the allocation
 * status.
 *
 * The value is parenthesized so the macro expands safely inside any
 * expression.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1468  
1469  static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1470                                      int exists, void *opaque)
1471  {
1472      RBDDiffIterateReq *req = opaque;
1473  
1474      assert(req->offs + req->bytes <= offs);
1475  
1476      /* treat a hole like an unallocated area and bail out */
1477      if (!exists) {
1478          return 0;
1479      }
1480  
1481      if (!req->exists && offs > req->offs) {
1482          /*
1483           * we started in an unallocated area and hit the first allocated
1484           * block. req->bytes must be set to the length of the unallocated area
1485           * before the allocated area. stop further processing.
1486           */
1487          req->bytes = offs - req->offs;
1488          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1489      }
1490  
1491      if (req->exists && offs > req->offs + req->bytes) {
1492          /*
1493           * we started in an allocated area and jumped over an unallocated area,
1494           * req->bytes contains the length of the allocated area before the
1495           * unallocated area. stop further processing.
1496           */
1497          return QEMU_RBD_EXIT_DIFF_ITERATE2;
1498      }
1499  
1500      req->bytes += len;
1501      req->exists = true;
1502  
1503      return 0;
1504  }
1505  
1506  static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1507                                                   bool want_zero, int64_t offset,
1508                                                   int64_t bytes, int64_t *pnum,
1509                                                   int64_t *map,
1510                                                   BlockDriverState **file)
1511  {
1512      BDRVRBDState *s = bs->opaque;
1513      int status, r;
1514      RBDDiffIterateReq req = { .offs = offset };
1515      uint64_t features, flags;
1516      uint64_t head = 0;
1517  
1518      assert(offset + bytes <= s->image_size);
1519  
1520      /* default to all sectors allocated */
1521      status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1522      *map = offset;
1523      *file = bs;
1524      *pnum = bytes;
1525  
1526      /* check if RBD image supports fast-diff */
1527      r = rbd_get_features(s->image, &features);
1528      if (r < 0) {
1529          return status;
1530      }
1531      if (!(features & RBD_FEATURE_FAST_DIFF)) {
1532          return status;
1533      }
1534  
1535      /* check if RBD fast-diff result is valid */
1536      r = rbd_get_flags(s->image, &flags);
1537      if (r < 0) {
1538          return status;
1539      }
1540      if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1541          return status;
1542      }
1543  
1544  #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1545      /*
1546       * librbd had a bug until early 2022 that affected all versions of ceph that
1547       * supported fast-diff. This bug results in reporting of incorrect offsets
1548       * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1549       * Work around this bug by rounding down the offset to object boundaries.
1550       * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1551       * However, this workaround only works for non cloned images with default
1552       * striping.
1553       *
1554       * See: https://tracker.ceph.com/issues/53784
1555       */
1556  
1557      /* check if RBD image has non-default striping enabled */
1558      if (features & RBD_FEATURE_STRIPINGV2) {
1559          return status;
1560      }
1561  
1562  #pragma GCC diagnostic push
1563  #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1564      /*
1565       * check if RBD image is a clone (= has a parent).
1566       *
1567       * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1568       * replacement rbd_get_parent is not present in Luminous and Mimic.
1569       */
1570      if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1571          return status;
1572      }
1573  #pragma GCC diagnostic pop
1574  
1575      head = req.offs & (s->object_size - 1);
1576      req.offs -= head;
1577      bytes += head;
1578  #endif
1579  
1580      r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1581                            qemu_rbd_diff_iterate_cb, &req);
1582      if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1583          return status;
1584      }
1585      assert(req.bytes <= bytes);
1586      if (!req.exists) {
1587          if (r == 0) {
1588              /*
1589               * rbd_diff_iterate2 does not invoke callbacks for unallocated
1590               * areas. This here catches the case where no callback was
1591               * invoked at all (req.bytes == 0).
1592               */
1593              assert(req.bytes == 0);
1594              req.bytes = bytes;
1595          }
1596          status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1597      }
1598  
1599      assert(req.bytes > head);
1600      *pnum = req.bytes - head;
1601      return status;
1602  }
1603  
1604  static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1605  {
1606      BDRVRBDState *s = bs->opaque;
1607      int r;
1608  
1609      r = rbd_get_size(s->image, &s->image_size);
1610      if (r < 0) {
1611          return r;
1612      }
1613  
1614      return s->image_size;
1615  }
1616  
1617  static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1618                                               int64_t offset,
1619                                               bool exact,
1620                                               PreallocMode prealloc,
1621                                               BdrvRequestFlags flags,
1622                                               Error **errp)
1623  {
1624      int r;
1625  
1626      if (prealloc != PREALLOC_MODE_OFF) {
1627          error_setg(errp, "Unsupported preallocation mode '%s'",
1628                     PreallocMode_str(prealloc));
1629          return -ENOTSUP;
1630      }
1631  
1632      r = qemu_rbd_resize(bs, offset);
1633      if (r < 0) {
1634          error_setg_errno(errp, -r, "Failed to resize file");
1635          return r;
1636      }
1637  
1638      return 0;
1639  }
1640  
1641  static int qemu_rbd_snap_create(BlockDriverState *bs,
1642                                  QEMUSnapshotInfo *sn_info)
1643  {
1644      BDRVRBDState *s = bs->opaque;
1645      int r;
1646  
1647      if (sn_info->name[0] == '\0') {
1648          return -EINVAL; /* we need a name for rbd snapshots */
1649      }
1650  
1651      /*
1652       * rbd snapshots are using the name as the user controlled unique identifier
1653       * we can't use the rbd snapid for that purpose, as it can't be set
1654       */
1655      if (sn_info->id_str[0] != '\0' &&
1656          strcmp(sn_info->id_str, sn_info->name) != 0) {
1657          return -EINVAL;
1658      }
1659  
1660      if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1661          return -ERANGE;
1662      }
1663  
1664      r = rbd_snap_create(s->image, sn_info->name);
1665      if (r < 0) {
1666          error_report("failed to create snap: %s", strerror(-r));
1667          return r;
1668      }
1669  
1670      return 0;
1671  }
1672  
1673  static int qemu_rbd_snap_remove(BlockDriverState *bs,
1674                                  const char *snapshot_id,
1675                                  const char *snapshot_name,
1676                                  Error **errp)
1677  {
1678      BDRVRBDState *s = bs->opaque;
1679      int r;
1680  
1681      if (!snapshot_name) {
1682          error_setg(errp, "rbd need a valid snapshot name");
1683          return -EINVAL;
1684      }
1685  
1686      /* If snapshot_id is specified, it must be equal to name, see
1687         qemu_rbd_snap_list() */
1688      if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1689          error_setg(errp,
1690                     "rbd do not support snapshot id, it should be NULL or "
1691                     "equal to snapshot name");
1692          return -EINVAL;
1693      }
1694  
1695      r = rbd_snap_remove(s->image, snapshot_name);
1696      if (r < 0) {
1697          error_setg_errno(errp, -r, "Failed to remove the snapshot");
1698      }
1699      return r;
1700  }
1701  
1702  static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1703                                    const char *snapshot_name)
1704  {
1705      BDRVRBDState *s = bs->opaque;
1706  
1707      return rbd_snap_rollback(s->image, snapshot_name);
1708  }
1709  
1710  static int qemu_rbd_snap_list(BlockDriverState *bs,
1711                                QEMUSnapshotInfo **psn_tab)
1712  {
1713      BDRVRBDState *s = bs->opaque;
1714      QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1715      int i, snap_count;
1716      rbd_snap_info_t *snaps;
1717      int max_snaps = RBD_MAX_SNAPS;
1718  
1719      do {
1720          snaps = g_new(rbd_snap_info_t, max_snaps);
1721          snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1722          if (snap_count <= 0) {
1723              g_free(snaps);
1724          }
1725      } while (snap_count == -ERANGE);
1726  
1727      if (snap_count <= 0) {
1728          goto done;
1729      }
1730  
1731      sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1732  
1733      for (i = 0; i < snap_count; i++) {
1734          const char *snap_name = snaps[i].name;
1735  
1736          sn_info = sn_tab + i;
1737          pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1738          pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1739  
1740          sn_info->vm_state_size = snaps[i].size;
1741          sn_info->date_sec = 0;
1742          sn_info->date_nsec = 0;
1743          sn_info->vm_clock_nsec = 0;
1744      }
1745      rbd_snap_list_end(snaps);
1746      g_free(snaps);
1747  
1748   done:
1749      *psn_tab = sn_tab;
1750      return snap_count;
1751  }
1752  
1753  static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1754                                                        Error **errp)
1755  {
1756      BDRVRBDState *s = bs->opaque;
1757      int r = rbd_invalidate_cache(s->image);
1758      if (r < 0) {
1759          error_setg_errno(errp, -r, "Failed to invalidate the cache");
1760      }
1761  }
1762  
1763  static QemuOptsList qemu_rbd_create_opts = {
1764      .name = "rbd-create-opts",
1765      .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1766      .desc = {
1767          {
1768              .name = BLOCK_OPT_SIZE,
1769              .type = QEMU_OPT_SIZE,
1770              .help = "Virtual disk size"
1771          },
1772          {
1773              .name = BLOCK_OPT_CLUSTER_SIZE,
1774              .type = QEMU_OPT_SIZE,
1775              .help = "RBD object size"
1776          },
1777          {
1778              .name = "password-secret",
1779              .type = QEMU_OPT_STRING,
1780              .help = "ID of secret providing the password",
1781          },
1782          {
1783              .name = "encrypt.format",
1784              .type = QEMU_OPT_STRING,
1785              .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1786          },
1787          {
1788              .name = "encrypt.cipher-alg",
1789              .type = QEMU_OPT_STRING,
1790              .help = "Name of encryption cipher algorithm"
1791                      " (allowed values: aes-128, aes-256)",
1792          },
1793          {
1794              .name = "encrypt.key-secret",
1795              .type = QEMU_OPT_STRING,
1796              .help = "ID of secret providing LUKS passphrase",
1797          },
1798          { /* end of list */ }
1799      }
1800  };
1801  
/*
 * NULL-terminated list of runtime option names, referenced from
 * bdrv_rbd.strong_runtime_opts.
 * NOTE(review): the trailing '.' in "server." presumably makes it match
 * every "server.*" sub-option - confirm against the generic block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool", "namespace", "image",
    "conf", "snapshot", "user",
    "server.", "password-secret",

    NULL,
};
1814  
1815  static BlockDriver bdrv_rbd = {
1816      .format_name            = "rbd",
1817      .instance_size          = sizeof(BDRVRBDState),
1818  
1819      .bdrv_parse_filename    = qemu_rbd_parse_filename,
1820      .bdrv_open              = qemu_rbd_open,
1821      .bdrv_close             = qemu_rbd_close,
1822      .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1823      .bdrv_co_create         = qemu_rbd_co_create,
1824      .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1825      .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1826      .bdrv_co_get_info       = qemu_rbd_co_get_info,
1827      .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1828      .create_opts            = &qemu_rbd_create_opts,
1829      .bdrv_co_getlength      = qemu_rbd_co_getlength,
1830      .bdrv_co_truncate       = qemu_rbd_co_truncate,
1831      .protocol_name          = "rbd",
1832  
1833      .bdrv_co_preadv         = qemu_rbd_co_preadv,
1834      .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1835      .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1836      .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1837  #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1838      .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1839  #endif
1840      .bdrv_co_block_status   = qemu_rbd_co_block_status,
1841  
1842      .bdrv_snapshot_create   = qemu_rbd_snap_create,
1843      .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1844      .bdrv_snapshot_list     = qemu_rbd_snap_list,
1845      .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1846      .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1847  
1848      .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1849  };
1850  
1851  static void bdrv_rbd_init(void)
1852  {
1853      bdrv_register(&bdrv_rbd);
1854  }
1855  
1856  block_init(bdrv_rbd_init);
1857