xref: /openbmc/qemu/block/rbd.c (revision e8e9ed8b1127a39c95eba77d7410a0076dbd761c)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "system/replay.h"
27 #include "qobject/qstring.h"
28 #include "qobject/qdict.h"
29 #include "qobject/qjson.h"
30 #include "qobject/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
34 /*
35  * When specifying the image filename use:
36  *
37  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
38  *
39  * poolname must be the name of an existing rados pool.
40  *
41  * devicename is the name of the rbd image.
42  *
43  * Each option given is used to configure rados, and may be any valid
44  * Ceph option, "id", or "conf".
45  *
46  * The "id" option indicates what user we should authenticate as to
47  * the Ceph cluster.  If it is excluded we will use the Ceph default
48  * (normally 'admin').
49  *
50  * The "conf" option specifies a Ceph configuration file to read.  If
51  * it is not specified, we will read from the default Ceph locations
52  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
53  * file, specify conf=/dev/null.
54  *
55  * Configuration values containing :, @, or = can be escaped with a
56  * leading "\".
57  */
58 
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file, so this macro can only be expanded where that name is provided
 * elsewhere -- confirm before using it.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/* Number of leading image bytes compared against the magic values below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/* "LUKS" 0xBA 0xBE + version 1: reported as LUKS by the encryption probe. */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

/* "LUKS" 0xBA 0xBE + version 2: reported as LUKS2 by the encryption probe. */
static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

/* "RBDL" 0xBA 0xBE + version 1: also reported as LUKS by the probe. */
static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

/* "RBDL" 0xBA 0xBE + version 2: also reported as LUKS2 by the probe. */
static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
84 
/*
 * Kinds of asynchronous request.
 * NOTE(review): the code consuming these values is outside the visible part
 * of the file; the meaning of each value is inferred from its name only.
 */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH,
    RBD_AIO_WRITE_ZEROES,
} RBDAIOCmd;
92 
93 typedef struct BDRVRBDState {
94     rados_t cluster;
95     rados_ioctx_t io_ctx;
96     rbd_image_t image;
97     char *image_name;
98     char *snap;
99     char *namespace;
100     uint64_t image_size;
101     uint64_t object_size;
102 
103     /*
104      * If @bs->encrypted is true, this is the encryption format actually loaded
105      * at the librbd level. If it is false, it is the result of probing.
106      * RBD_IMAGE_ENCRYPTION_FORMAT__MAX means that encryption is not enabled and
107      * probing didn't find any known encryption header either.
108      */
109     RbdImageEncryptionFormat encryption_format;
110 } BDRVRBDState;
111 
112 typedef struct RBDTask {
113     Coroutine *co;
114     int64_t ret;
115 } RBDTask;
116 
/*
 * An offset/length pair plus an "exists" flag.
 * NOTE(review): presumably filled in by a diff-iterate callback; the users of
 * this type are outside the visible part of the file -- confirm.
 */
typedef struct RBDDiffIterateReq {
    uint64_t offs;
    uint64_t bytes;
    bool exists;
} RBDDiffIterateReq;
122 
123 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
124                             BlockdevOptionsRbd *opts, bool cache,
125                             const char *keypairs, const char *secretid,
126                             Error **errp);
127 
/*
 * Escape-aware strchr(): find the first occurrence of @delim in @src that is
 * not protected by a preceding backslash.
 *
 * A backslash followed by any character hides that character from the
 * search.  A backslash at the very end of the string hides nothing.
 *
 * Returns a pointer to the delimiter inside @src, or NULL if there is none.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *cur = src;

    while (*cur != '\0') {
        if (*cur == delim) {
            return cur;
        }
        if (*cur == '\\' && cur[1] != '\0') {
            /* step over the backslash and the character it escapes */
            cur += 2;
        } else {
            cur += 1;
        }
    }

    return NULL;
}
143 
144 
/*
 * Cut the first token off @src in place.
 *
 * The token ends at the first unescaped @delim, which is overwritten with a
 * NUL byte.  *@p is set to the character after that delimiter, or to NULL
 * when @src contains no unescaped @delim (the whole string is the token).
 *
 * Returns @src, i.e. the start of the token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *sep = qemu_rbd_strchr(src, delim);

    if (sep) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    return src;
}
158 
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and the
 * following character is kept literally.  A backslash that is the last
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *rd = src;
    char *wr = src;

    while (*rd != '\0') {
        if (rd[0] == '\\' && rd[1] != '\0') {
            rd++;
        }
        *wr++ = *rd++;
    }
    *wr = '\0';
}
171 
/*
 * Split a legacy "rbd:" filename into entries of @options.
 *
 * Expected layout:
 *   rbd:pool/[namespace/]image[@snapshot][:key=value[:key=value...]]
 *
 * The following keys are stored in @options:
 *   "pool", "image", "namespace" (empty string when absent), and, when
 *   present, "snapshot", "conf" and "user" (from the "id" key).  All other
 *   key/value pairs are collected into a JSON list that is stored under
 *   "=keyvalue-pairs".
 *
 * On a malformed filename @errp is set; @options may then already contain
 * the entries parsed before the error was detected.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *spec;
    char *copy, *rest;
    char *tok, *image;
    const char *ns = "";
    QList *kv_list = NULL;

    if (!strstart(filename, "rbd:", &spec)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* The tokenizer writes NUL bytes, so work on a private copy */
    copy = g_strdup(spec);
    rest = copy;

    tok = qemu_rbd_next_tok(rest, '/', &rest);
    if (!rest) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(tok);
    qdict_put_str(options, "pool", tok);

    if (!qemu_rbd_strchr(rest, '@')) {
        image = qemu_rbd_next_tok(rest, ':', &rest);
    } else {
        image = qemu_rbd_next_tok(rest, '@', &rest);

        tok = qemu_rbd_next_tok(rest, ':', &rest);
        qemu_rbd_unescape(tok);
        qdict_put_str(options, "snapshot", tok);
    }

    /* An unescaped '/' inside the image part separates a namespace */
    if (qemu_rbd_strchr(image, '/')) {
        tok = qemu_rbd_next_tok(image, '/', &image);
        qemu_rbd_unescape(tok);
        ns = tok;
    }
    qdict_put_str(options, "namespace", ns);

    qemu_rbd_unescape(image);
    qdict_put_str(options, "image", image);

    /*
     * Whatever is left is a ':'-separated list of key=value pairs, in any
     * order.  Only 'id' and 'conf' get special treatment.
     */
    while (rest) {
        char *key, *val;

        key = qemu_rbd_next_tok(rest, '=', &rest);
        if (!rest) {
            error_setg(errp, "conf option %s has no value", key);
            break;
        }
        qemu_rbd_unescape(key);

        val = qemu_rbd_next_tok(rest, ':', &rest);
        qemu_rbd_unescape(val);

        if (strcmp(key, "conf") == 0) {
            qdict_put_str(options, "conf", val);
        } else if (strcmp(key, "id") == 0) {
            qdict_put_str(options, "user", val);
        } else {
            /*
             * These are handed to qemu_rbd_set_keypairs() internally, so a
             * flat list [ "key1", "value1", "key2", "value2" ] is enough.
             * A dict { "key1": "value1", ... } could not guarantee order,
             * and [ { "key1": "value1" }, ... ] would be more correct but
             * also more complex.
             */
            if (!kv_list) {
                kv_list = qlist_new();
            }
            qlist_append_str(kv_list, key);
            qlist_append_str(kv_list, val);
        }
    }

    if (kv_list) {
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(kv_list))));
    }

