xref: /openbmc/qemu/block/rbd.c (revision 607132e6b6c2fef07f9dd025a7178b7fca9a3f9d)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "system/replay.h"
27 #include "qobject/qstring.h"
28 #include "qobject/qdict.h"
29 #include "qobject/qjson.h"
30 #include "qobject/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
34 /*
35  * When specifying the image filename use:
36  *
37  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38  *
39  * poolname must be the name of an existing rados pool.
40  *
41  * devicename is the name of the rbd image.
42  *
43  * Each option given is used to configure rados, and may be any valid
44  * Ceph option, "id", or "conf".
45  *
46  * The "id" option indicates what user we should authenticate as to
47  * the Ceph cluster.  If it is excluded we will use the Ceph default
48  * (normally 'admin').
49  *
50  * The "conf" option specifies a Ceph configuration file to read.  If
51  * it is not specified, we will read from the default Ceph locations
52  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53  * file, specify conf=/dev/null.
54  *
55  * Configuration values containing :, @, or = can be escaped with a
56  * leading "\".
57  */
58 
/*
 * Largest object size expressed as a power of two.
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file, and OBJ_MAX_SIZE is not referenced there either - confirm it is
 * still needed.
 */
59 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
60 
/* NOTE(review): presumably an upper bound on snapshots per image - confirm. */
61 #define RBD_MAX_SNAPS 100
62 
/*
 * Number of bytes in each encryption header magic below; also the number of
 * bytes qemu_rbd_encryption_probe() reads from the start of the image.
 */
63 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8
64 
/*
 * Header magics that qemu_rbd_encryption_probe() compares against the first
 * RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN bytes of an image.
 *
 * The probe maps both the "LUKS..." and the "RBDL..." (layered) variants
 * ending in 1 to RBD_IMAGE_ENCRYPTION_FORMAT_LUKS, and the variants ending
 * in 2 to RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2.
 * NOTE(review): the trailing two bytes are presumably a big-endian version
 * number - confirm against the LUKS on-disk format.
 */
65 static const char rbd_luks_header_verification[
66         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
67     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
68 };
69 
70 static const char rbd_luks2_header_verification[
71         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
72     'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
73 };
74 
75 static const char rbd_layered_luks_header_verification[
76         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
77     'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
78 };
79 
80 static const char rbd_layered_luks2_header_verification[
81         RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
82     'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
83 };
84 
/*
 * Kinds of request the driver can issue.
 * NOTE(review): the code that dispatches on these values lies outside the
 * visible part of the file - confirm semantics there.
 */
85 typedef enum {
86     RBD_AIO_READ,
87     RBD_AIO_WRITE,
88     RBD_AIO_DISCARD,
89     RBD_AIO_FLUSH,
90     RBD_AIO_WRITE_ZEROES
91 } RBDAIOCmd;
92 
/*
 * Per-node driver state; the functions in this file reach it through
 * bs->opaque.
 * NOTE(review): everything except @image and @encryption_format is
 * populated outside the visible part of the file - confirm ownership of
 * the string members there.
 */
93 typedef struct BDRVRBDState {
94     rados_t cluster;
95     rados_ioctx_t io_ctx;
96     rbd_image_t image;
97     char *image_name;
98     char *snap;
99     char *namespace;
100     uint64_t image_size;
101     uint64_t object_size;
102 
103     /*
104      * If @bs->encrypted is true, this is the encryption format actually loaded
105      * at the librbd level. If it is false, it is the result of probing.
106      * RBD_IMAGE_ENCRYPTION_FORMAT__MAX means that encryption is not enabled and
107      * probing didn't find any known encryption header either.
108      */
109     RbdImageEncryptionFormat encryption_format;
110 } BDRVRBDState;
111 
/*
 * Bookkeeping for one request.
 * NOTE(review): not used in the visible part of the file; presumably @co is
 * the coroutine waiting for the request, @complete flags completion and
 * @ret carries the result - confirm against the I/O path.
 */
112 typedef struct RBDTask {
113     BlockDriverState *bs;
114     Coroutine *co;
115     bool complete;
116     int64_t ret;
117 } RBDTask;
118 
/*
 * An offset/length pair plus an "exists" flag.
 * NOTE(review): not used in the visible part of the file; presumably the
 * in/out argument of a diff-iterate callback - confirm at the use site.
 */
119 typedef struct RBDDiffIterateReq {
120     uint64_t offs;
121     uint64_t bytes;
122     bool exists;
123 } RBDDiffIterateReq;
124 
/*
 * Forward declaration: qemu_rbd_do_create() calls this before its
 * definition, which lies outside the visible part of the file. That caller
 * treats a negative return as failure and, on success, releases the
 * returned @io_ctx and @cluster with rados_ioctx_destroy() and
 * rados_shutdown().
 */
125 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
126                             BlockdevOptionsRbd *opts, bool cache,
127                             const char *keypairs, const char *secretid,
128                             Error **errp);
129 
/*
 * Find the first occurrence of @delim in @src that is not escaped.
 *
 * A backslash escapes the character that follows it, so that character is
 * never considered as a delimiter. A backslash that is the very last
 * character of the string escapes nothing.
 *
 * Returns a pointer to the delimiter inside @src, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }

        if (*cur == '\\' && cur[1] != '\0') {
            /* Step over the backslash and the character it escapes */
            cur += 2;
        } else {
            cur++;
        }
    }

    return NULL;
}
145 
146 
/*
 * Cut the leading token off @src at the first unescaped @delim.
 *
 * When a delimiter is found it is overwritten with NUL and *@p is set to
 * the character right after it. When there is none, @src is left untouched
 * and *@p is set to NULL.
 *
 * Returns @src in both cases. @p may point at the variable that was passed
 * as @src, since @src is received by value.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (!sep) {
        *p = NULL;
        return src;
    }

    *sep = '\0';
    *p = sep + 1;
    return src;
}
160 
/*
 * Remove backslash escapes from @src in place.
 *
 * Every backslash that is followed by another character is dropped and the
 * following character is kept verbatim. A trailing lone backslash is kept.
 * The result is never longer than the input.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }

    *out = '\0';
}
173 
/*
 * Translate a legacy "rbd:..." filename into entries of @options.
 *
 * Accepted layout, where every separator may be escaped with a backslash:
 *
 *   rbd:pool/[namespace/]image[@snapshot][:key=value[:key=value...]]
 *
 * Keys stored in @options:
 *   "pool", "image"     always (once the pool separator was found)
 *   "namespace"         the parsed namespace, or "" if there is none
 *   "snapshot"          only if an unescaped '@' is present
 *   "conf"              from the "conf" key
 *   "user"              from the "id" key
 *   "=keyvalue-pairs"   JSON text of a flat [ key, value, ... ] list holding
 *                       all remaining key/value pairs, in input order
 *
 * @errp is set if the "rbd:" prefix is missing, if no pool separator is
 * found, or if a key has no "=value" part. In the last case the entries
 * parsed so far remain in @options.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *spec;
    char *copy, *rest;
    char *tok, *image;
    QList *conf_pairs = NULL;

    if (!strstart(filename, "rbd:", &spec)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* Tokenizing and unescaping happen in place, so work on a copy */
    copy = g_strdup(spec);

    tok = qemu_rbd_next_tok(copy, '/', &rest);
    if (!rest) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(tok);
    qdict_put_str(options, "pool", tok);

    /*
     * Note that the '@' search covers the whole remainder, including the
     * key/value part: an unescaped '@' in there is taken as the snapshot
     * separator.
     */
    if (!qemu_rbd_strchr(rest, '@')) {
        image = qemu_rbd_next_tok(rest, ':', &rest);
    } else {
        image = qemu_rbd_next_tok(rest, '@', &rest);

        tok = qemu_rbd_next_tok(rest, ':', &rest);
        qemu_rbd_unescape(tok);
        qdict_put_str(options, "snapshot", tok);
    }

    /* An unescaped '/' inside the image part separates the namespace */
    if (qemu_rbd_strchr(image, '/')) {
        tok = qemu_rbd_next_tok(image, '/', &image);
        qemu_rbd_unescape(tok);
        qdict_put_str(options, "namespace", tok);
    } else {
        qdict_put_str(options, "namespace", "");
    }
    qemu_rbd_unescape(image);
    qdict_put_str(options, "image", image);

    /*
     * Whatever is left is a ':'-separated list of key=value pairs in any
     * order; "id" and "conf" get dedicated entries.
     */
    while (rest) {
        char *key = qemu_rbd_next_tok(rest, '=', &rest);
        char *val;

        if (!rest) {
            error_setg(errp, "conf option %s has no value", key);
            break;
        }
        qemu_rbd_unescape(key);

        val = qemu_rbd_next_tok(rest, ':', &rest);
        qemu_rbd_unescape(val);

        if (strcmp(key, "conf") == 0) {
            qdict_put_str(options, "conf", val);
        } else if (strcmp(key, "id") == 0) {
            qdict_put_str(options, "user", val);
        } else {
            /*
             * These are handed to qemu_rbd_set_keypairs() internally, so a
             * flat [ "key1", "value1", "key2", "value2" ] list is good
             * enough: unlike a dict it preserves order, and it is simpler
             * than a list of single-entry dicts.
             */
            if (!conf_pairs) {
                conf_pairs = qlist_new();
            }
            qlist_append_str(conf_pairs, key);
            qlist_append_str(conf_pairs, val);
        }
    }

    if (conf_pairs) {
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(conf_pairs))));
    }

