xref: /openbmc/qemu/block/rbd.c (revision c3e31eaa21bc038c146cb196f7762a972eb9de5b)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include "qapi/error.h"
17 #include "qemu/error-report.h"
18 #include "block/block_int.h"
19 #include "crypto/secret.h"
20 #include "qemu/cutils.h"
21 #include "qapi/qmp/qstring.h"
22 
23 #include <rbd/librbd.h>
24 
25 /*
26  * When specifying the image filename use:
27  *
28  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
29  *
30  * poolname must be the name of an existing rados pool.
31  *
32  * devicename is the name of the rbd image.
33  *
34  * Each option given is used to configure rados, and may be any valid
35  * Ceph option, "id", or "conf".
36  *
37  * The "id" option indicates what user we should authenticate as to
38  * the Ceph cluster.  If it is excluded we will use the Ceph default
39  * (normally 'admin').
40  *
41  * The "conf" option specifies a Ceph configuration file to read.  If
42  * it is not specified, we will read from the default Ceph locations
43  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
44  * file, specify conf=/dev/null.
45  *
46  * Configuration values containing :, @, or = can be escaped with a
47  * leading "\".
48  */
49 
/*
 * rbd_aio_discard added in 0.1.2.  LIBRBD_SUPPORTS_DISCARD is tested by
 * rbd_aio_discard_wrapper() to decide whether the call is available.
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the part of the file
 * reviewed here -- confirm this macro still has a user and still expands.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * Length limits handed to qemu_rbd_next_tok() while parsing an "rbd:"
 * filename.  A token is rejected when its length is >= the limit, so each
 * value leaves room for the terminating NUL.
 */
#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_CONF_VAL_SIZE 512
#define RBD_MAX_CONF_SIZE 1024
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
/* presumably the snapshot-list capacity; its user is not visible here */
#define RBD_MAX_SNAPS 100

/*
 * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.  LIBRBD_USE_IOVEC maps
 * it to 0/1 so it can be tested with a plain C "if" as well as with #ifdef.
 */