done:
    g_free(copy);
    qobject_unref(kv_list);
}
264 
/*
 * Apply the authentication related members of @opts to @cluster.
 *
 * - If @opts->key_secret is set, the secret is looked up in base64 form and
 *   stored as the "key" configuration value.
 * - If @opts->has_auth_client_required is set, the listed modes are joined
 *   with ';' and stored as "auth_client_required".
 *
 * Returns 0 on success.  Returns -EIO if the secret lookup fails, or the
 * negative value returned by rados_conf_set(); @errp is set in both cases.
 */
static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
                             Error **errp)
{
    int rc;

    if (opts->key_secret) {
        char *b64_key = qcrypto_secret_lookup_as_base64(opts->key_secret,
                                                        errp);

        if (!b64_key) {
            return -EIO;
        }
        rc = rados_conf_set(cluster, "key", b64_key);
        g_free(b64_key);
        if (rc < 0) {
            error_setg_errno(errp, -rc, "Could not set 'key'");
            return rc;
        }
    }

    if (opts->has_auth_client_required) {
        GString *modes = g_string_new("");
        RbdAuthModeList *entry;
        char *joined;

        for (entry = opts->auth_client_required; entry; entry = entry->next) {
            /* separator goes in front of every mode except the first */
            if (modes->len > 0) {
                g_string_append_c(modes, ';');
            }
            g_string_append(modes, RbdAuthMode_str(entry->value));
        }

        joined = g_string_free(modes, FALSE);
        rc = rados_conf_set(cluster, "auth_client_required", joined);
        g_free(joined);
        if (rc < 0) {
            error_setg_errno(errp, -rc,
                             "Could not set 'auth_client_required'");
            return rc;
        }
    }

    return 0;
}
306 
/*
 * Apply a JSON encoded list of configuration key/value pairs to @cluster.
 *
 * @keypairs_json is either NULL (nothing to do) or the JSON text of a flat
 * list [ "key1", "value1", "key2", "value2", ... ] as produced by
 * qemu_rbd_parse_filename().  The list must contain at least one pair and
 * only strings; both are asserted.
 *
 * Returns 0 when @keypairs_json is NULL.  Returns -EINVAL and sets @errp as
 * soon as rados_conf_set() rejects a pair; the remaining pairs are then
 * skipped.  Otherwise returns the result of the last rados_conf_set() call.
 */
static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
                                 Error **errp)
{
    QList *pairs;
    size_t n_pairs, i;
    int ret = 0;

    if (!keypairs_json) {
        return 0;
    }

    pairs = qobject_to(QList, qobject_from_json(keypairs_json, &error_abort));
    n_pairs = qlist_size(pairs) / 2;
    assert(n_pairs);

    for (i = 0; i < n_pairs; i++) {
        QString *name = qobject_to(QString, qlist_pop(pairs));
        QString *value = qobject_to(QString, qlist_pop(pairs));
        const char *key;

        assert(name && value);
        key = qstring_get_str(name);

        ret = rados_conf_set(cluster, key, qstring_get_str(value));
        qobject_unref(value);
        if (ret < 0) {
            /* @key is still needed for the message: unref @name afterwards */
            error_setg_errno(errp, -ret, "invalid conf option %s", key);
            ret = -EINVAL;
        }
        qobject_unref(name);

        if (ret < 0) {
            break;
        }
    }

    qobject_unref(pairs);
    return ret;
}
345 
346 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
/*
 * Look up the secret named by @luks_opts->key_secret.
 *
 * The result of qcrypto_secret_lookup() is stored in *@passphrase and
 * *@passphrase_len.  The callers in this file release *@passphrase with
 * g_free().
 *
 * Returns whatever qcrypto_secret_lookup() returns; callers treat a negative
 * value as failure.
 */
static int qemu_rbd_convert_luks_options(
        RbdEncryptionOptionsLUKSBase *luks_opts,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    uint8_t **secret_out = (uint8_t **)passphrase;

    return qcrypto_secret_lookup(luks_opts->key_secret, secret_out,
                                 passphrase_len, errp);
}
356 
/*
 * Translate LUKS creation options into what librbd expects.
 *
 * The passphrase is fetched with qemu_rbd_convert_luks_options().  *@alg is
 * set from @luks_opts->cipher_alg (AES-128 or AES-256) and defaults to
 * AES-256 when no cipher algorithm was given.
 *
 * Returns 0 on success, the negative result of the passphrase lookup, or
 * -ENOTSUP (with @errp set) for any other cipher algorithm.  In the -ENOTSUP
 * case *@passphrase has already been filled in.
 */
static int qemu_rbd_convert_luks_create_options(
        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
        rbd_encryption_algorithm_t *alg,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    int rc;

    rc = qemu_rbd_convert_luks_options(
            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
            passphrase, passphrase_len, errp);
    if (rc < 0) {
        return rc;
    }

    if (!luks_opts->has_cipher_alg) {
        /* default alg */
        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
        return 0;
    }

    switch (luks_opts->cipher_alg) {
    case QCRYPTO_CIPHER_ALGO_AES_128:
        *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
        break;
    case QCRYPTO_CIPHER_ALGO_AES_256:
        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
        break;
    default:
        error_setg_errno(errp, ENOTSUP, "unknown encryption algorithm: %u",
                         luks_opts->cipher_alg);
        return -ENOTSUP;
    }

    return 0;
}
397 
/*
 * Apply an encryption format (LUKS or LUKS2) to the open image @image.
 *
 * The image size is read before and after rbd_encryption_format().  The
 * image is then resized to the original size plus the difference between the
 * two readings.
 * NOTE(review): presumably this makes up for the space taken by the
 * encryption header, so the usable size matches the requested one -- confirm.
 *
 * The passphrase buffer is released automatically on return (g_autofree).
 *
 * Returns 0 on success or a negative errno value with @errp set.
 */
static int qemu_rbd_encryption_format(rbd_image_t image,
                                      RbdEncryptionCreateOptions *encrypt,
                                      Error **errp)
{
    g_autofree char *secret = NULL;
    rbd_encryption_luks1_format_options_t luks1;
    rbd_encryption_luks2_format_options_t luks2;
    rbd_encryption_format_t fmt;
    rbd_encryption_options_t fmt_opts;
    size_t fmt_opts_size;
    uint64_t size_before, size_after, shrunk_by;
    int rc;

    rc = rbd_get_size(image, &size_before);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "cannot get raw image size");
        return rc;
    }

    switch (encrypt->format) {
    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS:
        memset(&luks1, 0, sizeof(luks1));
        rc = qemu_rbd_convert_luks_create_options(
                qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
                &luks1.alg, &secret, &luks1.passphrase_size, errp);
        if (rc < 0) {
            return rc;
        }
        luks1.passphrase = secret;

        fmt = RBD_ENCRYPTION_FORMAT_LUKS1;
        fmt_opts = &luks1;
        fmt_opts_size = sizeof(luks1);
        break;

    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2:
        memset(&luks2, 0, sizeof(luks2));
        rc = qemu_rbd_convert_luks_create_options(
                qapi_RbdEncryptionCreateOptionsLUKS2_base(&encrypt->u.luks2),
                &luks2.alg, &secret, &luks2.passphrase_size, errp);
        if (rc < 0) {
            return rc;
        }
        luks2.passphrase = secret;

        fmt = RBD_ENCRYPTION_FORMAT_LUKS2;
        fmt_opts = &luks2;
        fmt_opts_size = sizeof(luks2);
        break;

    default:
        error_setg_errno(errp, ENOTSUP,
                         "unknown image encryption format: %u",
                         encrypt->format);
        return -ENOTSUP;
    }

    rc = rbd_encryption_format(image, fmt, fmt_opts, fmt_opts_size);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "encryption format fail");
        return rc;
    }

    rc = rbd_get_size(image, &size_after);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "cannot get effective image size");
        return rc;
    }

    /* grow the image by as much as the reported size went down */
    shrunk_by = size_before - size_after;
    rc = rbd_resize(image, size_before + shrunk_by);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "cannot resize image after format");
        return rc;
    }

    return 0;
}
478 
/*
 * Load a single encryption format into @image via rbd_encryption_load().
 *
 * Handles LUKS and LUKS2, plus LUKS-any when LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
 * is defined.  On success @bs->encrypted is set and the requested format is
 * recorded in the driver state.
 *
 * The passphrase buffer is released automatically on return (g_autofree).
 *
 * Returns 0 on success or a negative errno value with @errp set.
 */