done:
    g_free(copy);
    qobject_unref(conf_pairs);
}
266 
/*
 * Apply the authentication related members of @opts to @cluster.
 *
 * - If @opts->key_secret is set, the secret is looked up in base64 form and
 *   stored as the rados "key" option.
 * - If @opts->has_auth_client_required is set, the mode names are joined
 *   with ';' and stored as the rados "auth_client_required" option.
 *
 * Returns 0 on success, -EIO if the secret lookup fails, or the negative
 * value returned by rados_conf_set(). @errp is set on failure.
 */
static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
                             Error **errp)
{
    int rc;

    if (opts->key_secret) {
        char *b64key = qcrypto_secret_lookup_as_base64(opts->key_secret,
                                                       errp);

        if (!b64key) {
            return -EIO;
        }
        rc = rados_conf_set(cluster, "key", b64key);
        g_free(b64key);
        if (rc < 0) {
            error_setg_errno(errp, -rc, "Could not set 'key'");
            return rc;
        }
    }

    if (opts->has_auth_client_required) {
        GString *modes = g_string_new("");
        RbdAuthModeList *it;
        char *joined;

        for (it = opts->auth_client_required; it; it = it->next) {
            /* Separator goes in front of every entry but the first */
            if (modes->len > 0) {
                g_string_append_c(modes, ';');
            }
            g_string_append(modes, RbdAuthMode_str(it->value));
        }

        joined = g_string_free(modes, FALSE);
        rc = rados_conf_set(cluster, "auth_client_required", joined);
        g_free(joined);
        if (rc < 0) {
            error_setg_errno(errp, -rc,
                             "Could not set 'auth_client_required'");
            return rc;
        }
    }

    return 0;
}
308 
/*
 * Feed a list of configuration key/value pairs to @cluster.
 *
 * @keypairs_json is the JSON text of a flat [ key, value, key, value, ... ]
 * list of strings, or NULL for "nothing to set". Malformed JSON aborts
 * (error_abort), and a non-NULL list must hold at least one pair.
 *
 * Stops at the first pair rados_conf_set() rejects; in that case @errp is
 * set and -EINVAL is returned. Otherwise returns the result of the last
 * rados_conf_set() call (0 if @keypairs_json is NULL).
 */
static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
                                 Error **errp)
{
    QList *pairs;
    size_t npairs, i;
    int ret = 0;

    if (!keypairs_json) {
        return ret;
    }

    pairs = qobject_to(QList, qobject_from_json(keypairs_json, &error_abort));
    npairs = qlist_size(pairs) / 2;
    assert(npairs);

    for (i = 0; i < npairs; i++) {
        /* qlist_pop() hands over a reference, dropped again below */
        QString *name = qobject_to(QString, qlist_pop(pairs));
        QString *value = qobject_to(QString, qlist_pop(pairs));
        const char *key;

        assert(name && value);
        key = qstring_get_str(name);

        ret = rados_conf_set(cluster, key, qstring_get_str(value));
        qobject_unref(value);
        if (ret < 0) {
            /* @key points into @name, so report before releasing it */
            error_setg_errno(errp, -ret, "invalid conf option %s", key);
            ret = -EINVAL;
        }
        qobject_unref(name);

        if (ret < 0) {
            break;
        }
    }

    qobject_unref(pairs);
    return ret;
}
347 
348 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
/*
 * Fetch the passphrase for a LUKS encryption spec.
 *
 * Looks up the secret named by @luks_opts->key_secret and stores its data
 * in *@passphrase and its length in *@passphrase_len. The callers in this
 * file release *@passphrase with g_free().
 *
 * Returns the result of qcrypto_secret_lookup(); the callers treat a
 * negative value as failure.
 */
static int qemu_rbd_convert_luks_options(
        RbdEncryptionOptionsLUKSBase *luks_opts,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    /* The secret API works on raw bytes, librbd wants a char pointer */
    uint8_t **secret_data = (uint8_t **)passphrase;

    return qcrypto_secret_lookup(luks_opts->key_secret, secret_data,
                                 passphrase_len, errp);
}
358 
/*
 * Convert LUKS creation options into what librbd needs for formatting.
 *
 * The passphrase is fetched first (see qemu_rbd_convert_luks_options()),
 * then the cipher is mapped to *@alg: AES-128 and AES-256 are supported,
 * and AES-256 is used when no cipher was specified.
 *
 * Returns 0 on success, the lookup's negative result if fetching the
 * passphrase fails, or -ENOTSUP for any other cipher. Note that in the
 * -ENOTSUP case *@passphrase has already been filled in and is still owned
 * by the caller.
 */
static int qemu_rbd_convert_luks_create_options(
        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
        rbd_encryption_algorithm_t *alg,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    int r;

    r = qemu_rbd_convert_luks_options(
            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
            passphrase, passphrase_len, errp);
    if (r < 0) {
        return r;
    }

    if (!luks_opts->has_cipher_alg) {
        /* default alg */
        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
        return 0;
    }

    switch (luks_opts->cipher_alg) {
    case QCRYPTO_CIPHER_ALGO_AES_128:
        *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
        break;
    case QCRYPTO_CIPHER_ALGO_AES_256:
        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
        break;
    default:
        r = -ENOTSUP;
        error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
                         luks_opts->cipher_alg);
        return r;
    }

    return 0;
}
399 
/*
 * Apply an encryption format to the freshly created @image.
 *
 * The image size is sampled before and after formatting. The image is then
 * grown by the difference, so that the size visible after formatting equals
 * the size that was originally requested.
 *
 * Supports RBD_IMAGE_ENCRYPTION_FORMAT_LUKS and ..._LUKS2; any other format
 * yields -ENOTSUP. Returns 0 on success or a negative errno value, with
 * @errp set.
 */