#ifdef LIBRBD_SUPPORTS_IOVEC
#define LIBRBD_USE_IOVEC 1
#else
#define LIBRBD_USE_IOVEC 0
#endif
72 
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,       /* issued via rbd_aio_readv() or rbd_aio_read() */
    RBD_AIO_WRITE,      /* issued via rbd_aio_writev() or rbd_aio_write() */
    RBD_AIO_DISCARD,    /* issued via rbd_aio_discard_wrapper() */
    RBD_AIO_FLUSH       /* issued via rbd_aio_flush_wrapper() */
} RBDAIOCmd;
79 
/* QEMU-side control block for one asynchronous RBD request. */
typedef struct RBDAIOCB {
    BlockAIOCB common;      /* generic AIOCB: holds bs, cb and opaque */
    int64_t ret;            /* result; values > 0 are reported to cb as 0 */
    QEMUIOVector *qiov;     /* caller's I/O vector, NULL for flush */
    char *bounce;           /* linear buffer, used when !LIBRBD_USE_IOVEC */
    RBDAIOCmd cmd;          /* which operation this request performs */
    int error;              /* set to 1 once the request has failed */
    struct BDRVRBDState *s; /* driver state of the owning BlockDriverState */
} RBDAIOCB;
89 
/*
 * Per-completion context passed to librbd as the callback argument and then
 * handed to rbd_finish_bh(); freed in qemu_rbd_complete_aio() or on the
 * failure path of rbd_start_aio().
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;          /* request this completion belongs to */
    struct BDRVRBDState *s; /* copy of acb->s */
    int64_t size;           /* number of bytes requested */
    char *buf;              /* acb->bounce; only set when !LIBRBD_USE_IOVEC */
    int64_t ret;            /* value of rbd_aio_get_return_value() */
} RADOSCB;
97 
/* Driver state stored in bs->opaque for an open RBD image. */
typedef struct BDRVRBDState {
    rados_t cluster;                    /* handle from rados_create() */
    rados_ioctx_t io_ctx;               /* pool context from rados_ioctx_create() */
    rbd_image_t image;                  /* image handle from rbd_open() */
    char name[RBD_MAX_IMAGE_NAME_SIZE]; /* image name, copied with pstrcpy() */
    char *snap;                         /* g_strdup()ed snapshot name or NULL */
} BDRVRBDState;
105 
106 static char *qemu_rbd_next_tok(int max_len,
107                                char *src, char delim,
108                                const char *name,
109                                char **p, Error **errp)
110 {
111     int l;
112     char *end;
113 
114     *p = NULL;
115 
116     if (delim != '\0') {
117         for (end = src; *end; ++end) {
118             if (*end == delim) {
119                 break;
120             }
121             if (*end == '\\' && end[1] != '\0') {
122                 end++;
123             }
124         }
125         if (*end == delim) {
126             *p = end + 1;
127             *end = '\0';
128         }
129     }
130     l = strlen(src);
131     if (l >= max_len) {
132         error_setg(errp, "%s too long", name);
133         return NULL;
134     } else if (l == 0) {
135         error_setg(errp, "%s too short", name);
136         return NULL;
137     }
138 
139     return src;
140 }
141 
/*
 * Remove backslash escapes from @src in place.
 *
 * Every backslash that is followed by another character is dropped and the
 * following character is kept literally.  A backslash that is the very last
 * character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *in = src;
    char *out = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;
        }
        *out++ = *in++;
    }
    *out = '\0';
}
154 
155 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
156                                     Error **errp)
157 {
158     const char *start;
159     char *p, *buf, *keypairs;
160     char *found_str;
161     size_t max_keypair_size;
162     Error *local_err = NULL;
163 
164     if (!strstart(filename, "rbd:", &start)) {
165         error_setg(errp, "File name must start with 'rbd:'");
166         return;
167     }
168 
169     max_keypair_size = strlen(start) + 1;
170     buf = g_strdup(start);
171     keypairs = g_malloc0(max_keypair_size);
172     p = buf;
173 
174     found_str = qemu_rbd_next_tok(RBD_MAX_POOL_NAME_SIZE, p,
175                                   '/', "pool name", &p, &local_err);
176     if (local_err) {
177         goto done;
178     }
179     if (!p) {
180         error_setg(errp, "Pool name is required");
181         goto done;
182     }
183     qemu_rbd_unescape(found_str);
184     qdict_put(options, "pool", qstring_from_str(found_str));
185 
186     if (strchr(p, '@')) {
187         found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p,
188                                       '@', "object name", &p, &local_err);
189         if (local_err) {
190             goto done;
191         }
192         qemu_rbd_unescape(found_str);
193         qdict_put(options, "image", qstring_from_str(found_str));
194 
195         found_str = qemu_rbd_next_tok(RBD_MAX_SNAP_NAME_SIZE, p,
196                                       ':', "snap name", &p, &local_err);
197         if (local_err) {
198             goto done;
199         }
200         qemu_rbd_unescape(found_str);
201         qdict_put(options, "snapshot", qstring_from_str(found_str));
202     } else {
203         found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p,
204                                       ':', "object name", &p, &local_err);
205         if (local_err) {
206             goto done;
207         }
208         qemu_rbd_unescape(found_str);
209         qdict_put(options, "image", qstring_from_str(found_str));
210     }
211     if (!p) {
212         goto done;
213     }
214 
215     found_str = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
216                                   '\0', "configuration", &p, &local_err);
217     if (local_err) {
218         goto done;
219     }
220 
221     p = found_str;
222 
223     /* The following are essentially all key/value pairs, and we treat
224      * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
225     while (p) {
226         char *name, *value;
227         name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
228                                  '=', "conf option name", &p, &local_err);
229         if (local_err) {
230             break;
231         }
232 
233         if (!p) {
234             error_setg(errp, "conf option %s has no value", name);
235             break;
236         }
237 
238         qemu_rbd_unescape(name);
239 
240         value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
241                                   ':', "conf option value", &p, &local_err);
242         if (local_err) {
243             break;
244         }
245         qemu_rbd_unescape(value);
246 
247         if (!strcmp(name, "conf")) {
248             qdict_put(options, "conf", qstring_from_str(value));
249         } else if (!strcmp(name, "id")) {
250             qdict_put(options, "user" , qstring_from_str(value));
251         } else {
252             /* FIXME: This is pretty ugly, and not the right way to do this.
253              *        These should be contained in a structure, and then
254              *        passed explicitly as individual key/value pairs to
255              *        rados.  Consider this legacy code that needs to be
256              *        updated. */
257             char *tmp = g_malloc0(max_keypair_size);
258             /* only use a delimiter if it is not the first keypair found */
259             /* These are sets of unknown key/value pairs we'll pass along
260              * to ceph */
261             if (keypairs[0]) {
262                 snprintf(tmp, max_keypair_size, ":%s=%s", name, value);
263                 pstrcat(keypairs, max_keypair_size, tmp);
264             } else {
265                 snprintf(keypairs, max_keypair_size, "%s=%s", name, value);
266             }
267             g_free(tmp);
268         }
269     }
270 
271     if (keypairs[0]) {
272         qdict_put(options, "keyvalue-pairs", qstring_from_str(keypairs));
273     }
274 
275 
276 done:
277     if (local_err) {
278         error_propagate(errp, local_err);
279     }
280     g_free(buf);
281     g_free(keypairs);
282     return;
283 }
284 
285 
286 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
287                              Error **errp)
288 {
289     if (secretid == 0) {
290         return 0;
291     }
292 
293     gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
294                                                     errp);
295     if (!secret) {
296         return -1;
297     }
298 
299     rados_conf_set(cluster, "key", secret);
300     g_free(secret);
301 
302     return 0;
303 }
304 
305 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs,
306                                  Error **errp)
307 {
308     char *p, *buf;
309     char *name;
310     char *value;
311     Error *local_err = NULL;
312     int ret = 0;
313 
314     buf = g_strdup(keypairs);
315     p = buf;
316 
317     while (p) {
318         name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
319                                  '=', "conf option name", &p, &local_err);
320         if (local_err) {
321             break;
322         }
323 
324         if (!p) {
325             error_setg(errp, "conf option %s has no value", name);
326             ret = -EINVAL;
327             break;
328         }
329 
330         value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
331                                   ':', "conf option value", &p, &local_err);
332         if (local_err) {
333             break;
334         }
335 
336         ret = rados_conf_set(cluster, name, value);
337         if (ret < 0) {
338             error_setg_errno(errp, -ret, "invalid conf option %s", name);
339             ret = -EINVAL;
340             break;
341         }
342     }
343 
344     if (local_err) {
345         error_propagate(errp, local_err);
346         ret = -EINVAL;
347     }
348     g_free(buf);
349     return ret;
350 }
351 
352 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
353 {
354     if (LIBRBD_USE_IOVEC) {
355         RBDAIOCB *acb = rcb->acb;
356         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
357                    acb->qiov->size - offs);
358     } else {
359         memset(rcb->buf + offs, 0, rcb->size - offs);
360     }
361 }
362 
/*
 * Options understood by the rbd driver.  The list is used to absorb the
 * options QDict in qemu_rbd_create() and qemu_rbd_open(), and also to absorb
 * each element of the "server." and "auth-supported." arrays in
 * qemu_rbd_array_opts().
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "filename",
            .type = QEMU_OPT_STRING,
            .help = "Specification of the rbd image",
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        {
            .name = "conf",
            .type = QEMU_OPT_STRING,
            .help = "Rados config file location",
        },
        {
            .name = "pool",
            .type = QEMU_OPT_STRING,
            .help = "Rados pool name",
        },
        {
            .name = "image",
            .type = QEMU_OPT_STRING,
            .help = "Image name in the pool",
        },
        {
            .name = "snapshot",
            .type = QEMU_OPT_STRING,
            .help = "Ceph snapshot name",
        },
        {
            /* maps to 'id' in rados_create() */
            .name = "user",
            .type = QEMU_OPT_STRING,
            .help = "Rados id name",
        },
        {
            /* "name=value:name=value" string built by the filename parser */
            .name = "keyvalue-pairs",
            .type = QEMU_OPT_STRING,
            .help = "Legacy rados key/value option parameters",
        },
        {
            /* read from each "server.N." sub-dictionary */
            .name = "host",
            .type = QEMU_OPT_STRING,
        },
        {
            /* read from each "server.N." sub-dictionary */
            .name = "port",
            .type = QEMU_OPT_STRING,
        },
        {
            /* read from each "auth-supported.N." sub-dictionary */
            .name = "auth",
            .type = QEMU_OPT_STRING,
            .help = "Supported authentication method, either cephx or none",
        },
        { /* end of list */ }
    },
};
424 
/*
 * Create a new RBD image.
 *
 * @filename: legacy "rbd:pool/image[...]" specification, parsed with
 *            qemu_rbd_parse_filename()
 * @opts:     creation options; BLOCK_OPT_SIZE, BLOCK_OPT_CLUSTER_SIZE and
 *            "password-secret" are read here
 *
 * Returns 0 on success or a negative errno value with @errp set.
 */