static int qemu_rbd_encryption_load(BlockDriverState *bs,
                                    rbd_image_t image,
                                    RbdEncryptionOptions *encrypt,
                                    Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    g_autofree char *secret = NULL;
    rbd_encryption_luks1_format_options_t luks1;
    rbd_encryption_luks2_format_options_t luks2;
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    rbd_encryption_luks_format_options_t luks_any;
#endif
    rbd_encryption_format_t fmt;
    rbd_encryption_options_t fmt_opts;
    size_t fmt_opts_size;
    int rc;

    switch (encrypt->format) {
    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS:
        memset(&luks1, 0, sizeof(luks1));
        rc = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
                &secret, &luks1.passphrase_size, errp);
        if (rc < 0) {
            return rc;
        }
        luks1.passphrase = secret;

        fmt = RBD_ENCRYPTION_FORMAT_LUKS1;
        fmt_opts = &luks1;
        fmt_opts_size = sizeof(luks1);
        break;

    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2:
        memset(&luks2, 0, sizeof(luks2));
        rc = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
                &secret, &luks2.passphrase_size, errp);
        if (rc < 0) {
            return rc;
        }
        luks2.passphrase = secret;

        fmt = RBD_ENCRYPTION_FORMAT_LUKS2;
        fmt_opts = &luks2;
        fmt_opts_size = sizeof(luks2);
        break;

#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY:
        memset(&luks_any, 0, sizeof(luks_any));
        rc = qemu_rbd_convert_luks_options(
                qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
                &secret, &luks_any.passphrase_size, errp);
        if (rc < 0) {
            return rc;
        }
        luks_any.passphrase = secret;

        fmt = RBD_ENCRYPTION_FORMAT_LUKS;
        fmt_opts = &luks_any;
        fmt_opts_size = sizeof(luks_any);
        break;
#endif

    default:
        error_setg_errno(errp, ENOTSUP,
                         "unknown image encryption format: %u",
                         encrypt->format);
        return -ENOTSUP;
    }

    rc = rbd_encryption_load(image, fmt, fmt_opts, fmt_opts_size);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "encryption load fail");
        return rc;
    }

    bs->encrypted = true;
    s->encryption_format = encrypt->format;
    return 0;
}
560 
561 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
/*
 * Load layered encryption: one spec for @encrypt itself and one for every
 * entry on its ->parent chain, handed to rbd_encryption_load2() in that
 * order.
 *
 * Each spec gets its own heap-allocated librbd options struct and its own
 * passphrase buffer.  All of them, and the spec array, are freed before
 * returning, on success and on failure.
 *
 * On success @bs->encrypted is set and the format of @encrypt (the first
 * spec) is recorded in the driver state.
 *
 * Returns 0 on success or a negative errno value with @errp set.
 */
static int qemu_rbd_encryption_load2(BlockDriverState *bs,
                                     rbd_image_t image,
                                     RbdEncryptionOptions *encrypt,
                                     Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    RbdEncryptionOptions *layer;
    rbd_encryption_spec_t *specs;
    int n_layers = 1;
    int rc = 0;
    int i;

    /* @encrypt itself plus everything on its parent chain */
    for (layer = encrypt->parent; layer; layer = layer->parent) {
        n_layers++;
    }

    specs = g_new0(rbd_encryption_spec_t, n_layers);

    for (i = 0, layer = encrypt; i < n_layers; i++, layer = layer->parent) {
        switch (layer->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            rbd_encryption_luks1_format_options_t *o =
                g_new0(rbd_encryption_luks1_format_options_t, 1);

            specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;
            specs[i].opts = o;
            specs[i].opts_size = sizeof(*o);

            rc = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS_base(&layer->u.luks),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            rbd_encryption_luks2_format_options_t *o =
                g_new0(rbd_encryption_luks2_format_options_t, 1);

            specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;
            specs[i].opts = o;
            specs[i].opts_size = sizeof(*o);

            rc = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS2_base(&layer->u.luks2),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
            rbd_encryption_luks_format_options_t *o =
                g_new0(rbd_encryption_luks_format_options_t, 1);

            specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;
            specs[i].opts = o;
            specs[i].opts_size = sizeof(*o);

            rc = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKSAny_base(&layer->u.luks_any),
                    (char **)&o->passphrase, &o->passphrase_size, errp);
            break;
        }
        default:
            rc = -ENOTSUP;
            error_setg_errno(errp, ENOTSUP,
                             "unknown image encryption format: %u",
                             layer->format);
            break;
        }

        if (rc < 0) {
            goto out;
        }
    }

    rc = rbd_encryption_load2(image, specs, n_layers);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "layered encryption load fail");
        goto out;
    }
    bs->encrypted = true;
    s->encryption_format = encrypt->format;