static int qemu_rbd_encryption_format(rbd_image_t image,
                                      RbdEncryptionCreateOptions *encrypt,
                                      Error **errp)
{
    g_autofree char *secret = NULL;
    rbd_encryption_luks1_format_options_t luks1;
    rbd_encryption_luks2_format_options_t luks2;
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    size_t opts_size;
    uint64_t size_before, size_after;
    int r;

    r = rbd_get_size(image, &size_before);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get raw image size");
        return r;
    }

    if (encrypt->format == RBD_IMAGE_ENCRYPTION_FORMAT_LUKS) {
        memset(&luks1, 0, sizeof(luks1));
        r = qemu_rbd_convert_luks_create_options(
                qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
                &luks1.alg, &secret, &luks1.passphrase_size, errp);
        if (r < 0) {
            return r;
        }
        luks1.passphrase = secret;

        format = RBD_ENCRYPTION_FORMAT_LUKS1;
        opts = &luks1;
        opts_size = sizeof(luks1);
    } else if (encrypt->format == RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2) {
        memset(&luks2, 0, sizeof(luks2));
        r = qemu_rbd_convert_luks_create_options(
                qapi_RbdEncryptionCreateOptionsLUKS2_base(&encrypt->u.luks2),
                &luks2.alg, &secret, &luks2.passphrase_size, errp);
        if (r < 0) {
            return r;
        }
        luks2.passphrase = secret;

        format = RBD_ENCRYPTION_FORMAT_LUKS2;
        opts = &luks2;
        opts_size = sizeof(luks2);
    } else {
        r = -ENOTSUP;
        error_setg_errno(errp, -r, "unknown image encryption format: %u",
                         encrypt->format);
        return r;
    }

    r = rbd_encryption_format(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption format fail");
        return r;
    }

    r = rbd_get_size(image, &size_after);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get effective image size");
        return r;
    }

    /* Give back the space that formatting took away from the image */
    r = rbd_resize(image, size_before + (size_before - size_after));
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot resize image after format");
        return r;
    }

    return 0;
}
480 
/*
 * Load a single (non-layered) encryption spec into @image.
 *
 * Handles the LUKS and LUKS2 formats, plus LUKS_ANY when librbd provides
 * LIBRBD_SUPPORTS_ENCRYPTION_LOAD2; anything else yields -ENOTSUP.
 *
 * On success @bs is marked as encrypted and the loaded format is recorded
 * in the driver state. Returns 0 on success or a negative errno value, with
 * @errp set.
 */
static int qemu_rbd_encryption_load(BlockDriverState *bs,
                                    rbd_image_t image,
                                    RbdEncryptionOptions *encrypt,
                                    Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    g_autofree char *secret = NULL;
    rbd_encryption_luks1_format_options_t luks1;
    rbd_encryption_luks2_format_options_t luks2;
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    rbd_encryption_luks_format_options_t luks_any;
#endif
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    size_t opts_size;
    int r;

    switch (encrypt->format) {
    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS:
        memset(&luks1, 0, sizeof(luks1));
        r = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
                &secret, &luks1.passphrase_size, errp);
        if (r < 0) {
            return r;
        }
        luks1.passphrase = secret;
        format = RBD_ENCRYPTION_FORMAT_LUKS1;
        opts = &luks1;
        opts_size = sizeof(luks1);
        break;

    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2:
        memset(&luks2, 0, sizeof(luks2));
        r = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
                &secret, &luks2.passphrase_size, errp);
        if (r < 0) {
            return r;
        }
        luks2.passphrase = secret;
        format = RBD_ENCRYPTION_FORMAT_LUKS2;
        opts = &luks2;
        opts_size = sizeof(luks2);
        break;

#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY:
        memset(&luks_any, 0, sizeof(luks_any));
        r = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
                &secret, &luks_any.passphrase_size, errp);
        if (r < 0) {
            return r;
        }
        luks_any.passphrase = secret;
        format = RBD_ENCRYPTION_FORMAT_LUKS;
        opts = &luks_any;
        opts_size = sizeof(luks_any);
        break;
#endif

    default:
        r = -ENOTSUP;
        error_setg_errno(errp, -r, "unknown image encryption format: %u",
                         encrypt->format);
        return r;
    }

    r = rbd_encryption_load(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption load fail");
        return r;
    }

    bs->encrypted = true;
    s->encryption_format = encrypt->format;
    return 0;
}
562 
563 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
/*
 * Load a chain of encryption specs (layered encryption) into @image.
 *
 * @encrypt is the head of a list linked through ->parent. One librbd spec
 * is built per list element, in list order, and the whole array is passed
 * to rbd_encryption_load2().
 *
 * On success @bs is marked as encrypted and the format of the head element
 * is recorded in the driver state. All temporary option structures and
 * passphrases are released before returning, on success and on failure.
 *
 * Returns 0 on success or a negative errno value, with @errp set.
 */
