xref: /openbmc/qemu/block/rbd.c (revision 500eb6db)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block_int.h"
22 #include "block/qdict.h"
23 #include "crypto/secret.h"
24 #include "qemu/cutils.h"
25 #include "qapi/qmp/qstring.h"
26 #include "qapi/qmp/qdict.h"
27 #include "qapi/qmp/qjson.h"
28 #include "qapi/qmp/qlist.h"
29 #include "qapi/qobject-input-visitor.h"
30 #include "qapi/qapi-visit-block-core.h"
31 
32 /*
33  * When specifying the image filename use:
34  *
35  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
36  *
37  * poolname must be the name of an existing rados pool.
38  *
39  * devicename is the name of the rbd image.
40  *
41  * Each option given is used to configure rados, and may be any valid
42  * Ceph option, "id", or "conf".
43  *
44  * The "id" option indicates what user we should authenticate as to
45  * the Ceph cluster.  If it is excluded we will use the Ceph default
46  * (normally 'admin').
47  *
48  * The "conf" option specifies a Ceph configuration file to read.  If
49  * it is not specified, we will read from the default Ceph locations
50  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
51  * file, specify conf=/dev/null.
52  *
53  * Configuration values containing :, @, or = can be escaped with a
54  * leading "\".
55  */
56 
57 /* rbd_aio_discard added in 0.1.2 */
58 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
59 #define LIBRBD_SUPPORTS_DISCARD
60 #else
61 #undef LIBRBD_SUPPORTS_DISCARD
62 #endif
63 
64 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
65 
66 #define RBD_MAX_SNAPS 100
67 
68 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
69 #ifdef LIBRBD_SUPPORTS_IOVEC
70 #define LIBRBD_USE_IOVEC 1
71 #else
72 #define LIBRBD_USE_IOVEC 0
73 #endif
74 
/* Kind of asynchronous request handed to librbd by rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ    = 0,
    RBD_AIO_WRITE   = 1,
    RBD_AIO_DISCARD = 2,
    RBD_AIO_FLUSH   = 3,
} RBDAIOCmd;
81 
/*
 * Per-request state of an RBD AIO operation.  Allocated with qemu_aio_get()
 * in rbd_start_aio() and released in qemu_rbd_complete_aio().
 */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB; &common is returned to callers */
    int64_t ret;            /* result later passed to common.cb */
    QEMUIOVector *qiov;     /* caller's I/O vector (NULL for flush) */
    char *bounce;           /* linear buffer used when !LIBRBD_USE_IOVEC */
    RBDAIOCmd cmd;          /* request type */
    int error;              /* set to 1 once a negative result was seen */
    struct BDRVRBDState *s; /* driver state of the node issuing the request */
} RBDAIOCB;
91 
/*
 * Argument handed to the librbd completion callback; carries the result
 * from rbd_finish_aiocb() to qemu_rbd_complete_aio(), which frees it.
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* request this completion belongs to */
    struct BDRVRBDState *s; /* copy of acb->s */
    int64_t size;           /* number of bytes requested */
    char *buf;              /* acb->bounce; only set when !LIBRBD_USE_IOVEC */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