static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
{
    Error *local_err = NULL;
    int64_t bytes = 0;
    int64_t objsize;
    int obj_order = 0;
    const char *pool, *name, *conf, *clientname, *keypairs;
    const char *secretid;
    rados_t cluster;
    rados_ioctx_t io_ctx;
    QDict *options = NULL;
    QemuOpts *rbd_opts = NULL;
    int ret = 0;

    secretid = qemu_opt_get(opts, "password-secret");

    /* Read out options */
    bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
                     BDRV_SECTOR_SIZE);
    objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
    if (objsize) {
        if ((objsize - 1) & objsize) {    /* not a power of 2? */
            error_setg(errp, "obj size needs to be power of 2");
            ret = -EINVAL;
            goto exit;
        }
        if (objsize < 4096) {
            error_setg(errp, "obj size too small");
            ret = -EINVAL;
            goto exit;
        }
        /* NOTE(review): objsize is int64_t but ctz32() presumably only looks
         * at the low 32 bits -- confirm behaviour for sizes >= 4 GiB */
        obj_order = ctz32(objsize);
    }

    /* Split the legacy filename into individual options */
    options = qdict_new();
    qemu_rbd_parse_filename(filename, options, &local_err);
    if (local_err) {
        ret = -EINVAL;
        error_propagate(errp, local_err);
        goto exit;
    }

    rbd_opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(rbd_opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        ret = -EINVAL;
        goto exit;
    }

    /* These strings are owned by rbd_opts and stay valid until it is
     * deleted at the exit label */
    pool       = qemu_opt_get(rbd_opts, "pool");
    conf       = qemu_opt_get(rbd_opts, "conf");
    clientname = qemu_opt_get(rbd_opts, "user");
    name       = qemu_opt_get(rbd_opts, "image");
    keypairs   = qemu_opt_get(rbd_opts, "keyvalue-pairs");

    ret = rados_create(&cluster, clientname);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "error initializing");
        goto exit;
    }

    /* try default location when conf=NULL, but ignore failure */
    ret = rados_conf_read_file(cluster, conf);
    if (conf && ret < 0) {
        error_setg_errno(errp, -ret, "error reading conf file %s", conf);
        ret = -EIO;
        goto shutdown;
    }

    /* Explicit key/value pairs are applied after the config file */
    ret = qemu_rbd_set_keypairs(cluster, keypairs, errp);
    if (ret < 0) {
        ret = -EIO;
        goto shutdown;
    }

    if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
        ret = -EIO;
        goto shutdown;
    }

    ret = rados_connect(cluster);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "error connecting");
        goto shutdown;
    }

    ret = rados_ioctx_create(cluster, pool, &io_ctx);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "error opening pool %s", pool);
        goto shutdown;
    }

    /* obj_order stays 0 when no cluster size was requested */
    ret = rbd_create(io_ctx, name, bytes, &obj_order);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "error rbd create");
    }

    /* Success and rbd_create() failure share the teardown below */
    rados_ioctx_destroy(io_ctx);