static int qemu_rbd_encryption_load2(BlockDriverState *bs,
                                     rbd_image_t image,
                                     RbdEncryptionOptions *encrypt,
                                     Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    RbdEncryptionOptions *layer;
    rbd_encryption_spec_t *specs;
    int nlayers = 1;
    int idx;
    int r = 0;

    /* @encrypt itself is the first layer, every parent adds one more */
    layer = encrypt->parent;
    while (layer) {
        nlayers++;
        layer = layer->parent;
    }

    /* Zero-filled, so that unused entries have a NULL ->opts */
    specs = g_new0(rbd_encryption_spec_t, nlayers);

    for (idx = 0, layer = encrypt; idx < nlayers;
         idx++, layer = layer->parent) {
        rbd_encryption_spec_t *spec = &specs[idx];

        switch (layer->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            rbd_encryption_luks1_format_options_t *o =
                g_new0(rbd_encryption_luks1_format_options_t, 1);

            spec->format = RBD_ENCRYPTION_FORMAT_LUKS1;
            spec->opts = o;
            spec->opts_size = sizeof(*o);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS_base(&layer->u.luks),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            rbd_encryption_luks2_format_options_t *o =
                g_new0(rbd_encryption_luks2_format_options_t, 1);

            spec->format = RBD_ENCRYPTION_FORMAT_LUKS2;
            spec->opts = o;
            spec->opts_size = sizeof(*o);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS2_base(&layer->u.luks2),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
            rbd_encryption_luks_format_options_t *o =
                g_new0(rbd_encryption_luks_format_options_t, 1);

            spec->format = RBD_ENCRYPTION_FORMAT_LUKS;
            spec->opts = o;
            spec->opts_size = sizeof(*o);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKSAny_base(&layer->u.luks_any),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        default:
            r = -ENOTSUP;
            error_setg_errno(errp, -r, "unknown image encryption format: %u",
                             layer->format);
            break;
        }

        if (r < 0) {
            goto exit;
        }
    }

    r = rbd_encryption_load2(image, specs, nlayers);
    if (r < 0) {
        error_setg_errno(errp, -r, "layered encryption load fail");
        goto exit;
    }
    bs->encrypted = true;
    s->encryption_format = encrypt->format;

exit:
    /* Entries are filled front to back; the first NULL ->opts ends them */
    for (idx = 0; idx < nlayers && specs[idx].opts; idx++) {
        const char *secret = NULL;

        switch (specs[idx].format) {
        case RBD_ENCRYPTION_FORMAT_LUKS1:
            secret = ((rbd_encryption_luks1_format_options_t *)
                      specs[idx].opts)->passphrase;
            break;
        case RBD_ENCRYPTION_FORMAT_LUKS2:
            secret = ((rbd_encryption_luks2_format_options_t *)
                      specs[idx].opts)->passphrase;
            break;
        case RBD_ENCRYPTION_FORMAT_LUKS:
            secret = ((rbd_encryption_luks_format_options_t *)
                      specs[idx].opts)->passphrase;
            break;
        default:
            break;
        }

        g_free((void *)secret);
        g_free(specs[idx].opts);
    }
    g_free(specs);
    return r;
}
687 #endif
688 #endif
689 
690 /*
691  * For an image without encryption enabled on the rbd layer, probe the start of
692  * the image if it could be opened as an encrypted image so that we can display
693  * it when the user queries the node (most importantly in qemu-img).
694  *
695  * If the guest writes an encryption header to its disk after this probing, this
696  * won't be reflected when queried, but that's okay. There is no reason why the
697  * user should want to apply encryption at the rbd level while the image is
698  * still in use. This is just guest data.
699  */
qemu_rbd_encryption_probe(BlockDriverState * bs)700 static void qemu_rbd_encryption_probe(BlockDriverState *bs)
701 {
702     BDRVRBDState *s = bs->opaque;
703     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
704     int r;
705 
706     assert(s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX);
707 
708     r = rbd_read(s->image, 0,
709                  RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
710     if (r < RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
711         return;
712     }
713 
714     if (memcmp(buf, rbd_luks_header_verification,
715                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
716         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
717     } else if (memcmp(buf, rbd_luks2_header_verification,
718                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
719         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
720     } else if (memcmp(buf, rbd_layered_luks_header_verification,
721                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
722         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
723     } else if (memcmp(buf, rbd_layered_luks2_header_verification,
724                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
725         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
726     }
727 }
728 
729 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
qemu_rbd_do_create(BlockdevCreateOptions * options,const char * keypairs,const char * password_secret,Error ** errp)730 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
731                               const char *keypairs, const char *password_secret,
732                               Error **errp)
733 {
734     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
735     rados_t cluster;
736     rados_ioctx_t io_ctx;
737     int obj_order = 0;
738     int ret;
739 
740     assert(options->driver == BLOCKDEV_DRIVER_RBD);
741     if (opts->location->snapshot) {
742         error_setg(errp, "Can't use snapshot name for image creation");
743         return -EINVAL;
744     }
745 
746 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
747     if (opts->encrypt) {
748         error_setg(errp, "RBD library does not support image encryption");
749         return -ENOTSUP;
750     }
751 #endif
752 
753     if (opts->has_cluster_size) {
754         int64_t objsize = opts->cluster_size;
755         if ((objsize - 1) & objsize) {    /* not a power of 2? */
756             error_setg(errp, "obj size needs to be power of 2");
757             return -EINVAL;
758         }
759         if (objsize < 4096) {
760             error_setg(errp, "obj size too small");
761             return -EINVAL;
762         }
763         obj_order = ctz32(objsize);
764     }
765 
766     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
767                            password_secret, errp);
768     if (ret < 0) {
769         return ret;
770     }
771 
772     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
773     if (ret < 0) {
774         error_setg_errno(errp, -ret, "error rbd create");
775         goto out;
776     }
777 
778 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
779     if (opts->encrypt) {
780         rbd_image_t image;
781 
782         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
783         if (ret < 0) {
784             error_setg_errno(errp, -ret,
785                              "error opening image '%s' for encryption format",
786                              opts->location->image);
787             goto out;
788         }
789 
790         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
791         rbd_close(image);
792         if (ret < 0) {
793             /* encryption format fail, try removing the image */
794             rbd_remove(io_ctx, opts->location->image);
795             goto out;
796         }
797     }
798 #endif
799 
800     ret = 0;
801 out:
802     rados_ioctx_destroy(io_ctx);
803     rados_shutdown(cluster);
804     return ret;
805 }
806 
qemu_rbd_co_create(BlockdevCreateOptions * options,Error ** errp)807 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
808 {
809     return qemu_rbd_do_create(options, NULL, NULL, errp);
810 }
811 
/*
 * Build an RbdEncryptionCreateOptions object from the "encrypt."-prefixed
 * entries of @opts.
 *
 * If there are no such entries, *@spec is set to NULL and 0 is returned.
 * Otherwise the entries are converted through a QAPI input visitor; on
 * success *@spec holds the new object.
 *
 * Returns 0 on success, or -EINVAL with @errp set if the conversion fails.
 */
static int qemu_rbd_extract_encryption_create_options(
        QemuOpts *opts,
        RbdEncryptionCreateOptions **spec,
        Error **errp)
{
    QDict *all_opts = qemu_opts_to_qdict(opts, NULL);
    QDict *encrypt_opts;
    int ret = 0;

    qdict_extract_subqdict(all_opts, &encrypt_opts, "encrypt.");
    qobject_unref(all_opts);

    if (qdict_size(encrypt_opts) == 0) {
        *spec = NULL;
    } else {
        /* Convert options into a QAPI object */
        Visitor *v = qobject_input_visitor_new_flat_confused(encrypt_opts,
                                                             errp);

        if (!v) {
            ret = -EINVAL;
        } else {
            visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
            visit_free(v);
            if (!*spec) {
                ret = -EINVAL;
            }
        }
    }

    qobject_unref(encrypt_opts);
    return ret;
}
848 
qemu_rbd_co_create_opts(BlockDriver * drv,const char * filename,QemuOpts * opts,Error ** errp)849 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
850                                                 const char *filename,
851                                                 QemuOpts *opts,
852                                                 Error **errp)
853 {
854     BlockdevCreateOptions *create_options;
855     BlockdevCreateOptionsRbd *rbd_opts;
856     BlockdevOptionsRbd *loc;
857     RbdEncryptionCreateOptions *encrypt = NULL;
858     Error *local_err = NULL;
859     const char *keypairs, *password_secret;
860     QDict *options = NULL;
861     int ret = 0;
862 
863     create_options = g_new0(BlockdevCreateOptions, 1);
864     create_options->driver = BLOCKDEV_DRIVER_RBD;
865     rbd_opts = &create_options->u.rbd;
866 
867     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
868 
869     password_secret = qemu_opt_get(opts, "password-secret");
870 
871     /* Read out options */
872     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
873                               BDRV_SECTOR_SIZE);
874     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
875                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
876     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
877 
878     options = qdict_new();
879     qemu_rbd_parse_filename(filename, options, &local_err);
880     if (local_err) {
881         ret = -EINVAL;
882         error_propagate(errp, local_err);
883         goto exit;
884     }
885 
886     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
887     if (ret < 0) {
888         goto exit;
889     }
890     rbd_opts->encrypt     = encrypt;
891 
892     /*
893      * Caution: while qdict_get_try_str() is fine, getting non-string
894      * types would require more care.  When @options come from -blockdev
895      * or blockdev_add, its members are typed according to the QAPI
896      * schema, but when they come from -drive, they're all QString.
897      */
898     loc = rbd_opts->location;
899     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
900     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
901     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
902     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
903     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
904     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
905 
906     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
907     if (ret < 0) {
908         goto exit;
909     }
910 
911 exit:
912     qobject_unref(options);
913     qapi_free_BlockdevCreateOptions(create_options);
914     return ret;
915 }
916 
qemu_rbd_mon_host(BlockdevOptionsRbd * opts,Error ** errp)917 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
918 {
919     const char **vals;
920     const char *host, *port;
921     char *rados_str;
922     InetSocketAddressBaseList *p;
923     int i, cnt;
924 
925     if (!opts->has_server) {
926         return NULL;
927     }
928 
929     for (cnt = 0, p = opts->server; p; p = p->next) {
930         cnt++;
931     }
932 
933     vals = g_new(const char *, cnt + 1);
934 
935     for (i = 0, p = opts->server; p; p = p->next, i++) {
936         host = p->value->host;
937         port = p->value->port;
938 
939         if (strchr(host, ':')) {
940             vals[i] = g_strdup_printf("[%s]:%s", host, port);
941         } else {
942             vals[i] = g_strdup_printf("%s:%s", host, port);
943         }
944     }
945     vals[i] = NULL;
946 
947     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
948     g_strfreev((char **)vals);
949     return rados_str;
950 }
951 
/*
 * Create a rados cluster handle, configure it from @opts, connect to the
 * cluster and open an I/O context on opts->pool.
 *
 * @cluster:  receives the cluster handle
 * @io_ctx:   receives the I/O context; opts->q_namespace is selected on it
 * @opts:     connection options; opts->key_secret is filled in from
 *            @secretid when the latter is given
 * @cache:    selects the value ("true"/"false") of the rbd_cache option
 * @keypairs: handed to qemu_rbd_set_keypairs()
 * @secretid: legacy 'password-secret'; it is an error to combine it with
 *            an already present opts->key_secret
 *
 * Returns 0 on success.  On failure a negative errno value is returned, the
 * error is reported through @errp, and whatever this function created
 * (I/O context, cluster handle) has been released again.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp)
{
    char *mon_host = NULL;
    Error *local_err = NULL;
    int r;

    if (secretid) {
        if (opts->key_secret) {
            error_setg(errp,
                       "Legacy 'password-secret' clashes with 'key-secret'");
            return -EINVAL;
        }
        opts->key_secret = g_strdup(secretid);
    }

    mon_host = qemu_rbd_mon_host(opts, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        r = -EINVAL;
        goto out;
    }

    r = rados_create(cluster, opts->user);
    if (r < 0) {
        error_setg_errno(errp, -r, "error initializing");
        goto out;
    }

    /* try default location when conf=NULL, but ignore failure */
    r = rados_conf_read_file(*cluster, opts->conf);
    if (opts->conf && r < 0) {
        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
        goto failed_shutdown;
    }

    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    if (mon_host) {
        r = rados_conf_set(*cluster, "mon_host", mon_host);
        if (r < 0) {
            /*
             * Every failure return must come with an error object; callers
             * rely on @errp being set whenever a negative value is returned.
             */
            error_setg_errno(errp, -r, "error setting mon_host");
            goto failed_shutdown;
        }
    }

    r = qemu_rbd_set_auth(*cluster, opts, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    /*
     * Errors from setting rbd_cache are deliberately ignored: the only
     * possible error is that the option does not exist, and librbd
     * defaults to no caching.
     */
    if (cache) {
        rados_conf_set(*cluster, "rbd_cache", "true");
    } else {
        rados_conf_set(*cluster, "rbd_cache", "false");
    }

    r = rados_connect(*cluster);
    if (r < 0) {
        error_setg_errno(errp, -r, "error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
    if (r < 0) {
        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
        goto failed_shutdown;
    }

#ifdef HAVE_RBD_NAMESPACE_EXISTS
    if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
        bool exists;

        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
        if (r < 0) {
            error_setg_errno(errp, -r, "error checking namespace");
            goto failed_ioctx_destroy;
        }

        if (!exists) {
            error_setg(errp, "namespace '%s' does not exist",
                       opts->q_namespace);
            r = -ENOENT;
            goto failed_ioctx_destroy;
        }
    }
#endif

    /*
     * Set the namespace after opening the io context on the pool,
     * if nspace == NULL or if nspace == "", it is just as we did nothing
     */
    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);

    r = 0;
    goto out;

#ifdef HAVE_RBD_NAMESPACE_EXISTS
failed_ioctx_destroy:
    rados_ioctx_destroy(*io_ctx);
#endif
failed_shutdown:
    rados_shutdown(*cluster);
out:
    g_free(mon_host);
    return r;
}
1070 
/*
 * Convert the flat @options dictionary into a BlockdevOptionsRbd object.
 *
 * On success the new object is stored in *@opts and 0 is returned.  On
 * failure -EINVAL is returned; the visitor is handed @errp to describe the
 * problem.
 */
static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
                                    Error **errp)
{
    Visitor *v;

    /* Convert the remaining options into a QAPI object */
    v = qobject_input_visitor_new_flat_confused(options, errp);
    if (!v) {
        return -EINVAL;
    }

    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
    visit_free(v);
    /*
     * Test the produced object, not the out-parameter itself: @opts is
     * never NULL, so checking it would report success even when the visit
     * failed and callers that pass a NULL @errp could not notice.
     */
    if (!*opts) {
        return -EINVAL;
    }

    return 0;
}
1090 
/*
 * Best-effort fallback for options that are encoded in a legacy "filename"
 * entry of @options.
 *
 * The filename is removed from @options and parsed back into it, the
 * "=keyvalue-pairs" entry (if any) is moved into *@keypairs, and the rest is
 * converted into *@opts.  Errors of the individual steps are not reported;
 * only the return value of the final conversion (or -EINVAL when there is
 * no filename) is passed back.
 */