99 
/* Driver state stored in bs->opaque for an open RBD node. */
typedef struct BDRVRBDState {
    rados_t cluster;        /* cluster handle set up by qemu_rbd_connect() */
    rados_ioctx_t io_ctx;   /* I/O context for the pool holding the image */
    rbd_image_t image;      /* image handle obtained from rbd_open() */
    char *image_name;       /* owned copy of the image name */
    char *snap;             /* owned copy of the snapshot name, or NULL */
} BDRVRBDState;
107 
/*
 * Forward declaration: qemu_rbd_do_create() calls this before its
 * definition further down in the file.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp);
112 
/*
 * Split off the token at the start of @src, ending at the first unescaped
 * occurrence of @delim.
 *
 * A backslash protects the character following it from being treated as a
 * delimiter; the backslash itself is left in place (see
 * qemu_rbd_unescape()).  When the delimiter is found it is overwritten with
 * a NUL byte and *@p is set to the character after it.  Otherwise @src is
 * left unmodified and *@p is set to NULL.
 *
 * Returns @src, i.e. the start of the token.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *cur = src;

    *p = NULL;

    while (*cur != '\0' && *cur != delim) {
        if (*cur == '\\' && cur[1] != '\0') {
            /* step over the backslash; the escaped char is skipped below */
            cur++;
        }
        cur++;
    }

    if (*cur == delim) {
        *cur = '\0';
        *p = cur + 1;
    }
    return src;
}
133 
/*
 * Remove backslash escapes from @src in place: every backslash that is
 * followed by another character is dropped and that character is kept
 * verbatim.  A backslash at the very end of the string is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
146 
147 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
148                                     Error **errp)
149 {
150     const char *start;
151     char *p, *buf;
152     QList *keypairs = NULL;
153     char *found_str;
154 
155     if (!strstart(filename, "rbd:", &start)) {
156         error_setg(errp, "File name must start with 'rbd:'");
157         return;
158     }
159 
160     buf = g_strdup(start);
161     p = buf;
162 
163     found_str = qemu_rbd_next_tok(p, '/', &p);
164     if (!p) {
165         error_setg(errp, "Pool name is required");
166         goto done;
167     }
168     qemu_rbd_unescape(found_str);
169     qdict_put_str(options, "pool", found_str);
170 
171     if (strchr(p, '@')) {
172         found_str = qemu_rbd_next_tok(p, '@', &p);
173         qemu_rbd_unescape(found_str);
174         qdict_put_str(options, "image", found_str);
175 
176         found_str = qemu_rbd_next_tok(p, ':', &p);
177         qemu_rbd_unescape(found_str);
178         qdict_put_str(options, "snapshot", found_str);
179     } else {
180         found_str = qemu_rbd_next_tok(p, ':', &p);
181         qemu_rbd_unescape(found_str);
182         qdict_put_str(options, "image", found_str);
183     }
184     if (!p) {
185         goto done;
186     }
187 
188     /* The following are essentially all key/value pairs, and we treat
189      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
190     while (p) {
191         char *name, *value;
192         name = qemu_rbd_next_tok(p, '=', &p);
193         if (!p) {
194             error_setg(errp, "conf option %s has no value", name);
195             break;
196         }
197 
198         qemu_rbd_unescape(name);
199 
200         value = qemu_rbd_next_tok(p, ':', &p);
201         qemu_rbd_unescape(value);
202 
203         if (!strcmp(name, "conf")) {
204             qdict_put_str(options, "conf", value);
205         } else if (!strcmp(name, "id")) {
206             qdict_put_str(options, "user", value);
207         } else {
208             /*
209              * We pass these internally to qemu_rbd_set_keypairs(), so
210              * we can get away with the simpler list of [ "key1",
211              * "value1", "key2", "value2" ] rather than a raw dict
212              * { "key1": "value1", "key2": "value2" } where we can't
213              * guarantee order, or even a more correct but complex
214              * [ { "key1": "value1" }, { "key2": "value2" } ]
215              */
216             if (!keypairs) {
217                 keypairs = qlist_new();
218             }
219             qlist_append_str(keypairs, name);
220             qlist_append_str(keypairs, value);
221         }
222     }
223 
224     if (keypairs) {
225         qdict_put(options, "=keyvalue-pairs",
226                   qobject_to_json(QOBJECT(keypairs)));
227     }
228 
229 done:
230     g_free(buf);
231     qobject_unref(keypairs);
232     return;
233 }
234 
235 
236 static void qemu_rbd_refresh_limits(BlockDriverState *bs, Error **errp)
237 {
238     /* XXX Does RBD support AIO on less than 512-byte alignment? */
239     bs->bl.request_alignment = 512;
240 }
241 
242 
243 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
244                              Error **errp)
245 {
246     char *key, *acr;
247     int r;
248     GString *accu;
249     RbdAuthModeList *auth;
250 
251     if (opts->key_secret) {
252         key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
253         if (!key) {
254             return -EIO;
255         }
256         r = rados_conf_set(cluster, "key", key);
257         g_free(key);
258         if (r < 0) {
259             error_setg_errno(errp, -r, "Could not set 'key'");
260             return r;
261         }
262     }
263 
264     if (opts->has_auth_client_required) {
265         accu = g_string_new("");
266         for (auth = opts->auth_client_required; auth; auth = auth->next) {
267             if (accu->str[0]) {
268                 g_string_append_c(accu, ';');
269             }
270             g_string_append(accu, RbdAuthMode_str(auth->value));
271         }
272         acr = g_string_free(accu, FALSE);
273         r = rados_conf_set(cluster, "auth_client_required", acr);
274         g_free(acr);
275         if (r < 0) {
276             error_setg_errno(errp, -r,
277                              "Could not set 'auth_client_required'");
278             return r;
279         }
280     }
281 
282     return 0;
283 }
284 
285 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
286                                  Error **errp)
287 {
288     QList *keypairs;
289     QString *name;
290     QString *value;
291     const char *key;
292     size_t remaining;
293     int ret = 0;
294 
295     if (!keypairs_json) {
296         return ret;
297     }
298     keypairs = qobject_to(QList,
299                           qobject_from_json(keypairs_json, &error_abort));
300     remaining = qlist_size(keypairs) / 2;
301     assert(remaining);
302 
303     while (remaining--) {
304         name = qobject_to(QString, qlist_pop(keypairs));
305         value = qobject_to(QString, qlist_pop(keypairs));
306         assert(name && value);
307         key = qstring_get_str(name);
308 
309         ret = rados_conf_set(cluster, key, qstring_get_str(value));
310         qobject_unref(value);
311         if (ret < 0) {
312             error_setg_errno(errp, -ret, "invalid conf option %s", key);
313             qobject_unref(name);
314             ret = -EINVAL;
315             break;
316         }
317         qobject_unref(name);
318     }
319 
320     qobject_unref(keypairs);
321     return ret;
322 }
323 
324 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
325 {
326     if (LIBRBD_USE_IOVEC) {
327         RBDAIOCB *acb = rcb->acb;
328         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
329                    acb->qiov->size - offs);
330     } else {
331         memset(rcb->buf + offs, 0, rcb->size - offs);
332     }
333 }
334 
335 static QemuOptsList runtime_opts = {
336     .name = "rbd",
337     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
338     .desc = {
339         {
340             .name = "pool",
341             .type = QEMU_OPT_STRING,
342             .help = "Rados pool name",
343         },
344         {
345             .name = "image",
346             .type = QEMU_OPT_STRING,
347             .help = "Image name in the pool",
348         },
349         {
350             .name = "conf",
351             .type = QEMU_OPT_STRING,
352             .help = "Rados config file location",
353         },
354         {
355             .name = "snapshot",
356             .type = QEMU_OPT_STRING,
357             .help = "Ceph snapshot name",
358         },
359         {
360             /* maps to 'id' in rados_create() */
361             .name = "user",
362             .type = QEMU_OPT_STRING,
363             .help = "Rados id name",
364         },
365         /*
366          * server.* extracted manually, see qemu_rbd_mon_host()
367          */
368         { /* end of list */ }
369     },
370 };
371 
372 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
373 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
374                               const char *keypairs, const char *password_secret,
375                               Error **errp)
376 {
377     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
378     rados_t cluster;
379     rados_ioctx_t io_ctx;
380     int obj_order = 0;
381     int ret;
382 
383     assert(options->driver == BLOCKDEV_DRIVER_RBD);
384     if (opts->location->has_snapshot) {
385         error_setg(errp, "Can't use snapshot name for image creation");
386         return -EINVAL;
387     }
388 
389     if (opts->has_cluster_size) {
390         int64_t objsize = opts->cluster_size;
391         if ((objsize - 1) & objsize) {    /* not a power of 2? */
392             error_setg(errp, "obj size needs to be power of 2");
393             return -EINVAL;
394         }
395         if (objsize < 4096) {
396             error_setg(errp, "obj size too small");
397             return -EINVAL;
398         }
399         obj_order = ctz32(objsize);
400     }
401 
402     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
403                            password_secret, errp);
404     if (ret < 0) {
405         return ret;
406     }
407 
408     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
409     if (ret < 0) {
410         error_setg_errno(errp, -ret, "error rbd create");
411         goto out;
412     }
413 
414     ret = 0;
415 out:
416     rados_ioctx_destroy(io_ctx);
417     rados_shutdown(cluster);
418     return ret;
419 }
420 
421 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
422 {
423     return qemu_rbd_do_create(options, NULL, NULL, errp);
424 }
425 
426 static int coroutine_fn qemu_rbd_co_create_opts(const char *filename,
427                                                 QemuOpts *opts,
428                                                 Error **errp)
429 {
430     BlockdevCreateOptions *create_options;
431     BlockdevCreateOptionsRbd *rbd_opts;
432     BlockdevOptionsRbd *loc;
433     Error *local_err = NULL;
434     const char *keypairs, *password_secret;
435     QDict *options = NULL;
436     int ret = 0;
437 
438     create_options = g_new0(BlockdevCreateOptions, 1);
439     create_options->driver = BLOCKDEV_DRIVER_RBD;
440     rbd_opts = &create_options->u.rbd;
441 
442     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
443 
444     password_secret = qemu_opt_get(opts, "password-secret");
445 
446     /* Read out options */
447     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
448                               BDRV_SECTOR_SIZE);
449     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
450                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
451     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
452 
453     options = qdict_new();
454     qemu_rbd_parse_filename(filename, options, &local_err);
455     if (local_err) {
456         ret = -EINVAL;
457         error_propagate(errp, local_err);
458         goto exit;
459     }
460 
461     /*
462      * Caution: while qdict_get_try_str() is fine, getting non-string
463      * types would require more care.  When @options come from -blockdev
464      * or blockdev_add, its members are typed according to the QAPI
465      * schema, but when they come from -drive, they're all QString.
466      */
467     loc = rbd_opts->location;
468     loc->pool     = g_strdup(qdict_get_try_str(options, "pool"));
469     loc->conf     = g_strdup(qdict_get_try_str(options, "conf"));
470     loc->has_conf = !!loc->conf;
471     loc->user     = g_strdup(qdict_get_try_str(options, "user"));
472     loc->has_user = !!loc->user;
473     loc->image    = g_strdup(qdict_get_try_str(options, "image"));
474     keypairs      = qdict_get_try_str(options, "=keyvalue-pairs");
475 
476     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
477     if (ret < 0) {
478         goto exit;
479     }
480 
481 exit:
482     qobject_unref(options);
483     qapi_free_BlockdevCreateOptions(create_options);
484     return ret;
485 }
486 
487 /*
488  * This aio completion is being called from rbd_finish_bh() and runs in qemu
489  * BH context.
490  */
491 static void qemu_rbd_complete_aio(RADOSCB *rcb)
492 {
493     RBDAIOCB *acb = rcb->acb;
494     int64_t r;
495 
496     r = rcb->ret;
497 
498     if (acb->cmd != RBD_AIO_READ) {
499         if (r < 0) {
500             acb->ret = r;
501             acb->error = 1;
502         } else if (!acb->error) {
503             acb->ret = rcb->size;
504         }
505     } else {
506         if (r < 0) {
507             qemu_rbd_memset(rcb, 0);
508             acb->ret = r;
509             acb->error = 1;
510         } else if (r < rcb->size) {
511             qemu_rbd_memset(rcb, r);
512             if (!acb->error) {
513                 acb->ret = rcb->size;
514             }
515         } else if (!acb->error) {
516             acb->ret = r;
517         }
518     }
519 
520     g_free(rcb);
521 
522     if (!LIBRBD_USE_IOVEC) {
523         if (acb->cmd == RBD_AIO_READ) {
524             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
525         }
526         qemu_vfree(acb->bounce);
527     }
528 
529     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
530 
531     qemu_aio_unref(acb);
532 }
533 
534 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
535 {
536     const char **vals;
537     const char *host, *port;
538     char *rados_str;
539     InetSocketAddressBaseList *p;
540     int i, cnt;
541 
542     if (!opts->has_server) {
543         return NULL;
544     }
545 
546     for (cnt = 0, p = opts->server; p; p = p->next) {
547         cnt++;
548     }
549 
550     vals = g_new(const char *, cnt + 1);
551 
552     for (i = 0, p = opts->server; p; p = p->next, i++) {
553         host = p->value->host;
554         port = p->value->port;
555 
556         if (strchr(host, ':')) {
557             vals[i] = g_strdup_printf("[%s]:%s", host, port);
558         } else {
559             vals[i] = g_strdup_printf("%s:%s", host, port);
560         }
561     }
562     vals[i] = NULL;
563 
564     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
565     g_strfreev((char **)vals);
566     return rados_str;
567 }
568 
569 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
570                             BlockdevOptionsRbd *opts, bool cache,
571                             const char *keypairs, const char *secretid,
572                             Error **errp)
573 {
574     char *mon_host = NULL;
575     Error *local_err = NULL;
576     int r;
577 
578     if (secretid) {
579         if (opts->key_secret) {
580             error_setg(errp,
581                        "Legacy 'password-secret' clashes with 'key-secret'");
582             return -EINVAL;
583         }
584         opts->key_secret = g_strdup(secretid);
585         opts->has_key_secret = true;
586     }
587 
588     mon_host = qemu_rbd_mon_host(opts, &local_err);
589     if (local_err) {
590         error_propagate(errp, local_err);
591         r = -EINVAL;
592         goto failed_opts;
593     }
594 
595     r = rados_create(cluster, opts->user);
596     if (r < 0) {
597         error_setg_errno(errp, -r, "error initializing");
598         goto failed_opts;
599     }
600 
601     /* try default location when conf=NULL, but ignore failure */
602     r = rados_conf_read_file(*cluster, opts->conf);
603     if (opts->has_conf && r < 0) {
604         error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
605         goto failed_shutdown;
606     }
607 
608     r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
609     if (r < 0) {
610         goto failed_shutdown;
611     }
612 
613     if (mon_host) {
614         r = rados_conf_set(*cluster, "mon_host", mon_host);
615         if (r < 0) {
616             goto failed_shutdown;
617         }
618     }
619 
620     r = qemu_rbd_set_auth(*cluster, opts, errp);
621     if (r < 0) {
622         goto failed_shutdown;
623     }
624 
625     /*
626      * Fallback to more conservative semantics if setting cache
627      * options fails. Ignore errors from setting rbd_cache because the
628      * only possible error is that the option does not exist, and
629      * librbd defaults to no caching. If write through caching cannot
630      * be set up, fall back to no caching.
631      */
632     if (cache) {
633         rados_conf_set(*cluster, "rbd_cache", "true");
634     } else {
635         rados_conf_set(*cluster, "rbd_cache", "false");
636     }
637 
638     r = rados_connect(*cluster);
639     if (r < 0) {
640         error_setg_errno(errp, -r, "error connecting");
641         goto failed_shutdown;
642     }
643 
644     r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
645     if (r < 0) {
646         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
647         goto failed_shutdown;
648     }
649 
650     return 0;
651 
652 failed_shutdown:
653     rados_shutdown(*cluster);
654 failed_opts:
655     g_free(mon_host);
656     return r;
657 }
658 
659 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
660                                     Error **errp)
661 {
662     Visitor *v;
663     Error *local_err = NULL;
664 
665     /* Convert the remaining options into a QAPI object */
666     v = qobject_input_visitor_new_flat_confused(options, errp);
667     if (!v) {
668         return -EINVAL;
669     }
670 
671     visit_type_BlockdevOptionsRbd(v, NULL, opts, &local_err);
672     visit_free(v);
673 
674     if (local_err) {
675         error_propagate(errp, local_err);
676         return -EINVAL;
677     }
678 
679     return 0;
680 }
681 
682 static int qemu_rbd_attempt_legacy_options(QDict *options,
683                                            BlockdevOptionsRbd **opts,
684                                            char **keypairs)
685 {
686     char *filename;
687     int r;
688 
689     filename = g_strdup(qdict_get_try_str(options, "filename"));
690     if (!filename) {
691         return -EINVAL;
692     }
693     qdict_del(options, "filename");
694 
695     qemu_rbd_parse_filename(filename, options, NULL);
696 
697     /* keypairs freed by caller */
698     *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
699     if (*keypairs) {
700         qdict_del(options, "=keyvalue-pairs");
701     }
702 
703     r = qemu_rbd_convert_options(options, opts, NULL);
704 
705     g_free(filename);
706     return r;
707 }
708 
709 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
710                          Error **errp)
711 {
712     BDRVRBDState *s = bs->opaque;
713     BlockdevOptionsRbd *opts = NULL;
714     const QDictEntry *e;
715     Error *local_err = NULL;
716     char *keypairs, *secretid;
717     int r;
718 
719     keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
720     if (keypairs) {
721         qdict_del(options, "=keyvalue-pairs");
722     }
723 
724     secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
725     if (secretid) {
726         qdict_del(options, "password-secret");
727     }
728 
729     r = qemu_rbd_convert_options(options, &opts, &local_err);
730     if (local_err) {
731         /* If keypairs are present, that means some options are present in
732          * the modern option format.  Don't attempt to parse legacy option
733          * formats, as we won't support mixed usage. */
734         if (keypairs) {
735             error_propagate(errp, local_err);
736             goto out;
737         }
738 
739         /* If the initial attempt to convert and process the options failed,
740          * we may be attempting to open an image file that has the rbd options
741          * specified in the older format consisting of all key/value pairs
742          * encoded in the filename.  Go ahead and attempt to parse the
743          * filename, and see if we can pull out the required options. */
744         r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
745         if (r < 0) {
746             /* Propagate the original error, not the legacy parsing fallback
747              * error, as the latter was just a best-effort attempt. */
748             error_propagate(errp, local_err);
749             goto out;
750         }
751         /* Take care whenever deciding to actually deprecate; once this ability
752          * is removed, we will not be able to open any images with legacy-styled
753          * backing image strings. */
754         warn_report("RBD options encoded in the filename as keyvalue pairs "
755                     "is deprecated");
756     }
757 
758     /* Remove the processed options from the QDict (the visitor processes
759      * _all_ options in the QDict) */
760     while ((e = qdict_first(options))) {
761         qdict_del(options, e->key);
762     }
763 
764     r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
765                          !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
766     if (r < 0) {
767         goto out;
768     }
769 
770     s->snap = g_strdup(opts->snapshot);
771     s->image_name = g_strdup(opts->image);
772 
773     /* rbd_open is always r/w */
774     r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
775     if (r < 0) {
776         error_setg_errno(errp, -r, "error reading header from %s",
777                          s->image_name);
778         goto failed_open;
779     }
780 
781     /* If we are using an rbd snapshot, we must be r/o, otherwise
782      * leave as-is */
783     if (s->snap != NULL) {
784         r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
785         if (r < 0) {
786             rbd_close(s->image);
787             goto failed_open;
788         }
789     }
790 
791     r = 0;
792     goto out;
793 
794 failed_open:
795     rados_ioctx_destroy(s->io_ctx);
796     g_free(s->snap);
797     g_free(s->image_name);
798     rados_shutdown(s->cluster);
799 out:
800     qapi_free_BlockdevOptionsRbd(opts);
801     g_free(keypairs);
802     g_free(secretid);
803     return r;
804 }
805 
806 
807 /* Since RBD is currently always opened R/W via the API,
808  * we just need to check if we are using a snapshot or not, in
809  * order to determine if we will allow it to be R/W */
810 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
811                                    BlockReopenQueue *queue, Error **errp)
812 {
813     BDRVRBDState *s = state->bs->opaque;
814     int ret = 0;
815 
816     if (s->snap && state->flags & BDRV_O_RDWR) {
817         error_setg(errp,
818                    "Cannot change node '%s' to r/w when using RBD snapshot",
819                    bdrv_get_device_or_node_name(state->bs));
820         ret = -EINVAL;
821     }
822 
823     return ret;
824 }
825 
826 static void qemu_rbd_close(BlockDriverState *bs)
827 {
828     BDRVRBDState *s = bs->opaque;
829 
830     rbd_close(s->image);
831     rados_ioctx_destroy(s->io_ctx);
832     g_free(s->snap);
833     g_free(s->image_name);
834     rados_shutdown(s->cluster);
835 }
836 
/* AIOCB description passed to qemu_aio_get() in rbd_start_aio(). */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
840 
/*
 * Bottom half scheduled by rbd_finish_aiocb(); @opaque is the RADOSCB of
 * the finished request.
 */