shutdown:
    rados_shutdown(cluster);

exit:
    QDECREF(options);
    qemu_opts_del(rbd_opts);
    return ret;
}
533 
534 /*
535  * This aio completion is being called from rbd_finish_bh() and runs in qemu
536  * BH context.
537  */
538 static void qemu_rbd_complete_aio(RADOSCB *rcb)
539 {
540     RBDAIOCB *acb = rcb->acb;
541     int64_t r;
542 
543     r = rcb->ret;
544 
545     if (acb->cmd != RBD_AIO_READ) {
546         if (r < 0) {
547             acb->ret = r;
548             acb->error = 1;
549         } else if (!acb->error) {
550             acb->ret = rcb->size;
551         }
552     } else {
553         if (r < 0) {
554             qemu_rbd_memset(rcb, 0);
555             acb->ret = r;
556             acb->error = 1;
557         } else if (r < rcb->size) {
558             qemu_rbd_memset(rcb, r);
559             if (!acb->error) {
560                 acb->ret = rcb->size;
561             }
562         } else if (!acb->error) {
563             acb->ret = r;
564         }
565     }
566 
567     g_free(rcb);
568 
569     if (!LIBRBD_USE_IOVEC) {
570         if (acb->cmd == RBD_AIO_READ) {
571             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
572         }
573         qemu_vfree(acb->bounce);
574     }
575 
576     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
577 
578     qemu_aio_unref(acb);
579 }
580 
581 #define RBD_MON_HOST          0
582 #define RBD_AUTH_SUPPORTED    1
583 
584 static char *qemu_rbd_array_opts(QDict *options, const char *prefix, int type,
585                                  Error **errp)
586 {
587     int num_entries;
588     QemuOpts *opts = NULL;
589     QDict *sub_options;
590     const char *host;
591     const char *port;
592     char *str;
593     char *rados_str = NULL;
594     Error *local_err = NULL;
595     int i;
596 
597     assert(type == RBD_MON_HOST || type == RBD_AUTH_SUPPORTED);
598 
599     num_entries = qdict_array_entries(options, prefix);
600 
601     if (num_entries < 0) {
602         error_setg(errp, "Parse error on RBD QDict array");
603         return NULL;
604     }
605 
606     for (i = 0; i < num_entries; i++) {
607         char *strbuf = NULL;
608         const char *value;
609         char *rados_str_tmp;
610 
611         str = g_strdup_printf("%s%d.", prefix, i);
612         qdict_extract_subqdict(options, &sub_options, str);
613         g_free(str);
614 
615         opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
616         qemu_opts_absorb_qdict(opts, sub_options, &local_err);
617         QDECREF(sub_options);
618         if (local_err) {
619             error_propagate(errp, local_err);
620             g_free(rados_str);
621             rados_str = NULL;
622             goto exit;
623         }
624 
625         if (type == RBD_MON_HOST) {
626             host = qemu_opt_get(opts, "host");
627             port = qemu_opt_get(opts, "port");
628 
629             value = host;
630             if (port) {
631                 /* check for ipv6 */
632                 if (strchr(host, ':')) {
633                     strbuf = g_strdup_printf("[%s]:%s", host, port);
634                 } else {
635                     strbuf = g_strdup_printf("%s:%s", host, port);
636                 }
637                 value = strbuf;
638             } else if (strchr(host, ':')) {
639                 strbuf = g_strdup_printf("[%s]", host);
640                 value = strbuf;
641             }
642         } else {
643             value = qemu_opt_get(opts, "auth");
644         }
645 
646 
647         /* each iteration in the for loop will build upon the string, and if
648          * rados_str is NULL then it is our first pass */
649         if (rados_str) {
650             /* separate options with ';', as that  is what rados_conf_set()
651              * requires */
652             rados_str_tmp = rados_str;
653             rados_str = g_strdup_printf("%s;%s", rados_str_tmp, value);
654             g_free(rados_str_tmp);
655         } else {
656             rados_str = g_strdup(value);
657         }
658 
659         g_free(strbuf);
660         qemu_opts_del(opts);
661         opts = NULL;
662     }
663 
664 exit:
665     qemu_opts_del(opts);
666     return rados_str;
667 }
668 
669 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
670                          Error **errp)
671 {
672     BDRVRBDState *s = bs->opaque;
673     const char *pool, *snap, *conf, *clientname, *name, *keypairs;
674     const char *secretid;
675     QemuOpts *opts;
676     Error *local_err = NULL;
677     char *mon_host = NULL;
678     char *auth_supported = NULL;
679     int r;
680 
681     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
682     qemu_opts_absorb_qdict(opts, options, &local_err);
683     if (local_err) {
684         error_propagate(errp, local_err);
685         qemu_opts_del(opts);
686         return -EINVAL;
687     }
688 
689     auth_supported = qemu_rbd_array_opts(options, "auth-supported.",
690                                          RBD_AUTH_SUPPORTED, &local_err);
691     if (local_err) {
692         error_propagate(errp, local_err);
693         r = -EINVAL;
694         goto failed_opts;
695     }
696 
697     mon_host = qemu_rbd_array_opts(options, "server.",
698                                    RBD_MON_HOST, &local_err);
699     if (local_err) {
700         error_propagate(errp, local_err);
701         r = -EINVAL;
702         goto failed_opts;
703     }
704 
705     secretid = qemu_opt_get(opts, "password-secret");
706 
707     pool           = qemu_opt_get(opts, "pool");
708     conf           = qemu_opt_get(opts, "conf");
709     snap           = qemu_opt_get(opts, "snapshot");
710     clientname     = qemu_opt_get(opts, "user");
711     name           = qemu_opt_get(opts, "image");
712     keypairs       = qemu_opt_get(opts, "keyvalue-pairs");
713 
714     r = rados_create(&s->cluster, clientname);
715     if (r < 0) {
716         error_setg_errno(errp, -r, "error initializing");
717         goto failed_opts;
718     }
719 
720     s->snap = g_strdup(snap);
721     if (name) {
722         pstrcpy(s->name, RBD_MAX_IMAGE_NAME_SIZE, name);
723     }
724 
725     /* try default location when conf=NULL, but ignore failure */
726     r = rados_conf_read_file(s->cluster, conf);
727     if (conf && r < 0) {
728         error_setg_errno(errp, -r, "error reading conf file %s", conf);
729         goto failed_shutdown;
730     }
731 
732     r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp);
733     if (r < 0) {
734         goto failed_shutdown;
735     }
736 
737     if (mon_host) {
738         r = rados_conf_set(s->cluster, "mon_host", mon_host);
739         if (r < 0) {
740             goto failed_shutdown;
741         }
742     }
743 
744     if (auth_supported) {
745         r = rados_conf_set(s->cluster, "auth_supported", auth_supported);
746         if (r < 0) {
747             goto failed_shutdown;
748         }
749     }
750 
751     if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
752         r = -EIO;
753         goto failed_shutdown;
754     }
755 
756     /*
757      * Fallback to more conservative semantics if setting cache
758      * options fails. Ignore errors from setting rbd_cache because the
759      * only possible error is that the option does not exist, and
760      * librbd defaults to no caching. If write through caching cannot
761      * be set up, fall back to no caching.
762      */
763     if (flags & BDRV_O_NOCACHE) {
764         rados_conf_set(s->cluster, "rbd_cache", "false");
765     } else {
766         rados_conf_set(s->cluster, "rbd_cache", "true");
767     }
768 
769     r = rados_connect(s->cluster);
770     if (r < 0) {
771         error_setg_errno(errp, -r, "error connecting");
772         goto failed_shutdown;
773     }
774 
775     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
776     if (r < 0) {
777         error_setg_errno(errp, -r, "error opening pool %s", pool);
778         goto failed_shutdown;
779     }
780 
781     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
782     if (r < 0) {
783         error_setg_errno(errp, -r, "error reading header from %s", s->name);
784         goto failed_open;
785     }
786 
787     bs->read_only = (s->snap != NULL);
788 
789     qemu_opts_del(opts);
790     return 0;
791 
792 failed_open:
793     rados_ioctx_destroy(s->io_ctx);
794 failed_shutdown:
795     rados_shutdown(s->cluster);
796     g_free(s->snap);
797 failed_opts:
798     qemu_opts_del(opts);
799     g_free(mon_host);
800     g_free(auth_supported);
801     return r;
802 }
803 
804 static void qemu_rbd_close(BlockDriverState *bs)
805 {
806     BDRVRBDState *s = bs->opaque;
807 
808     rbd_close(s->image);
809     rados_ioctx_destroy(s->io_ctx);
810     g_free(s->snap);
811     rados_shutdown(s->cluster);
812 }
813 
/* Allocation descriptor passed to qemu_aio_get() for RBDAIOCB requests. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
817 
/*
 * Bottom half scheduled by rbd_finish_aiocb(); @opaque is the RADOSCB of the
 * finished request.
 */