static int qemu_rbd_attempt_legacy_options(QDict *options,
                                           BlockdevOptionsRbd **opts,
                                           char **keypairs)
{
    const char *legacy_name = qdict_get_try_str(options, "filename");
    char *filename_copy;
    int ret;

    if (!legacy_name) {
        return -EINVAL;
    }

    /* Copy first: the string belongs to the entry that is deleted next */
    filename_copy = g_strdup(legacy_name);
    qdict_del(options, "filename");

    qemu_rbd_parse_filename(filename_copy, options, NULL);

    /* keypairs freed by caller */
    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (*keypairs) {
        qdict_del(options, "=keyvalue-pairs");
    }

    ret = qemu_rbd_convert_options(options, opts, NULL);

    g_free(filename_copy);
    return ret;
}
1117 
/*
 * Open an RBD image.
 *
 * @options is converted into a BlockdevOptionsRbd object (falling back to
 * the legacy filename encoding when that fails), all entries are then
 * removed from @options, the cluster connection is established and the
 * image is opened.  Encryption is loaded when requested and probed
 * otherwise, and the image/object sizes are cached in the driver state.
 *
 * Returns 0 on success.  On failure a negative errno value is returned, the
 * error is reported through @errp, and everything acquired so far has been
 * released again.
 */
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    BlockdevOptionsRbd *opts = NULL;
    const QDictEntry *e;
    Error *local_err = NULL;
    char *keypairs, *secretid;
    rbd_image_info_t info;
    int r;

    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (keypairs) {
        qdict_del(options, "=keyvalue-pairs");
    }

    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
    if (secretid) {
        qdict_del(options, "password-secret");
    }

    r = qemu_rbd_convert_options(options, &opts, &local_err);
    if (local_err) {
        /* If keypairs are present, that means some options are present in
         * the modern option format.  Don't attempt to parse legacy option
         * formats, as we won't support mixed usage. */
        if (keypairs) {
            error_propagate(errp, local_err);
            /* An error was set, so never report success to the caller */
            r = -EINVAL;
            goto out;
        }

        /* If the initial attempt to convert and process the options failed,
         * we may be attempting to open an image file that has the rbd options
         * specified in the older format consisting of all key/value pairs
         * encoded in the filename.  Go ahead and attempt to parse the
         * filename, and see if we can pull out the required options. */
        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
        if (r < 0 || !opts) {
            /* Propagate the original error, not the legacy parsing fallback
             * error, as the latter was just a best-effort attempt.  The
             * fallback runs without an error object, so a missing @opts is
             * treated as failure too instead of being dereferenced below. */
            error_propagate(errp, local_err);
            if (r >= 0) {
                r = -EINVAL;
            }
            goto out;
        }
        /* The fallback succeeded: the original error is obsolete and must
         * be released, nobody else owns it. */
        error_free(local_err);
        local_err = NULL;

        /* Take care whenever deciding to actually deprecate; once this ability
         * is removed, we will not be able to open any images with legacy-styled
         * backing image strings. */
        warn_report("RBD options encoded in the filename as keyvalue pairs "
                    "is deprecated");
    }

    /* Remove the processed options from the QDict (the visitor processes
     * _all_ options in the QDict) */
    while ((e = qdict_first(options))) {
        qdict_del(options, e->key);
    }

    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
    if (r < 0) {
        goto out;
    }

    s->snap = g_strdup(opts->snapshot);
    s->image_name = g_strdup(opts->image);

    /* rbd_open is always r/w */
    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
    if (r < 0) {
        error_setg_errno(errp, -r, "error reading header from %s",
                         s->image_name);
        goto failed_open;
    }

    /* __MAX serves as "no encryption format known" until load/probe ran */
    s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT__MAX;
    if (opts->encrypt) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION
        if (opts->encrypt->parent) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
            r = qemu_rbd_encryption_load2(bs, s->image, opts->encrypt, errp);