out:
    /*
     * Specs are filled front to back, so the first one without an options
     * struct marks the end of what has to be released.
     */
    for (i = 0; i < n_layers && specs[i].opts; i++) {
        switch (specs[i].format) {
        case RBD_ENCRYPTION_FORMAT_LUKS1: {
            rbd_encryption_luks1_format_options_t *o = specs[i].opts;

            g_free((void *)o->passphrase);
            break;
        }
        case RBD_ENCRYPTION_FORMAT_LUKS2: {
            rbd_encryption_luks2_format_options_t *o = specs[i].opts;

            g_free((void *)o->passphrase);
            break;
        }
        case RBD_ENCRYPTION_FORMAT_LUKS: {
            rbd_encryption_luks_format_options_t *o = specs[i].opts;

            g_free((void *)o->passphrase);
            break;
        }
        default:
            break;
        }

        g_free(specs[i].opts);
    }
    g_free(specs);
    return rc;
}
685 #endif
686 #endif
687 
688 /*
689  * For an image without encryption enabled on the rbd layer, probe the start of
690  * the image if it could be opened as an encrypted image so that we can display
691  * it when the user queries the node (most importantly in qemu-img).
692  *
693  * If the guest writes an encryption header to its disk after this probing, this
694  * won't be reflected when queried, but that's okay. There is no reason why the
695  * user should want to apply encryption at the rbd level while the image is
696  * still in use. This is just guest data.
697  */
qemu_rbd_encryption_probe(BlockDriverState * bs)698 static void qemu_rbd_encryption_probe(BlockDriverState *bs)
699 {
700     BDRVRBDState *s = bs->opaque;
701     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
702     int r;
703 
704     assert(s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX);
705 
706     r = rbd_read(s->image, 0,
707                  RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
708     if (r < RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
709         return;
710     }
711 
712     if (memcmp(buf, rbd_luks_header_verification,
713                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
714         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
715     } else if (memcmp(buf, rbd_luks2_header_verification,
716                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
717         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
718     } else if (memcmp(buf, rbd_layered_luks_header_verification,
719                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
720         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
721     } else if (memcmp(buf, rbd_layered_luks2_header_verification,
722                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
723         s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
724     }
725 }
726 
727 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
qemu_rbd_do_create(BlockdevCreateOptions * options,const char * keypairs,const char * password_secret,Error ** errp)728 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
729                               const char *keypairs, const char *password_secret,
730                               Error **errp)
731 {
732     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
733     rados_t cluster;
734     rados_ioctx_t io_ctx;
735     int obj_order = 0;
736     int ret;
737 
738     assert(options->driver == BLOCKDEV_DRIVER_RBD);
739     if (opts->location->snapshot) {
740         error_setg(errp, "Can't use snapshot name for image creation");
741         return -EINVAL;
742     }
743 
744 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
745     if (opts->encrypt) {
746         error_setg(errp, "RBD library does not support image encryption");
747         return -ENOTSUP;
748     }
749 #endif
750 
751     if (opts->has_cluster_size) {
752         int64_t objsize = opts->cluster_size;
753         if ((objsize - 1) & objsize) {    /* not a power of 2? */
754             error_setg(errp, "obj size needs to be power of 2");
755             return -EINVAL;
756         }
757         if (objsize < 4096) {
758             error_setg(errp, "obj size too small");
759             return -EINVAL;
760         }
761         obj_order = ctz32(objsize);
762     }
763 
764     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
765                            password_secret, errp);
766     if (ret < 0) {
767         return ret;
768     }
769 
770     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
771     if (ret < 0) {
772         error_setg_errno(errp, -ret, "error rbd create");
773         goto out;
774     }
775 
776 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
777     if (opts->encrypt) {
778         rbd_image_t image;
779 
780         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
781         if (ret < 0) {
782             error_setg_errno(errp, -ret,
783                              "error opening image '%s' for encryption format",
784                              opts->location->image);
785             goto out;
786         }
787 
788         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
789         rbd_close(image);
790         if (ret < 0) {
791             /* encryption format fail, try removing the image */
792             rbd_remove(io_ctx, opts->location->image);
793             goto out;
794         }
795     }
796 #endif
797 
798     ret = 0;
799 out:
800     rados_ioctx_destroy(io_ctx);
801     rados_shutdown(cluster);
802     return ret;
803 }
804 
qemu_rbd_co_create(BlockdevCreateOptions * options,Error ** errp)805 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
806 {
807     return qemu_rbd_do_create(options, NULL, NULL, errp);
808 }
809 
/*
 * Build an RbdEncryptionCreateOptions object from the "encrypt." prefixed
 * entries of @opts.
 *
 * *@spec is set to NULL when @opts has no "encrypt." entries, which is not an
 * error.  Otherwise the entries are converted with a QObject input visitor.
 *
 * Returns 0 on success.  Returns -EINVAL when the visitor cannot be created
 * or when *@spec is NULL after the visit; @errp is handed to both calls.
 */
static int qemu_rbd_extract_encryption_create_options(
        QemuOpts *opts,
        RbdEncryptionCreateOptions **spec,
        Error **errp)
{
    QDict *all_opts = qemu_opts_to_qdict(opts, NULL);
    QDict *enc_opts;
    Visitor *v;
    int ret = -EINVAL;

    qdict_extract_subqdict(all_opts, &enc_opts, "encrypt.");
    qobject_unref(all_opts);

    if (qdict_size(enc_opts) == 0) {
        *spec = NULL;
        ret = 0;
    } else {
        /* Convert options into a QAPI object */
        v = qobject_input_visitor_new_flat_confused(enc_opts, errp);
        if (v) {
            visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
            visit_free(v);
            if (*spec) {
                ret = 0;
            }
        }
    }

    qobject_unref(enc_opts);
    return ret;
}
846 
qemu_rbd_co_create_opts(BlockDriver * drv,const char * filename,QemuOpts * opts,Error ** errp)847 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
848                                                 const char *filename,
849                                                 QemuOpts *opts,
850                                                 Error **errp)
851 {
852     BlockdevCreateOptions *create_options;
853     BlockdevCreateOptionsRbd *rbd_opts;
854     BlockdevOptionsRbd *loc;
855     RbdEncryptionCreateOptions *encrypt = NULL;
856     Error *local_err = NULL;
857     const char *keypairs, *password_secret;
858     QDict *options = NULL;
859     int ret = 0;
860 
861     create_options = g_new0(BlockdevCreateOptions, 1);
862     create_options->driver = BLOCKDEV_DRIVER_RBD;
863     rbd_opts = &create_options->u.rbd;
864 
865     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
866 
867     password_secret = qemu_opt_get(opts, "password-secret");
868 
869     /* Read out options */
870     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
871                               BDRV_SECTOR_SIZE);
872     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
873                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
874     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
875 
876     options = qdict_new();
877     qemu_rbd_parse_filename(filename, options, &local_err);
878     if (local_err) {
879         ret = -EINVAL;
880         error_propagate(errp, local_err);
881         goto exit;
882     }
883 
884     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
885     if (ret < 0) {
886         goto exit;
887     }
888     rbd_opts->encrypt     = encrypt;
889 
890     /*
891      * Caution: while qdict_get_try_str() is fine, getting non-string
892      * types would require more care.  When @options come from -blockdev
893      * or blockdev_add, its members are typed according to the QAPI
894      * schema, but when they come from -drive, they're all QString.
895      */
896     loc = rbd_opts->location;
897     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
898     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
899     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
900     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
901     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
902     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
903 
904     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
905     if (ret < 0) {
906         goto exit;
907     }
908 
909 exit:
910     qobject_unref(options);
911     qapi_free_BlockdevCreateOptions(create_options);
912     return ret;
913 }
914 
qemu_rbd_mon_host(BlockdevOptionsRbd * opts,Error ** errp)915 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
916 {
917     const char **vals;
918     const char *host, *port;
919     char *rados_str;
920     InetSocketAddressBaseList *p;
921     int i, cnt;
922 
923     if (!opts->has_server) {
924         return NULL;
925     }
926 
927     for (cnt = 0, p = opts->server; p; p = p->next) {
928         cnt++;
929     }
930 
931     vals = g_new(const char *, cnt + 1);
932 
933     for (i = 0, p = opts->server; p; p = p->next, i++) {
934         host = p->value->host;
935         port = p->value->port;
936 
937         if (strchr(host, ':')) {
938             vals[i] = g_strdup_printf("[%s]:%s", host, port);
939         } else {
940             vals[i] = g_strdup_printf("%s:%s", host, port);
941         }
942     }
943     vals[i] = NULL;
944 
945     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
946     g_strfreev((char **)vals);
947     return rados_str;
948 }
949 
/*
 * Create a rados cluster handle, configure it from @opts and @keypairs,
 * connect to the cluster and open an I/O context on the pool named in @opts.
 *
 * @cluster:  receives the cluster handle
 * @io_ctx:   receives the I/O context for opts->pool
 * @opts:     connection options; when @secretid is given, opts->key_secret
 *            is set to a copy of it
 * @cache:    selects the value written to the "rbd_cache" setting
 * @keypairs: extra configuration passed to qemu_rbd_set_keypairs()
 * @secretid: legacy 'password-secret'; must not be combined with
 *            opts->key_secret
 *
 * Returns 0 on success.  On failure a negative errno value is returned,
 * any handle created here has been shut down / destroyed again, and @errp
 * is set.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp)
{
    char *mon_host = NULL;
    Error *local_err = NULL;
    int r;

    if (secretid) {
        if (opts->key_secret) {
            error_setg(errp,
                       "Legacy 'password-secret' clashes with 'key-secret'");
            return -EINVAL;
        }
        opts->key_secret = g_strdup(secretid);
    }

    mon_host = qemu_rbd_mon_host(opts, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        r = -EINVAL;
        goto out;
    }

    r = rados_create(cluster, opts->user);
    if (r < 0) {
        error_setg_errno(errp, -r, "error initializing");
        goto out;
    }

    /* try default location when conf=NULL, but ignore failure */
    r = rados_conf_read_file(*cluster, opts->conf);
    if (opts->conf && r < 0) {
        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
        goto failed_shutdown;
    }

    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    if (mon_host) {
        r = rados_conf_set(*cluster, "mon_host", mon_host);
        if (r < 0) {
            /* never fail without telling the caller why */
            error_setg_errno(errp, -r, "error setting mon_host to '%s'",
                             mon_host);
            goto failed_shutdown;
        }
    }

    r = qemu_rbd_set_auth(*cluster, opts, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    /*
     * Fallback to more conservative semantics if setting cache
     * options fails. Ignore errors from setting rbd_cache because the
     * only possible error is that the option does not exist, and
     * librbd defaults to no caching. If write through caching cannot
     * be set up, fall back to no caching.
     */
    if (cache) {
        rados_conf_set(*cluster, "rbd_cache", "true");
    } else {
        rados_conf_set(*cluster, "rbd_cache", "false");
    }

    r = rados_connect(*cluster);
    if (r < 0) {
        error_setg_errno(errp, -r, "error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
    if (r < 0) {
        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
        goto failed_shutdown;
    }

#ifdef HAVE_RBD_NAMESPACE_EXISTS
    /* Reject a non-empty namespace that is not present in the pool */
    if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
        bool exists;

        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
        if (r < 0) {
            error_setg_errno(errp, -r, "error checking namespace");
            goto failed_ioctx_destroy;
        }

        if (!exists) {
            error_setg(errp, "namespace '%s' does not exist",
                       opts->q_namespace);
            r = -ENOENT;
            goto failed_ioctx_destroy;
        }
    }
#endif

    /*
     * Set the namespace after opening the io context on the pool,
     * if nspace == NULL or if nspace == "", it is just as we did nothing
     */
    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);

    r = 0;
    goto out;

#ifdef HAVE_RBD_NAMESPACE_EXISTS
failed_ioctx_destroy:
    rados_ioctx_destroy(*io_ctx);
#endif
failed_shutdown:
    rados_shutdown(*cluster);
out:
    g_free(mon_host);
    return r;
}
1068 
/*
 * Convert the flat @options dictionary into a BlockdevOptionsRbd object.
 *
 * @options: option dictionary to convert
 * @opts:    receives the newly allocated QAPI object; callers are expected
 *           to pass a pointer that is initialised to NULL
 *
 * Returns 0 on success with *opts set.  Returns -EINVAL if the visitor
 * cannot be created or if the conversion leaves *opts NULL; error details
 * are reported through @errp.
 */
static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
                                    Error **errp)
{
    Visitor *v;

    /* Convert the remaining options into a QAPI object */
    v = qobject_input_visitor_new_flat_confused(options, errp);
    if (!v) {
        return -EINVAL;
    }

    visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
    visit_free(v);
    /*
     * Check the converted object, not the out-parameter itself: @opts is
     * never NULL, so testing it would make every failed conversion look
     * like a success.
     */
    if (!*opts) {
        return -EINVAL;
    }

    return 0;
}
1088 
/*
 * Best-effort fallback for options given in the legacy style, where
 * everything is encoded in the "filename" entry of @options.
 *
 * The filename is removed from @options and parsed back into it; any
 * "=keyvalue-pairs" entry produced is moved into *keypairs (to be freed by
 * the caller).  The resulting dictionary is then converted into *opts.
 * Parse and conversion error details are deliberately discarded.
 *
 * Returns -EINVAL if @options has no "filename" string, otherwise the
 * result of qemu_rbd_convert_options().
 */