static void rbd_finish_bh(void *opaque)
{
    qemu_rbd_complete_aio(opaque);
}
823 
824 /*
825  * This is the callback function for rbd_aio_read and _write
826  *
827  * Note: this function is being called from a non qemu thread so
828  * we need to be careful about what we do here. Generally we only
829  * schedule a BH, and do the rest of the io completion handling
830  * from rbd_finish_bh() which runs in a qemu context.
831  */
832 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
833 {
834     RBDAIOCB *acb = rcb->acb;
835 
836     rcb->ret = rbd_aio_get_return_value(c);
837     rbd_aio_release(c);
838 
839     aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
840                             rbd_finish_bh, rcb);
841 }
842 
843 static int rbd_aio_discard_wrapper(rbd_image_t image,
844                                    uint64_t off,
845                                    uint64_t len,
846                                    rbd_completion_t comp)
847 {
848 #ifdef LIBRBD_SUPPORTS_DISCARD
849     return rbd_aio_discard(image, off, len, comp);
850 #else
851     return -ENOTSUP;
852 #endif
853 }
854 
855 static int rbd_aio_flush_wrapper(rbd_image_t image,
856                                  rbd_completion_t comp)
857 {
858 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
859     return rbd_aio_flush(image, comp);
860 #else
861     return -ENOTSUP;
862 #endif
863 }
864 
/*
 * Start one asynchronous request on the image behind @bs.
 *
 * @off:    byte offset of the request (ignored for flush)
 * @qiov:   data vector for read/write; may be NULL for other commands
 * @size:   request length in bytes; must equal qiov->size when @qiov is set
 * @cb:     completion callback, invoked from qemu_rbd_complete_aio()
 * @opaque: argument passed to @cb
 * @cmd:    operation to perform
 *
 * Returns the new AIOCB, or NULL if the request could not be submitted (in
 * which case @cb is not called).
 */
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
                                 int64_t off,
                                 QEMUIOVector *qiov,
                                 int64_t size,
                                 BlockCompletionFunc *cb,
                                 void *opaque,
                                 RBDAIOCmd cmd)
{
    RBDAIOCB *acb;
    RADOSCB *rcb = NULL;
    rbd_completion_t c;
    int r;

    BDRVRBDState *s = bs->opaque;

    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
    acb->cmd = cmd;
    acb->qiov = qiov;
    assert(!qiov || qiov->size == size);

    rcb = g_new(RADOSCB, 1);

    /* Without iovec support in librbd, data goes through a linear bounce
     * buffer that is copied from/to the caller's vector */
    if (!LIBRBD_USE_IOVEC) {
        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
            acb->bounce = NULL;
        } else {
            acb->bounce = qemu_try_blockalign(bs, qiov->size);
            if (acb->bounce == NULL) {
                goto failed;
            }
        }
        if (cmd == RBD_AIO_WRITE) {
            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
        }
        rcb->buf = acb->bounce;
    }

    acb->ret = 0;
    acb->error = 0;
    acb->s = s;

    rcb->acb = acb;
    rcb->s = acb->s;
    rcb->size = size;
    /* rcb is what rbd_finish_aiocb() receives once librbd completes */
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }

    switch (cmd) {
    case RBD_AIO_WRITE:
#ifdef LIBRBD_SUPPORTS_IOVEC
            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
#else
            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
#endif
        break;
    case RBD_AIO_READ:
#ifdef LIBRBD_SUPPORTS_IOVEC
            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
#else
            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
#endif
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush_wrapper(s->image, c);
        break;
    default:
        r = -EINVAL;
    }

    if (r < 0) {
        goto failed_completion;
    }
    return &acb->common;

    /* Submission failed: release everything allocated above in reverse
     * order */