#else
            r = -ENOTSUP;
            error_setg(errp, "RBD library does not support layered encryption");
#endif
        } else {
            r = qemu_rbd_encryption_load(bs, s->image, opts->encrypt, errp);
        }
        if (r < 0) {
            goto failed_post_open;
        }
#else
        r = -ENOTSUP;
        error_setg(errp, "RBD library does not support image encryption");
        goto failed_post_open;
#endif
    } else {
        qemu_rbd_encryption_probe(bs);
    }

    r = rbd_stat(s->image, &info, sizeof(info));
    if (r < 0) {
        error_setg_errno(errp, -r, "error getting image info from %s",
                         s->image_name);
        goto failed_post_open;
    }
    s->image_size = info.size;
    s->object_size = info.obj_size;

    /* If we are using an rbd snapshot, we must be r/o, otherwise
     * leave as-is */
    if (s->snap != NULL) {
        bdrv_graph_rdlock_main_loop();
        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
        bdrv_graph_rdunlock_main_loop();
        if (r < 0) {
            goto failed_post_open;
        }
    }

#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
#endif

    /* When extending regular files, we get zeros from the OS */
    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;

    r = 0;
    goto out;

failed_post_open:
    rbd_close(s->image);
failed_open:
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
out:
    qapi_free_BlockdevOptionsRbd(opts);
    g_free(keypairs);
    g_free(secretid);
    return r;
}
1259 
1260 
1261 /* Since RBD is currently always opened R/W via the API,
1262  * we just need to check if we are using a snapshot or not, in
1263  * order to determine if we will allow it to be R/W */
qemu_rbd_reopen_prepare(BDRVReopenState * state,BlockReopenQueue * queue,Error ** errp)1264 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1265                                    BlockReopenQueue *queue, Error **errp)
1266 {
1267     BDRVRBDState *s = state->bs->opaque;
1268     int ret = 0;
1269 
1270     GRAPH_RDLOCK_GUARD_MAINLOOP();
1271 
1272     if (s->snap && state->flags & BDRV_O_RDWR) {
1273         error_setg(errp,
1274                    "Cannot change node '%s' to r/w when using RBD snapshot",
1275                    bdrv_get_device_or_node_name(state->bs));
1276         ret = -EINVAL;
1277     }
1278 
1279     return ret;
1280 }
1281 
/*
 * Close the image and release the I/O context, the cluster handle and the
 * strings kept in the driver state.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;

    /* Library objects: image first, then its I/O context */
    rbd_close(state->image);
    rados_ioctx_destroy(state->io_ctx);

    /* Names duplicated when the image was opened */
    g_free(state->snap);
    g_free(state->image_name);

    rados_shutdown(state->cluster);
}
1292 
1293 /* Resize the RBD image and update the 'image_size' with the current size */
qemu_rbd_resize(BlockDriverState * bs,uint64_t size)1294 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1295 {
1296     BDRVRBDState *s = bs->opaque;
1297     int r;
1298 
1299     r = rbd_resize(s->image, size);
1300     if (r < 0) {
1301         return r;
1302     }
1303 
1304     s->image_size = size;
1305 
1306     return 0;
1307 }
1308 
qemu_rbd_finish_bh(void * opaque)1309 static void qemu_rbd_finish_bh(void *opaque)
1310 {
1311     RBDTask *task = opaque;
1312     task->complete = true;
1313     aio_co_wake(task->co);
1314 }
1315 
1316 /*
1317  * This is the completion callback function for all rbd aio calls
1318  * started from qemu_rbd_start_co().
1319  *
1320  * Note: this function is being called from a non qemu thread so
1321  * we need to be careful about what we do here. Generally we only
1322  * schedule a BH, and do the rest of the io completion handling
1323  * from qemu_rbd_finish_bh() which runs in a qemu context.
1324  */
qemu_rbd_completion_cb(rbd_completion_t c,RBDTask * task)1325 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1326 {
1327     task->ret = rbd_aio_get_return_value(c);
1328     rbd_aio_release(c);
1329     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1330                             qemu_rbd_finish_bh, task);
1331 }
1332 
qemu_rbd_start_co(BlockDriverState * bs,uint64_t offset,uint64_t bytes,QEMUIOVector * qiov,int flags,RBDAIOCmd cmd)1333 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1334                                           uint64_t offset,
1335                                           uint64_t bytes,
1336                                           QEMUIOVector *qiov,
1337                                           int flags,
1338                                           RBDAIOCmd cmd)
1339 {
1340     BDRVRBDState *s = bs->opaque;
1341     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1342     rbd_completion_t c;
1343     int r;
1344 
1345     assert(!qiov || qiov->size == bytes);
1346 
1347     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1348         /*
1349          * RBD APIs don't allow us to write more than actual size, so in order
1350          * to support growing images, we resize the image before write
1351          * operations that exceed the current size.
1352          */
1353         if (offset + bytes > s->image_size) {
1354             r = qemu_rbd_resize(bs, offset + bytes);
1355             if (r < 0) {
1356                 return r;
1357             }
1358         }
1359     }
1360 
1361     r = rbd_aio_create_completion(&task,
1362                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1363     if (r < 0) {
1364         return r;
1365     }
1366 
1367     switch (cmd) {
1368     case RBD_AIO_READ:
1369         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1370         break;
1371     case RBD_AIO_WRITE:
1372         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1373         break;
1374     case RBD_AIO_DISCARD:
1375         r = rbd_aio_discard(s->image, offset, bytes, c);
1376         break;
1377     case RBD_AIO_FLUSH:
1378         r = rbd_aio_flush(s->image, c);
1379         break;
1380 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1381     case RBD_AIO_WRITE_ZEROES: {
1382         int zero_flags = 0;
1383 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1384         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1385             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1386         }
1387 #endif
1388         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1389         break;
1390     }
1391 #endif
1392     default:
1393         r = -EINVAL;
1394     }
1395 
1396     if (r < 0) {
1397         error_report("rbd request failed early: cmd %d offset %" PRIu64
1398                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1399                      bytes, flags, r, strerror(-r));
1400         rbd_aio_release(c);
1401         return r;
1402     }
1403 
1404     while (!task.complete) {
1405         qemu_coroutine_yield();
1406     }
1407 
1408     if (task.ret < 0) {
1409         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1410                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1411                      bytes, flags, task.ret, strerror(-task.ret));
1412         return task.ret;
1413     }
1414 
1415     /* zero pad short reads */
1416     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1417         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1418     }
1419 
1420     return 0;
1421 }
1422 
1423 static int
qemu_rbd_co_preadv(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1424 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1425                                 int64_t bytes, QEMUIOVector *qiov,
1426                                 BdrvRequestFlags flags)
1427 {
1428     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1429 }
1430 
1431 static int
qemu_rbd_co_pwritev(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1432 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1433                                  int64_t bytes, QEMUIOVector *qiov,
1434                                  BdrvRequestFlags flags)
1435 {
1436     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1437 }
1438 
qemu_rbd_co_flush(BlockDriverState * bs)1439 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1440 {
1441     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1442 }
1443 
qemu_rbd_co_pdiscard(BlockDriverState * bs,int64_t offset,int64_t bytes)1444 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1445                                              int64_t offset, int64_t bytes)
1446 {
1447     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1448 }
1449 
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Zero a byte range: an RBD_AIO_WRITE_ZEROES request without a data vector;
 * @flags is passed through to qemu_rbd_start_co().
 */