static int qemu_rbd_attempt_legacy_options(QDict *options,
                                           BlockdevOptionsRbd **opts,
                                           char **keypairs)
{
    const char *given = qdict_get_try_str(options, "filename");
    char *legacy_name;
    int ret;

    if (!given) {
        return -EINVAL;
    }

    /* copy before deleting the entry that owns the string */
    legacy_name = g_strdup(given);
    qdict_del(options, "filename");

    qemu_rbd_parse_filename(legacy_name, options, NULL);

    /* keypairs freed by caller */
    *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (*keypairs != NULL) {
        qdict_del(options, "=keyvalue-pairs");
    }

    ret = qemu_rbd_convert_options(options, opts, NULL);
    g_free(legacy_name);

    return ret;
}
1115 
/*
 * Open an RBD image described by @options and set up the driver state of
 * @bs.
 *
 * @options is first converted as-is; if that fails and no keyvalue pairs
 * were given, the legacy "everything in the filename" format is tried.
 * All entries are removed from @options once they have been consumed.
 *
 * Returns 0 on success.  On failure a negative errno value is returned,
 * @errp is set, and everything acquired here (cluster connection, I/O
 * context, image handle, name strings) has been released again.
 */
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    BlockdevOptionsRbd *opts = NULL;
    const QDictEntry *e;
    Error *local_err = NULL;
    char *keypairs, *secretid;
    rbd_image_info_t info;
    int r;

    /* These two entries are not part of the QAPI type; take them out first */
    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (keypairs) {
        qdict_del(options, "=keyvalue-pairs");
    }

    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
    if (secretid) {
        qdict_del(options, "password-secret");
    }

    r = qemu_rbd_convert_options(options, &opts, &local_err);
    if (local_err) {
        /* If keypairs are present, that means some options are present in
         * the modern option format.  Don't attempt to parse legacy option
         * formats, as we won't support mixed usage. */
        if (keypairs) {
            /* an error is set, so never report success here */
            r = -EINVAL;
            error_propagate(errp, local_err);
            goto out;
        }

        /* If the initial attempt to convert and process the options failed,
         * we may be attempting to open an image file that has the rbd options
         * specified in the older format consisting of all key/value pairs
         * encoded in the filename.  Go ahead and attempt to parse the
         * filename, and see if we can pull out the required options. */
        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
        if (r < 0 || !opts) {
            /* Propagate the original error, not the legacy parsing fallback
             * error, as the latter was just a best-effort attempt.  A
             * fallback that produced no options object counts as failed. */
            r = -EINVAL;
            error_propagate(errp, local_err);
            goto out;
        }
        /* The fallback worked; the original error is no longer needed */
        error_free(local_err);
        local_err = NULL;

        /* Take care whenever deciding to actually deprecate; once this ability
         * is removed, we will not be able to open any images with legacy-styled
         * backing image strings. */
        warn_report("RBD options encoded in the filename as keyvalue pairs "
                    "is deprecated");
    }

    /* Remove the processed options from the QDict (the visitor processes
     * _all_ options in the QDict) */
    while ((e = qdict_first(options))) {
        qdict_del(options, e->key);
    }

    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
    if (r < 0) {
        goto out;
    }

    s->snap = g_strdup(opts->snapshot);
    s->image_name = g_strdup(opts->image);

    /* rbd_open is always r/w */
    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
    if (r < 0) {
        error_setg_errno(errp, -r, "error reading header from %s",
                         s->image_name);
        goto failed_open;
    }

    /* __MAX is used as the "no encryption format loaded" marker */
    s->encryption_format = RBD_IMAGE_ENCRYPTION_FORMAT__MAX;
    if (opts->encrypt) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION
        if (opts->encrypt->parent) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
            r = qemu_rbd_encryption_load2(bs, s->image, opts->encrypt, errp);