failed_completion:
    rbd_aio_release(c);
failed:
    g_free(rcb);
    if (!LIBRBD_USE_IOVEC) {
        qemu_vfree(acb->bounce);
    }

    qemu_aio_unref(acb);
    return NULL;
}
955 
956 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
957                                       int64_t sector_num,
958                                       QEMUIOVector *qiov,
959                                       int nb_sectors,
960                                       BlockCompletionFunc *cb,
961                                       void *opaque)
962 {
963     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
964                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
965                          RBD_AIO_READ);
966 }
967 
968 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
969                                        int64_t sector_num,
970                                        QEMUIOVector *qiov,
971                                        int nb_sectors,
972                                        BlockCompletionFunc *cb,
973                                        void *opaque)
974 {
975     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
976                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
977                          RBD_AIO_WRITE);
978 }
979 
980 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
981 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
982                                       BlockCompletionFunc *cb,
983                                       void *opaque)
984 {
985     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
986 }
987 
988 #else
989 
990 static int qemu_rbd_co_flush(BlockDriverState *bs)
991 {
992 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
993     /* rbd_flush added in 0.1.1 */
994     BDRVRBDState *s = bs->opaque;
995     return rbd_flush(s->image);
996 #else
997     return 0;
998 #endif
999 }
1000 #endif
1001 
1002 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1003 {
1004     BDRVRBDState *s = bs->opaque;
1005     rbd_image_info_t info;
1006     int r;
1007 
1008     r = rbd_stat(s->image, &info, sizeof(info));
1009     if (r < 0) {
1010         return r;
1011     }
1012 
1013     bdi->cluster_size = info.obj_size;
1014     return 0;
1015 }
1016 
1017 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1018 {
1019     BDRVRBDState *s = bs->opaque;
1020     rbd_image_info_t info;
1021     int r;
1022 
1023     r = rbd_stat(s->image, &info, sizeof(info));
1024     if (r < 0) {
1025         return r;
1026     }
1027 
1028     return info.size;
1029 }
1030 
1031 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
1032 {
1033     BDRVRBDState *s = bs->opaque;
1034     int r;
1035 
1036     r = rbd_resize(s->image, offset);
1037     if (r < 0) {
1038         return r;
1039     }
1040 
1041     return 0;
1042 }
1043 
1044 static int qemu_rbd_snap_create(BlockDriverState *bs,
1045                                 QEMUSnapshotInfo *sn_info)
1046 {
1047     BDRVRBDState *s = bs->opaque;
1048     int r;
1049 
1050     if (sn_info->name[0] == '\0') {
1051         return -EINVAL; /* we need a name for rbd snapshots */
1052     }
1053 
1054     /*
1055      * rbd snapshots are using the name as the user controlled unique identifier
1056      * we can't use the rbd snapid for that purpose, as it can't be set
1057      */
1058     if (sn_info->id_str[0] != '\0' &&
1059         strcmp(sn_info->id_str, sn_info->name) != 0) {
1060         return -EINVAL;
1061     }
1062 
1063     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1064         return -ERANGE;
1065     }
1066 
1067     r = rbd_snap_create(s->image, sn_info->name);
1068     if (r < 0) {
1069         error_report("failed to create snap: %s", strerror(-r));
1070         return r;
1071     }
1072 
1073     return 0;
1074 }
1075 
1076 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1077                                 const char *snapshot_id,
1078                                 const char *snapshot_name,
1079                                 Error **errp)
1080 {
1081     BDRVRBDState *s = bs->opaque;
1082     int r;
1083 
1084     if (!snapshot_name) {
1085         error_setg(errp, "rbd need a valid snapshot name");
1086         return -EINVAL;
1087     }
1088 
1089     /* If snapshot_id is specified, it must be equal to name, see
1090        qemu_rbd_snap_list() */
1091     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1092         error_setg(errp,
1093                    "rbd do not support snapshot id, it should be NULL or "
1094                    "equal to snapshot name");
1095         return -EINVAL;
1096     }
1097 
1098     r = rbd_snap_remove(s->image, snapshot_name);
1099     if (r < 0) {
1100         error_setg_errno(errp, -r, "Failed to remove the snapshot");
1101     }
1102     return r;
1103 }
1104 
1105 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1106                                   const char *snapshot_name)
1107 {
1108     BDRVRBDState *s = bs->opaque;
1109 
1110     return rbd_snap_rollback(s->image, snapshot_name);
1111 }
1112 
1113 static int qemu_rbd_snap_list(BlockDriverState *bs,
1114                               QEMUSnapshotInfo **psn_tab)
1115 {
1116     BDRVRBDState *s = bs->opaque;
1117     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1118     int i, snap_count;
1119     rbd_snap_info_t *snaps;
1120     int max_snaps = RBD_MAX_SNAPS;
1121 
1122     do {
1123         snaps = g_new(rbd_snap_info_t, max_snaps);
1124         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1125         if (snap_count <= 0) {
1126             g_free(snaps);
1127         }
1128     } while (snap_count == -ERANGE);
1129 
1130     if (snap_count <= 0) {
1131         goto done;
1132     }
1133 
1134     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1135 
1136     for (i = 0; i < snap_count; i++) {
1137         const char *snap_name = snaps[i].name;
1138 
1139         sn_info = sn_tab + i;
1140         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1141         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1142 
1143         sn_info->vm_state_size = snaps[i].size;
1144         sn_info->date_sec = 0;
1145         sn_info->date_nsec = 0;
1146         sn_info->vm_clock_nsec = 0;
1147     }
1148     rbd_snap_list_end(snaps);
1149     g_free(snaps);
1150 
1151  done:
1152     *psn_tab = sn_tab;
1153     return snap_count;
1154 }
1155 
1156 #ifdef LIBRBD_SUPPORTS_DISCARD
1157 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1158                                          int64_t offset,
1159                                          int count,
1160                                          BlockCompletionFunc *cb,
1161                                          void *opaque)
1162 {
1163     return rbd_start_aio(bs, offset, NULL, count, cb, opaque,
1164                          RBD_AIO_DISCARD);
1165 }
1166 #endif
1167 
1168 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1169 static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
1170                                       Error **errp)
1171 {
1172     BDRVRBDState *s = bs->opaque;
1173     int r = rbd_invalidate_cache(s->image);
1174     if (r < 0) {
1175         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1176     }
1177 }
1178 #endif
1179 
1180 static QemuOptsList qemu_rbd_create_opts = {
1181     .name = "rbd-create-opts",
1182     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1183     .desc = {
1184         {
1185             .name = BLOCK_OPT_SIZE,
1186             .type = QEMU_OPT_SIZE,
1187             .help = "Virtual disk size"
1188         },
1189         {
1190             .name = BLOCK_OPT_CLUSTER_SIZE,
1191             .type = QEMU_OPT_SIZE,
1192             .help = "RBD object size"
1193         },
1194         {
1195             .name = "password-secret",
1196             .type = QEMU_OPT_STRING,
1197             .help = "ID of secret providing the password",
1198         },
1199         { /* end of list */ }
1200     }
1201 };
1202 
1203 static BlockDriver bdrv_rbd = {
1204     .format_name            = "rbd",
1205     .instance_size          = sizeof(BDRVRBDState),
1206     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1207     .bdrv_file_open         = qemu_rbd_open,
1208     .bdrv_close             = qemu_rbd_close,
1209     .bdrv_create            = qemu_rbd_create,
1210     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1211     .bdrv_get_info          = qemu_rbd_getinfo,
1212     .create_opts            = &qemu_rbd_create_opts,
1213     .bdrv_getlength         = qemu_rbd_getlength,
1214     .bdrv_truncate          = qemu_rbd_truncate,
1215     .protocol_name          = "rbd",
1216 
1217     .bdrv_aio_readv         = qemu_rbd_aio_readv,
1218     .bdrv_aio_writev        = qemu_rbd_aio_writev,
1219 
1220 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1221     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1222 #else
1223     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1224 #endif
1225 
1226 #ifdef LIBRBD_SUPPORTS_DISCARD
1227     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1228 #endif
1229 
1230     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1231     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1232     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1233     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1234 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1235     .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
1236 #endif
1237 };
1238 
1239 static void bdrv_rbd_init(void)
1240 {
1241     bdrv_register(&bdrv_rbd);
1242 }
1243 
1244 block_init(bdrv_rbd_init);
1245