static int coroutine_fn
qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
                          BdrvRequestFlags flags)
{
    int rc = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                               RBD_AIO_WRITE_ZEROES);

    return rc;
}
#endif
1459 
1460 static int coroutine_fn
qemu_rbd_co_get_info(BlockDriverState * bs,BlockDriverInfo * bdi)1461 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1462 {
1463     BDRVRBDState *s = bs->opaque;
1464     bdi->cluster_size = s->object_size;
1465     return 0;
1466 }
1467 
qemu_rbd_get_specific_info(BlockDriverState * bs,Error ** errp)1468 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1469                                                      Error **errp)
1470 {
1471     BDRVRBDState *s = bs->opaque;
1472     ImageInfoSpecific *spec_info;
1473 
1474     spec_info = g_new(ImageInfoSpecific, 1);
1475     *spec_info = (ImageInfoSpecific){
1476         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1477         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1478     };
1479 
1480     if (s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX) {
1481         assert(!bs->encrypted);
1482     } else {
1483         ImageInfoSpecificRbd *rbd_info = spec_info->u.rbd.data;
1484 
1485         rbd_info->has_encryption_format = true;
1486         rbd_info->encryption_format = s->encryption_format;
1487     }
1488 
1489     return spec_info;
1490 }
1491 
/*
 * rbd_diff_iterate2 allows interrupting the execution by returning a negative
 * value from the callback routine. Choose a value that does not conflict with
 * an existing exit code and return it if we want to prematurely stop the
 * execution because we detected a change in the allocation status.
 *
 * The value is parenthesized so that the macro expands to a single
 * expression wherever it is used.
 */
#define QEMU_RBD_EXIT_DIFF_ITERATE2 (-9000)
1499 
qemu_rbd_diff_iterate_cb(uint64_t offs,size_t len,int exists,void * opaque)1500 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1501                                     int exists, void *opaque)
1502 {
1503     RBDDiffIterateReq *req = opaque;
1504 
1505     assert(req->offs + req->bytes <= offs);
1506 
1507     /* treat a hole like an unallocated area and bail out */
1508     if (!exists) {
1509         return 0;
1510     }
1511 
1512     if (!req->exists && offs > req->offs) {
1513         /*
1514          * we started in an unallocated area and hit the first allocated
1515          * block. req->bytes must be set to the length of the unallocated area
1516          * before the allocated area. stop further processing.
1517          */
1518         req->bytes = offs - req->offs;
1519         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1520     }
1521 
1522     if (req->exists && offs > req->offs + req->bytes) {
1523         /*
1524          * we started in an allocated area and jumped over an unallocated area,
1525          * req->bytes contains the length of the allocated area before the
1526          * unallocated area. stop further processing.
1527          */
1528         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1529     }
1530 
1531     req->bytes += len;
1532     req->exists = true;
1533 
1534     return 0;
1535 }
1536 
qemu_rbd_co_block_status(BlockDriverState * bs,unsigned int mode,int64_t offset,int64_t bytes,int64_t * pnum,int64_t * map,BlockDriverState ** file)1537 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1538                                                  unsigned int mode,
1539                                                  int64_t offset, int64_t bytes,
1540                                                  int64_t *pnum, int64_t *map,
1541                                                  BlockDriverState **file)
1542 {
1543     BDRVRBDState *s = bs->opaque;
1544     int status, r;
1545     RBDDiffIterateReq req = { .offs = offset };
1546     uint64_t features, flags;
1547     uint64_t head = 0;
1548 
1549     assert(offset + bytes <= s->image_size);
1550 
1551     /* default to all sectors allocated */
1552     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1553     *map = offset;
1554     *file = bs;
1555     *pnum = bytes;
1556 
1557     /* check if RBD image supports fast-diff */
1558     r = rbd_get_features(s->image, &features);
1559     if (r < 0) {
1560         return status;
1561     }
1562     if (!(features & RBD_FEATURE_FAST_DIFF)) {
1563         return status;
1564     }
1565 
1566     /* check if RBD fast-diff result is valid */
1567     r = rbd_get_flags(s->image, &flags);
1568     if (r < 0) {
1569         return status;
1570     }
1571     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1572         return status;
1573     }
1574 
1575 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1576     /*
1577      * librbd had a bug until early 2022 that affected all versions of ceph that
1578      * supported fast-diff. This bug results in reporting of incorrect offsets
1579      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1580      * Work around this bug by rounding down the offset to object boundaries.
1581      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1582      * However, this workaround only works for non cloned images with default
1583      * striping.
1584      *
1585      * See: https://tracker.ceph.com/issues/53784
1586      */
1587 
1588     /* check if RBD image has non-default striping enabled */
1589     if (features & RBD_FEATURE_STRIPINGV2) {
1590         return status;
1591     }
1592 
1593 #pragma GCC diagnostic push
1594 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1595     /*
1596      * check if RBD image is a clone (= has a parent).
1597      *
1598      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1599      * replacement rbd_get_parent is not present in Luminous and Mimic.
1600      */
1601     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1602         return status;
1603     }
1604 #pragma GCC diagnostic pop
1605 
1606     head = req.offs & (s->object_size - 1);
1607     req.offs -= head;
1608     bytes += head;
1609 #endif
1610 
1611     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1612                           qemu_rbd_diff_iterate_cb, &req);
1613     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1614         return status;
1615     }
1616     assert(req.bytes <= bytes);
1617     if (!req.exists) {
1618         if (r == 0) {
1619             /*
1620              * rbd_diff_iterate2 does not invoke callbacks for unallocated
1621              * areas. This here catches the case where no callback was
1622              * invoked at all (req.bytes == 0).
1623              */
1624             assert(req.bytes == 0);
1625             req.bytes = bytes;
1626         }
1627         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1628     }
1629 
1630     assert(req.bytes > head);
1631     *pnum = req.bytes - head;
1632     return status;
1633 }
1634 
qemu_rbd_co_getlength(BlockDriverState * bs)1635 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1636 {
1637     BDRVRBDState *s = bs->opaque;
1638     int r;
1639 
1640     r = rbd_get_size(s->image, &s->image_size);
1641     if (r < 0) {
1642         return r;
1643     }
1644 
1645     return s->image_size;
1646 }
1647 
qemu_rbd_co_truncate(BlockDriverState * bs,int64_t offset,bool exact,PreallocMode prealloc,BdrvRequestFlags flags,Error ** errp)1648 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1649                                              int64_t offset,
1650                                              bool exact,
1651                                              PreallocMode prealloc,
1652                                              BdrvRequestFlags flags,
1653                                              Error **errp)
1654 {
1655     int r;
1656 
1657     if (prealloc != PREALLOC_MODE_OFF) {
1658         error_setg(errp, "Unsupported preallocation mode '%s'",
1659                    PreallocMode_str(prealloc));
1660         return -ENOTSUP;
1661     }
1662 
1663     r = qemu_rbd_resize(bs, offset);
1664     if (r < 0) {
1665         error_setg_errno(errp, -r, "Failed to resize file");
1666         return r;
1667     }
1668 
1669     return 0;
1670 }
1671 
/*
 * Create an rbd snapshot named sn_info->name.
 *
 * Returns 0 on success, -EINVAL for a missing name or an id that differs
 * from the name, -ERANGE for a name that does not fit into id_str, or the
 * negative value reported by rbd_snap_create().
 */