#else
            r = -ENOTSUP;
            error_setg(errp, "RBD library does not support layered encryption");
#endif
        } else {
            r = qemu_rbd_encryption_load(bs, s->image, opts->encrypt, errp);
        }
        if (r < 0) {
            goto failed_post_open;
        }
#else
        r = -ENOTSUP;
        error_setg(errp, "RBD library does not support image encryption");
        goto failed_post_open;
#endif
    } else {
        qemu_rbd_encryption_probe(bs);
    }

    r = rbd_stat(s->image, &info, sizeof(info));
    if (r < 0) {
        error_setg_errno(errp, -r, "error getting image info from %s",
                         s->image_name);
        goto failed_post_open;
    }
    s->image_size = info.size;
    s->object_size = info.obj_size;

    /* If we are using an rbd snapshot, we must be r/o, otherwise
     * leave as-is */
    if (s->snap != NULL) {
        bdrv_graph_rdlock_main_loop();
        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
        bdrv_graph_rdunlock_main_loop();
        if (r < 0) {
            goto failed_post_open;
        }
    }

#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
#endif

    /* Advertise BDRV_REQ_ZERO_WRITE for truncate requests on this driver */
    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;

    r = 0;
    goto out;

failed_post_open:
    rbd_close(s->image);
failed_open:
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
out:
    qapi_free_BlockdevOptionsRbd(opts);
    g_free(keypairs);
    g_free(secretid);
    return r;
}
1257 
1258 
1259 /* Since RBD is currently always opened R/W via the API,
1260  * we just need to check if we are using a snapshot or not, in
1261  * order to determine if we will allow it to be R/W */
qemu_rbd_reopen_prepare(BDRVReopenState * state,BlockReopenQueue * queue,Error ** errp)1262 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1263                                    BlockReopenQueue *queue, Error **errp)
1264 {
1265     BDRVRBDState *s = state->bs->opaque;
1266     int ret = 0;
1267 
1268     GRAPH_RDLOCK_GUARD_MAINLOOP();
1269 
1270     if (s->snap && state->flags & BDRV_O_RDWR) {
1271         error_setg(errp,
1272                    "Cannot change node '%s' to r/w when using RBD snapshot",
1273                    bdrv_get_device_or_node_name(state->bs));
1274         ret = -EINVAL;
1275     }
1276 
1277     return ret;
1278 }
1279 
/*
 * Release everything the driver state of @bs holds: the image handle, the
 * I/O context, the stored snapshot and image names, and finally the
 * cluster connection.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;

    /* image first, then its I/O context */
    rbd_close(state->image);
    rados_ioctx_destroy(state->io_ctx);

    g_free(state->snap);
    g_free(state->image_name);

    rados_shutdown(state->cluster);
}
1290 
1291 /* Resize the RBD image and update the 'image_size' with the current size */
qemu_rbd_resize(BlockDriverState * bs,uint64_t size)1292 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1293 {
1294     BDRVRBDState *s = bs->opaque;
1295     int r;
1296 
1297     r = rbd_resize(s->image, size);
1298     if (r < 0) {
1299         return r;
1300     }
1301 
1302     s->image_size = size;
1303 
1304     return 0;
1305 }
1306 
qemu_rbd_finish_bh(void * opaque)1307 static void qemu_rbd_finish_bh(void *opaque)
1308 {
1309     RBDTask *task = opaque;
1310     aio_co_wake(task->co);
1311 }
1312 
1313 /*
1314  * This is the completion callback function for all rbd aio calls
1315  * started from qemu_rbd_start_co().
1316  *
1317  * Note: this function is being called from a non qemu thread so
1318  * we need to be careful about what we do here. Generally we only
1319  * schedule a BH, and do the rest of the io completion handling
1320  * from qemu_rbd_finish_bh() which runs in a qemu context.
1321  */
qemu_rbd_completion_cb(rbd_completion_t c,RBDTask * task)1322 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1323 {
1324     task->ret = rbd_aio_get_return_value(c);
1325     rbd_aio_release(c);
1326     aio_bh_schedule_oneshot(qemu_coroutine_get_aio_context(task->co),
1327                             qemu_rbd_finish_bh, task);
1328 }
1329 
qemu_rbd_start_co(BlockDriverState * bs,uint64_t offset,uint64_t bytes,QEMUIOVector * qiov,int flags,RBDAIOCmd cmd)1330 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1331                                           uint64_t offset,
1332                                           uint64_t bytes,
1333                                           QEMUIOVector *qiov,
1334                                           int flags,
1335                                           RBDAIOCmd cmd)
1336 {
1337     BDRVRBDState *s = bs->opaque;
1338     RBDTask task = { .co = qemu_coroutine_self() };
1339     rbd_completion_t c;
1340     int r;
1341 
1342     assert(!qiov || qiov->size == bytes);
1343 
1344     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1345         /*
1346          * RBD APIs don't allow us to write more than actual size, so in order
1347          * to support growing images, we resize the image before write
1348          * operations that exceed the current size.
1349          */
1350         if (offset + bytes > s->image_size) {
1351             r = qemu_rbd_resize(bs, offset + bytes);
1352             if (r < 0) {
1353                 return r;
1354             }
1355         }
1356     }
1357 
1358     r = rbd_aio_create_completion(&task,
1359                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1360     if (r < 0) {
1361         return r;
1362     }
1363 
1364     switch (cmd) {
1365     case RBD_AIO_READ:
1366         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1367         break;
1368     case RBD_AIO_WRITE:
1369         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1370         break;
1371     case RBD_AIO_DISCARD:
1372         r = rbd_aio_discard(s->image, offset, bytes, c);
1373         break;
1374     case RBD_AIO_FLUSH:
1375         r = rbd_aio_flush(s->image, c);
1376         break;
1377 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1378     case RBD_AIO_WRITE_ZEROES: {
1379         int zero_flags = 0;
1380 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1381         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1382             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1383         }
1384 #endif
1385         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1386         break;
1387     }
1388 #endif
1389     default:
1390         r = -EINVAL;
1391     }
1392 
1393     if (r < 0) {
1394         error_report("rbd request failed early: cmd %d offset %" PRIu64
1395                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1396                      bytes, flags, r, strerror(-r));
1397         rbd_aio_release(c);
1398         return r;
1399     }
1400 
1401     /* Expect exactly a single wake from qemu_rbd_finish_bh() */
1402     qemu_coroutine_yield();
1403 
1404     if (task.ret < 0) {
1405         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1406                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1407                      bytes, flags, task.ret, strerror(-task.ret));
1408         return task.ret;
1409     }
1410 
1411     /* zero pad short reads */
1412     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1413         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1414     }
1415 
1416     return 0;
1417 }
1418 
1419 static int
qemu_rbd_co_preadv(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1420 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
1421                                 int64_t bytes, QEMUIOVector *qiov,
1422                                 BdrvRequestFlags flags)
1423 {
1424     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
1425 }
1426 
1427 static int
qemu_rbd_co_pwritev(BlockDriverState * bs,int64_t offset,int64_t bytes,QEMUIOVector * qiov,BdrvRequestFlags flags)1428 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
1429                                  int64_t bytes, QEMUIOVector *qiov,
1430                                  BdrvRequestFlags flags)
1431 {
1432     return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
1433 }
1434 
qemu_rbd_co_flush(BlockDriverState * bs)1435 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
1436 {
1437     return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
1438 }
1439 
qemu_rbd_co_pdiscard(BlockDriverState * bs,int64_t offset,int64_t bytes)1440 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
1441                                              int64_t offset, int64_t bytes)
1442 {
1443     return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
1444 }
1445 
#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Zero @bytes starting at @offset via qemu_rbd_start_co(); @flags is
 * forwarded unchanged.
 */