static void rbd_finish_bh(void *opaque)
{
    qemu_rbd_complete_aio(opaque);
}
846 
847 /*
848  * This is the callback function for rbd_aio_read and _write
849  *
850  * Note: this function is being called from a non qemu thread so
851  * we need to be careful about what we do here. Generally we only
852  * schedule a BH, and do the rest of the io completion handling
853  * from rbd_finish_bh() which runs in a qemu context.
854  */
855 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
856 {
857     RBDAIOCB *acb = rcb->acb;
858 
859     rcb->ret = rbd_aio_get_return_value(c);
860     rbd_aio_release(c);
861 
862     aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
863                             rbd_finish_bh, rcb);
864 }
865 
866 static int rbd_aio_discard_wrapper(rbd_image_t image,
867                                    uint64_t off,
868                                    uint64_t len,
869                                    rbd_completion_t comp)
870 {
871 #ifdef LIBRBD_SUPPORTS_DISCARD
872     return rbd_aio_discard(image, off, len, comp);
873 #else
874     return -ENOTSUP;
875 #endif
876 }
877 
878 static int rbd_aio_flush_wrapper(rbd_image_t image,
879                                  rbd_completion_t comp)
880 {
881 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
882     return rbd_aio_flush(image, comp);
883 #else
884     return -ENOTSUP;
885 #endif
886 }
887 
888 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
889                                  int64_t off,
890                                  QEMUIOVector *qiov,
891                                  int64_t size,
892                                  BlockCompletionFunc *cb,
893                                  void *opaque,
894                                  RBDAIOCmd cmd)
895 {
896     RBDAIOCB *acb;
897     RADOSCB *rcb = NULL;
898     rbd_completion_t c;
899     int r;
900 
901     BDRVRBDState *s = bs->opaque;
902 
903     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
904     acb->cmd = cmd;
905     acb->qiov = qiov;
906     assert(!qiov || qiov->size == size);
907 
908     rcb = g_new(RADOSCB, 1);
909 
910     if (!LIBRBD_USE_IOVEC) {
911         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
912             acb->bounce = NULL;
913         } else {
914             acb->bounce = qemu_try_blockalign(bs, qiov->size);
915             if (acb->bounce == NULL) {
916                 goto failed;
917             }
918         }
919         if (cmd == RBD_AIO_WRITE) {
920             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
921         }
922         rcb->buf = acb->bounce;
923     }
924 
925     acb->ret = 0;
926     acb->error = 0;
927     acb->s = s;
928 
929     rcb->acb = acb;
930     rcb->s = acb->s;
931     rcb->size = size;
932     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
933     if (r < 0) {
934         goto failed;
935     }
936 
937     switch (cmd) {
938     case RBD_AIO_WRITE:
939 #ifdef LIBRBD_SUPPORTS_IOVEC
940             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
941 #else
942             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
943 #endif
944         break;
945     case RBD_AIO_READ:
946 #ifdef LIBRBD_SUPPORTS_IOVEC
947             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
948 #else
949             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
950 #endif
951         break;
952     case RBD_AIO_DISCARD:
953         r = rbd_aio_discard_wrapper(s->image, off, size, c);
954         break;
955     case RBD_AIO_FLUSH:
956         r = rbd_aio_flush_wrapper(s->image, c);
957         break;
958     default:
959         r = -EINVAL;
960     }
961 
962     if (r < 0) {
963         goto failed_completion;
964     }
965     return &acb->common;
966 
967 failed_completion:
968     rbd_aio_release(c);
969 failed:
970     g_free(rcb);
971     if (!LIBRBD_USE_IOVEC) {
972         qemu_vfree(acb->bounce);
973     }
974 
975     qemu_aio_unref(acb);
976     return NULL;
977 }
978 
979 static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
980                                        uint64_t offset, uint64_t bytes,
981                                        QEMUIOVector *qiov, int flags,
982                                        BlockCompletionFunc *cb,
983                                        void *opaque)
984 {
985     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
986                          RBD_AIO_READ);
987 }
988 
989 static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
990                                         uint64_t offset, uint64_t bytes,
991                                         QEMUIOVector *qiov, int flags,
992                                         BlockCompletionFunc *cb,
993                                         void *opaque)
994 {
995     return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
996                          RBD_AIO_WRITE);
997 }
998 
999 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1000 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
1001                                       BlockCompletionFunc *cb,
1002                                       void *opaque)
1003 {
1004     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
1005 }
1006 
1007 #else
1008 
1009 static int qemu_rbd_co_flush(BlockDriverState *bs)
1010 {
1011 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
1012     /* rbd_flush added in 0.1.1 */
1013     BDRVRBDState *s = bs->opaque;
1014     return rbd_flush(s->image);
1015 #else
1016     return 0;
1017 #endif
1018 }
1019 #endif
1020 
1021 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1022 {
1023     BDRVRBDState *s = bs->opaque;
1024     rbd_image_info_t info;
1025     int r;
1026 
1027     r = rbd_stat(s->image, &info, sizeof(info));
1028     if (r < 0) {
1029         return r;
1030     }
1031 
1032     bdi->cluster_size = info.obj_size;
1033     return 0;
1034 }
1035 
1036 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1037 {
1038     BDRVRBDState *s = bs->opaque;
1039     rbd_image_info_t info;
1040     int r;
1041 
1042     r = rbd_stat(s->image, &info, sizeof(info));
1043     if (r < 0) {
1044         return r;
1045     }
1046 
1047     return info.size;
1048 }
1049 
1050 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1051                                              int64_t offset,
1052                                              PreallocMode prealloc,
1053                                              Error **errp)
1054 {
1055     BDRVRBDState *s = bs->opaque;
1056     int r;
1057 
1058     if (prealloc != PREALLOC_MODE_OFF) {
1059         error_setg(errp, "Unsupported preallocation mode '%s'",
1060                    PreallocMode_str(prealloc));
1061         return -ENOTSUP;
1062     }
1063 
1064     r = rbd_resize(s->image, offset);
1065     if (r < 0) {
1066         error_setg_errno(errp, -r, "Failed to resize file");
1067         return r;
1068     }
1069 
1070     return 0;
1071 }
1072 
1073 static int qemu_rbd_snap_create(BlockDriverState *bs,
1074                                 QEMUSnapshotInfo *sn_info)
1075 {
1076     BDRVRBDState *s = bs->opaque;
1077     int r;
1078 
1079     if (sn_info->name[0] == '\0') {
1080         return -EINVAL; /* we need a name for rbd snapshots */
1081     }
1082 
1083     /*
1084      * rbd snapshots are using the name as the user controlled unique identifier
1085      * we can't use the rbd snapid for that purpose, as it can't be set
1086      */
1087     if (sn_info->id_str[0] != '\0' &&
1088         strcmp(sn_info->id_str, sn_info->name) != 0) {
1089         return -EINVAL;
1090     }
1091 
1092     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1093         return -ERANGE;
1094     }
1095 
1096     r = rbd_snap_create(s->image, sn_info->name);
1097     if (r < 0) {
1098         error_report("failed to create snap: %s", strerror(-r));
1099         return r;
1100     }
1101 
1102     return 0;
1103 }
1104 
1105 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1106                                 const char *snapshot_id,
1107                                 const char *snapshot_name,
1108                                 Error **errp)
1109 {
1110     BDRVRBDState *s = bs->opaque;
1111     int r;
1112 
1113     if (!snapshot_name) {
1114         error_setg(errp, "rbd need a valid snapshot name");
1115         return -EINVAL;
1116     }
1117 
1118     /* If snapshot_id is specified, it must be equal to name, see
1119        qemu_rbd_snap_list() */
1120     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1121         error_setg(errp,
1122                    "rbd do not support snapshot id, it should be NULL or "
1123                    "equal to snapshot name");
1124         return -EINVAL;
1125     }
1126 
1127     r = rbd_snap_remove(s->image, snapshot_name);
1128     if (r < 0) {
1129         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1130     }
1131     return r;
1132 }
1133 
1134 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1135                                   const char *snapshot_name)
1136 {
1137     BDRVRBDState *s = bs->opaque;
1138 
1139     return rbd_snap_rollback(s->image, snapshot_name);
1140 }
1141 
1142 static int qemu_rbd_snap_list(BlockDriverState *bs,
1143                               QEMUSnapshotInfo **psn_tab)
1144 {
1145     BDRVRBDState *s = bs->opaque;
1146     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1147     int i, snap_count;
1148     rbd_snap_info_t *snaps;
1149     int max_snaps = RBD_MAX_SNAPS;
1150 
1151     do {
1152         snaps = g_new(rbd_snap_info_t, max_snaps);
1153         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1154         if (snap_count <= 0) {
1155             g_free(snaps);
1156         }
1157     } while (snap_count == -ERANGE);
1158 
1159     if (snap_count <= 0) {
1160         goto done;
1161     }
1162 
1163     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1164 
1165     for (i = 0; i < snap_count; i++) {
1166         const char *snap_name = snaps[i].name;
1167 
1168         sn_info = sn_tab + i;
1169         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1170         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1171 
1172         sn_info->vm_state_size = snaps[i].size;
1173         sn_info->date_sec = 0;
1174         sn_info->date_nsec = 0;
1175         sn_info->vm_clock_nsec = 0;
1176     }
1177     rbd_snap_list_end(snaps);
1178     g_free(snaps);
1179 
1180  done:
1181     *psn_tab = sn_tab;
1182     return snap_count;
1183 }
1184 
#ifdef LIBRBD_SUPPORTS_DISCARD
/*
 * Asynchronous discard callback.
 *
 * Submitted through rbd_start_aio() as an RBD_AIO_DISCARD request for
 * @bytes bytes at @offset, without an I/O vector.
 */
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
                                         int64_t offset,
                                         int bytes,
                                         BlockCompletionFunc *cb,
                                         void *opaque)
{
    BlockAIOCB *acb;

    acb = rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
                        RBD_AIO_DISCARD);
    return acb;
}
#endif
1196 
#ifdef LIBRBD_SUPPORTS_INVALIDATE
/*
 * Ask librbd to invalidate its cache for this image.
 *
 * Sets @errp if rbd_invalidate_cache() reports a negative result.
 */