static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    const char *name = sn_info->name;
    const char *id = sn_info->id_str;
    int rc;

    if (name[0] == '\0') {
        return -EINVAL; /* we need a name for rbd snapshots */
    }

    /*
     * rbd snapshots are using the name as the user controlled unique identifier
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (id[0] != '\0' && strcmp(id, name) != 0) {
        return -EINVAL;
    }

    if (strlen(name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    rc = rbd_snap_create(s->image, name);
    if (rc >= 0) {
        return 0;
    }

    error_report("failed to create snap: %s", strerror(-rc));
    return rc;
}
1703 
/*
 * Remove the rbd snapshot @snapshot_name.
 *
 * @snapshot_id is optional, but when given it has to match the name.
 * Returns the result of rbd_snap_remove(), or -EINVAL for invalid
 * arguments; @errp is set on failure.
 */
static int qemu_rbd_snap_remove(BlockDriverState *bs,
                                const char *snapshot_id,
                                const char *snapshot_name,
                                Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int rc;

    if (snapshot_name == NULL) {
        error_setg(errp, "rbd need a valid snapshot name");
        return -EINVAL;
    }

    /* If snapshot_id is specified, it must be equal to name, see
       qemu_rbd_snap_list() */
    if (snapshot_id != NULL && strcmp(snapshot_id, snapshot_name) != 0) {
        error_setg(errp,
                   "rbd do not support snapshot id, it should be NULL or "
                   "equal to snapshot name");
        return -EINVAL;
    }

    rc = rbd_snap_remove(s->image, snapshot_name);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "Failed to remove the snapshot");
    }

    return rc;
}
1732 
/*
 * Roll the image back to @snapshot_name; the result of rbd_snap_rollback()
 * is returned unchanged.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *s = bs->opaque;
    int rc = rbd_snap_rollback(s->image, snapshot_name);

    return rc;
}
1740 
/*
 * List the snapshots of the image.
 *
 * On success *@psn_tab receives a newly allocated table with one entry per
 * snapshot (name and id are both the rbd snapshot name) and the number of
 * entries is returned.  When there are no snapshots or the listing fails,
 * *@psn_tab is set to NULL and the value from rbd_snap_list() is returned.
 */
static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    QEMUSnapshotInfo *table;
    rbd_snap_info_t *snaps;
    int max_snaps = RBD_MAX_SNAPS;
    int snap_count;
    int idx;

    /* Retry for as long as the library reports the buffer as too small */
    for (;;) {
        snaps = g_new(rbd_snap_info_t, max_snaps);
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
        if (snap_count > 0) {
            break;
        }

        g_free(snaps);
        if (snap_count != -ERANGE) {
            *psn_tab = NULL;
            return snap_count;
        }
    }

    table = g_new0(QEMUSnapshotInfo, snap_count);

    for (idx = 0; idx < snap_count; idx++) {
        QEMUSnapshotInfo *entry = &table[idx];
        const char *snap_name = snaps[idx].name;

        pstrcpy(entry->id_str, sizeof(entry->id_str), snap_name);
        pstrcpy(entry->name, sizeof(entry->name), snap_name);

        entry->vm_state_size = snaps[idx].size;
        entry->date_sec = 0;
        entry->date_nsec = 0;
        entry->vm_clock_nsec = 0;
    }

    rbd_snap_list_end(snaps);
    g_free(snaps);

    *psn_tab = table;
    return snap_count;
}
1783 
qemu_rbd_co_invalidate_cache(BlockDriverState * bs,Error ** errp)1784 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1785                                                       Error **errp)
1786 {
1787     BDRVRBDState *s = bs->opaque;
1788     int r = rbd_invalidate_cache(s->image);
1789     if (r < 0) {
1790         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1791     }
1792 }
1793 
1794 static QemuOptsList qemu_rbd_create_opts = {
1795     .name = "rbd-create-opts",
1796     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1797     .desc = {
1798         {
1799             .name = BLOCK_OPT_SIZE,
1800             .type = QEMU_OPT_SIZE,
1801             .help = "Virtual disk size"
1802         },
1803         {
1804             .name = BLOCK_OPT_CLUSTER_SIZE,
1805             .type = QEMU_OPT_SIZE,
1806             .help = "RBD object size"
1807         },
1808         {
1809             .name = "password-secret",
1810             .type = QEMU_OPT_STRING,
1811             .help = "ID of secret providing the password",
1812         },
1813         {
1814             .name = "encrypt.format",
1815             .type = QEMU_OPT_STRING,
1816             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1817         },
1818         {
1819             .name = "encrypt.cipher-alg",
1820             .type = QEMU_OPT_STRING,
1821             .help = "Name of encryption cipher algorithm"
1822                     " (allowed values: aes-128, aes-256)",
1823         },
1824         {
1825             .name = "encrypt.key-secret",
1826             .type = QEMU_OPT_STRING,
1827             .help = "ID of secret providing LUKS passphrase",
1828         },
1829         { /* end of list */ }
1830     }
1831 };
1832 
1833 static const char *const qemu_rbd_strong_runtime_opts[] = {
1834     "pool",
1835     "namespace",
1836     "image",
1837     "conf",
1838     "snapshot",
1839     "user",
1840     "server.",
1841     "password-secret",
1842 
1843     NULL
1844 };
1845 
1846 static BlockDriver bdrv_rbd = {
1847     .format_name            = "rbd",
1848     .instance_size          = sizeof(BDRVRBDState),
1849 
1850     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1851     .bdrv_open              = qemu_rbd_open,
1852     .bdrv_close             = qemu_rbd_close,
1853     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1854     .bdrv_co_create         = qemu_rbd_co_create,
1855     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1856     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1857     .bdrv_co_get_info       = qemu_rbd_co_get_info,
1858     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1859     .create_opts            = &qemu_rbd_create_opts,
1860     .bdrv_co_getlength      = qemu_rbd_co_getlength,
1861     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1862     .protocol_name          = "rbd",
1863 
1864     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1865     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1866     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1867     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1868 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1869     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1870 #endif
1871     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1872 
1873     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1874     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1875     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1876     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1877     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1878 
1879     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1880 };
1881 
bdrv_rbd_init(void)1882 static void bdrv_rbd_init(void)
1883 {
1884     bdrv_register(&bdrv_rbd);
1885 }
1886 
1887 block_init(bdrv_rbd_init);
1888