static int
coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, BdrvRequestFlags flags)
{
    int ret = qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                                RBD_AIO_WRITE_ZEROES);

    return ret;
}
#endif
1455 
1456 static int coroutine_fn
qemu_rbd_co_get_info(BlockDriverState * bs,BlockDriverInfo * bdi)1457 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
1458 {
1459     BDRVRBDState *s = bs->opaque;
1460     bdi->cluster_size = s->object_size;
1461     return 0;
1462 }
1463 
qemu_rbd_get_specific_info(BlockDriverState * bs,Error ** errp)1464 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1465                                                      Error **errp)
1466 {
1467     BDRVRBDState *s = bs->opaque;
1468     ImageInfoSpecific *spec_info;
1469 
1470     spec_info = g_new(ImageInfoSpecific, 1);
1471     *spec_info = (ImageInfoSpecific){
1472         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1473         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1474     };
1475 
1476     if (s->encryption_format == RBD_IMAGE_ENCRYPTION_FORMAT__MAX) {
1477         assert(!bs->encrypted);
1478     } else {
1479         ImageInfoSpecificRbd *rbd_info = spec_info->u.rbd.data;
1480 
1481         rbd_info->has_encryption_format = true;
1482         rbd_info->encryption_format = s->encryption_format;
1483     }
1484 
1485     return spec_info;
1486 }
1487 
/*
 * rbd_diff_iterate2 allows the execution to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to stop the
 * execution prematurely because we detected a change in the allocation
 * status.
 */
1494 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
1495 
qemu_rbd_diff_iterate_cb(uint64_t offs,size_t len,int exists,void * opaque)1496 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1497                                     int exists, void *opaque)
1498 {
1499     RBDDiffIterateReq *req = opaque;
1500 
1501     assert(req->offs + req->bytes <= offs);
1502 
1503     /* treat a hole like an unallocated area and bail out */
1504     if (!exists) {
1505         return 0;
1506     }
1507 
1508     if (!req->exists && offs > req->offs) {
1509         /*
1510          * we started in an unallocated area and hit the first allocated
1511          * block. req->bytes must be set to the length of the unallocated area
1512          * before the allocated area. stop further processing.
1513          */
1514         req->bytes = offs - req->offs;
1515         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1516     }
1517 
1518     if (req->exists && offs > req->offs + req->bytes) {
1519         /*
1520          * we started in an allocated area and jumped over an unallocated area,
1521          * req->bytes contains the length of the allocated area before the
1522          * unallocated area. stop further processing.
1523          */
1524         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1525     }
1526 
1527     req->bytes += len;
1528     req->exists = true;
1529 
1530     return 0;
1531 }
1532 
qemu_rbd_co_block_status(BlockDriverState * bs,unsigned int mode,int64_t offset,int64_t bytes,int64_t * pnum,int64_t * map,BlockDriverState ** file)1533 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
1534                                                  unsigned int mode,
1535                                                  int64_t offset, int64_t bytes,
1536                                                  int64_t *pnum, int64_t *map,
1537                                                  BlockDriverState **file)
1538 {
1539     BDRVRBDState *s = bs->opaque;
1540     int status, r;
1541     RBDDiffIterateReq req = { .offs = offset };
1542     uint64_t features, flags;
1543     uint64_t head = 0;
1544 
1545     assert(offset + bytes <= s->image_size);
1546 
1547     /* default to all sectors allocated */
1548     status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1549     *map = offset;
1550     *file = bs;
1551     *pnum = bytes;
1552 
1553     /* check if RBD image supports fast-diff */
1554     r = rbd_get_features(s->image, &features);
1555     if (r < 0) {
1556         return status;
1557     }
1558     if (!(features & RBD_FEATURE_FAST_DIFF)) {
1559         return status;
1560     }
1561 
1562     /* check if RBD fast-diff result is valid */
1563     r = rbd_get_flags(s->image, &flags);
1564     if (r < 0) {
1565         return status;
1566     }
1567     if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
1568         return status;
1569     }
1570 
1571 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
1572     /*
1573      * librbd had a bug until early 2022 that affected all versions of ceph that
1574      * supported fast-diff. This bug results in reporting of incorrect offsets
1575      * if the offset parameter to rbd_diff_iterate2 is not object aligned.
1576      * Work around this bug by rounding down the offset to object boundaries.
1577      * This is OK because we call rbd_diff_iterate2 with whole_object = true.
1578      * However, this workaround only works for non cloned images with default
1579      * striping.
1580      *
1581      * See: https://tracker.ceph.com/issues/53784
1582      */
1583 
1584     /* check if RBD image has non-default striping enabled */
1585     if (features & RBD_FEATURE_STRIPINGV2) {
1586         return status;
1587     }
1588 
1589 #pragma GCC diagnostic push
1590 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1591     /*
1592      * check if RBD image is a clone (= has a parent).
1593      *
1594      * rbd_get_parent_info is deprecated from Nautilus onwards, but the
1595      * replacement rbd_get_parent is not present in Luminous and Mimic.
1596      */
1597     if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
1598         return status;
1599     }
1600 #pragma GCC diagnostic pop
1601 
1602     head = req.offs & (s->object_size - 1);
1603     req.offs -= head;
1604     bytes += head;
1605 #endif
1606 
1607     r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
1608                           qemu_rbd_diff_iterate_cb, &req);
1609     if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
1610         return status;
1611     }
1612     assert(req.bytes <= bytes);
1613     if (!req.exists) {
1614         if (r == 0) {
1615             /*
1616              * rbd_diff_iterate2 does not invoke callbacks for unallocated
1617              * areas. This here catches the case where no callback was
1618              * invoked at all (req.bytes == 0).
1619              */
1620             assert(req.bytes == 0);
1621             req.bytes = bytes;
1622         }
1623         status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
1624     }
1625 
1626     assert(req.bytes > head);
1627     *pnum = req.bytes - head;
1628     return status;
1629 }
1630 
qemu_rbd_co_getlength(BlockDriverState * bs)1631 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1632 {
1633     BDRVRBDState *s = bs->opaque;
1634     int r;
1635 
1636     r = rbd_get_size(s->image, &s->image_size);
1637     if (r < 0) {
1638         return r;
1639     }
1640 
1641     return s->image_size;
1642 }
1643 
qemu_rbd_co_truncate(BlockDriverState * bs,int64_t offset,bool exact,PreallocMode prealloc,BdrvRequestFlags flags,Error ** errp)1644 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1645                                              int64_t offset,
1646                                              bool exact,
1647                                              PreallocMode prealloc,
1648                                              BdrvRequestFlags flags,
1649                                              Error **errp)
1650 {
1651     int r;
1652 
1653     if (prealloc != PREALLOC_MODE_OFF) {
1654         error_setg(errp, "Unsupported preallocation mode '%s'",
1655                    PreallocMode_str(prealloc));
1656         return -ENOTSUP;
1657     }
1658 
1659     r = qemu_rbd_resize(bs, offset);
1660     if (r < 0) {
1661         error_setg_errno(errp, -r, "Failed to resize file");
1662         return r;
1663     }
1664 
1665     return 0;
1666 }
1667 
/*
 * Create an RBD snapshot named after @sn_info->name.
 *
 * Returns 0 on success, -EINVAL if the name is empty or a non-empty id
 * differs from the name, -ERANGE if the name does not fit into the id
 * field, or the negative value from rbd_snap_create().
 */