static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
                                                      Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int ret = rbd_invalidate_cache(s->image);

    if (ret >= 0) {
        return;
    }

    error_setg_errno(errp, -ret, "Failed to invalidate the cache");
}
#endif
1208 
1209 static QemuOptsList qemu_rbd_create_opts = {
1210     .name = "rbd-create-opts",
1211     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1212     .desc = {
1213         {
1214             .name = BLOCK_OPT_SIZE,
1215             .type = QEMU_OPT_SIZE,
1216             .help = "Virtual disk size"
1217         },
1218         {
1219             .name = BLOCK_OPT_CLUSTER_SIZE,
1220             .type = QEMU_OPT_SIZE,
1221             .help = "RBD object size"
1222         },
1223         {
1224             .name = "password-secret",
1225             .type = QEMU_OPT_STRING,
1226             .help = "ID of secret providing the password",
1227         },
1228         { /* end of list */ }
1229     }
1230 };
1231 
/*
 * NULL-terminated list of option names published through the driver's
 * .strong_runtime_opts field.
 * NOTE(review): "server." ends in a dot, presumably so that it is treated
 * as a prefix covering every "server.*" key -- confirm against the block
 * layer.
 */
static const char *const qemu_rbd_strong_runtime_opts[] = {
    "pool", "image", "conf", "snapshot", "user",
    "server.",
    "password-secret",

    NULL,
};
1243 
1244 static BlockDriver bdrv_rbd = {
1245     .format_name            = "rbd",
1246     .instance_size          = sizeof(BDRVRBDState),
1247     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1248     .bdrv_refresh_limits    = qemu_rbd_refresh_limits,
1249     .bdrv_file_open         = qemu_rbd_open,
1250     .bdrv_close             = qemu_rbd_close,
1251     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1252     .bdrv_co_create         = qemu_rbd_co_create,
1253     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1254     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1255     .bdrv_get_info          = qemu_rbd_getinfo,
1256     .create_opts            = &qemu_rbd_create_opts,
1257     .bdrv_getlength         = qemu_rbd_getlength,
1258     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1259     .protocol_name          = "rbd",
1260 
1261     .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
1262     .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
1263 
1264 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1265     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1266 #else
1267     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1268 #endif
1269 
1270 #ifdef LIBRBD_SUPPORTS_DISCARD
1271     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1272 #endif
1273 
1274     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1275     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1276     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1277     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1278 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1279     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1280 #endif
1281 
1282     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1283 };
1284 
1285 static void bdrv_rbd_init(void)
1286 {
1287     bdrv_register(&bdrv_rbd);
1288 }
1289 
1290 block_init(bdrv_rbd_init);
1291