static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    const char *snap_name = sn_info->name;
    const char *snap_id = sn_info->id_str;
    int rc;

    /* we need a name for rbd snapshots */
    if (snap_name[0] == '\0') {
        return -EINVAL;
    }

    /*
     * rbd snapshots are using the name as the user controlled unique identifier
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (snap_id[0] != '\0' && strcmp(snap_id, snap_name) != 0) {
        return -EINVAL;
    }

    if (strlen(snap_name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    rc = rbd_snap_create(s->image, snap_name);
    if (rc >= 0) {
        return 0;
    }

    error_report("failed to create snap: %s", strerror(-rc));
    return rc;
}
1699 
/*
 * Remove the RBD snapshot called @snapshot_name.
 *
 * @snapshot_id, when given, has to match @snapshot_name.  Returns the
 * result of rbd_snap_remove(), or -EINVAL (with @errp set) if the
 * arguments are not acceptable.
 */
static int qemu_rbd_snap_remove(BlockDriverState *bs,
                                const char *snapshot_id,
                                const char *snapshot_name,
                                Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int rc;

    if (snapshot_name == NULL) {
        error_setg(errp, "rbd need a valid snapshot name");
        return -EINVAL;
    }

    /* If snapshot_id is specified, it must be equal to name, see
       qemu_rbd_snap_list() */
    if (snapshot_id != NULL && strcmp(snapshot_id, snapshot_name) != 0) {
        error_setg(errp,
                   "rbd do not support snapshot id, it should be NULL or "
                   "equal to snapshot name");
        return -EINVAL;
    }

    rc = rbd_snap_remove(s->image, snapshot_name);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "Failed to remove the snapshot");
    }

    return rc;
}
1728 
/*
 * Roll the image back to the snapshot called @snapshot_name and return
 * the result of rbd_snap_rollback().
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *state = bs->opaque;
    int rc = rbd_snap_rollback(state->image, snapshot_name);

    return rc;
}
1736 
/*
 * List the snapshots of the image.
 *
 * On success *psn_tab receives a newly allocated array with one entry per
 * snapshot, where both the id and the name are set to the RBD snapshot
 * name.  Returns the number of snapshots; when that is zero or negative
 * (the value reported by rbd_snap_list()), *psn_tab is set to NULL.
 */
static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    QEMUSnapshotInfo *table;
    rbd_snap_info_t *snaps;
    int max_snaps = RBD_MAX_SNAPS;
    int snap_count;
    int idx;

    /* retry for as long as librbd reports the buffer as too small */
    for (;;) {
        snaps = g_new(rbd_snap_info_t, max_snaps);
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
        if (snap_count > 0) {
            break;
        }
        g_free(snaps);
        if (snap_count != -ERANGE) {
            *psn_tab = NULL;
            return snap_count;
        }
    }

    table = g_new0(QEMUSnapshotInfo, snap_count);

    for (idx = 0; idx < snap_count; idx++) {
        QEMUSnapshotInfo *entry = &table[idx];
        const char *snap_name = snaps[idx].name;

        pstrcpy(entry->id_str, sizeof(entry->id_str), snap_name);
        pstrcpy(entry->name, sizeof(entry->name), snap_name);

        entry->vm_state_size = snaps[idx].size;
        entry->date_sec = 0;
        entry->date_nsec = 0;
        entry->vm_clock_nsec = 0;
    }
    rbd_snap_list_end(snaps);
    g_free(snaps);

    *psn_tab = table;
    return snap_count;
}
1779 
qemu_rbd_co_invalidate_cache(BlockDriverState * bs,Error ** errp)1780 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1781                                                       Error **errp)
1782 {
1783     BDRVRBDState *s = bs->opaque;
1784     int r = rbd_invalidate_cache(s->image);
1785     if (r < 0) {
1786         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1787     }
1788 }
1789 
1790 static QemuOptsList qemu_rbd_create_opts = {
1791     .name = "rbd-create-opts",
1792     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1793     .desc = {
1794         {
1795             .name = BLOCK_OPT_SIZE,
1796             .type = QEMU_OPT_SIZE,
1797             .help = "Virtual disk size"
1798         },
1799         {
1800             .name = BLOCK_OPT_CLUSTER_SIZE,
1801             .type = QEMU_OPT_SIZE,
1802             .help = "RBD object size"
1803         },
1804         {
1805             .name = "password-secret",
1806             .type = QEMU_OPT_STRING,
1807             .help = "ID of secret providing the password",
1808         },
1809         {
1810             .name = "encrypt.format",
1811             .type = QEMU_OPT_STRING,
1812             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1813         },
1814         {
1815             .name = "encrypt.cipher-alg",
1816             .type = QEMU_OPT_STRING,
1817             .help = "Name of encryption cipher algorithm"
1818                     " (allowed values: aes-128, aes-256)",
1819         },
1820         {
1821             .name = "encrypt.key-secret",
1822             .type = QEMU_OPT_STRING,
1823             .help = "ID of secret providing LUKS passphrase",
1824         },
1825         { /* end of list */ }
1826     }
1827 };
1828 
/*
 * NULL-terminated list of option names referenced from bdrv_rbd as
 * .strong_runtime_opts.
 *
 * NOTE(review): "server." ends in a dot and so looks like a key prefix
 * rather than a full option name -- confirm against the block layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool", "namespace", "image",
    "conf", "snapshot", "user",
    "server.", "password-secret",

    NULL
};
1841 
1842 static BlockDriver bdrv_rbd = {
1843     .format_name            = "rbd",
1844     .instance_size          = sizeof(BDRVRBDState),
1845 
1846     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1847     .bdrv_open              = qemu_rbd_open,
1848     .bdrv_close             = qemu_rbd_close,
1849     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1850     .bdrv_co_create         = qemu_rbd_co_create,
1851     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1852     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1853     .bdrv_co_get_info       = qemu_rbd_co_get_info,
1854     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1855     .create_opts            = &qemu_rbd_create_opts,
1856     .bdrv_co_getlength      = qemu_rbd_co_getlength,
1857     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1858     .protocol_name          = "rbd",
1859 
1860     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1861     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1862     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1863     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1864 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1865     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1866 #endif
1867     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1868 
1869     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1870     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1871     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1872     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1873     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1874 
1875     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1876 };
1877 
bdrv_rbd_init(void)1878 static void bdrv_rbd_init(void)
1879 {
1880     bdrv_register(&bdrv_rbd);
1881 }
1882 
1883 block_init(bdrv_rbd_init);
1884