xref: /openbmc/qemu/block/rbd.c (revision 776574d6)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  */
11 
12 #include <inttypes.h>
13 
14 #include "qemu-common.h"
15 #include "qemu-error.h"
16 #include "block_int.h"
17 
18 #include <rbd/librbd.h>
19 
20 /*
21  * When specifying the image filename use:
22  *
23  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
24  *
25  * poolname must be the name of an existing rados pool.
26  *
27  * devicename is the name of the rbd image.
28  *
29  * Each option given is used to configure rados, and may be any valid
30  * Ceph option, "id", or "conf".
31  *
32  * The "id" option indicates what user we should authenticate as to
33  * the Ceph cluster.  If it is excluded we will use the Ceph default
34  * (normally 'admin').
35  *
36  * The "conf" option specifies a Ceph configuration file to read.  If
37  * it is not specified, we will read from the default Ceph locations
38  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
39  * file, specify conf=/dev/null.
40  *
41  * Configuration values containing :, @, or = can be escaped with a
42  * leading "\".
43  */
44 
45 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
46 
47 #define RBD_MAX_CONF_NAME_SIZE 128
48 #define RBD_MAX_CONF_VAL_SIZE 512
49 #define RBD_MAX_CONF_SIZE 1024
50 #define RBD_MAX_POOL_NAME_SIZE 128
51 #define RBD_MAX_SNAP_NAME_SIZE 128
52 #define RBD_MAX_SNAPS 100
53 
54 typedef struct RBDAIOCB {
55     BlockDriverAIOCB common;
56     QEMUBH *bh;
57     int ret;
58     QEMUIOVector *qiov;
59     char *bounce;
60     int write;
61     int64_t sector_num;
62     int error;
63     struct BDRVRBDState *s;
64     int cancelled;
65 } RBDAIOCB;
66 
67 typedef struct RADOSCB {
68     int rcbid;
69     RBDAIOCB *acb;
70     struct BDRVRBDState *s;
71     int done;
72     int64_t size;
73     char *buf;
74     int ret;
75 } RADOSCB;
76 
77 #define RBD_FD_READ 0
78 #define RBD_FD_WRITE 1
79 
80 typedef struct BDRVRBDState {
81     int fds[2];
82     rados_t cluster;
83     rados_ioctx_t io_ctx;
84     rbd_image_t image;
85     char name[RBD_MAX_IMAGE_NAME_SIZE];
86     int qemu_aio_count;
87     char *snap;
88     int event_reader_pos;
89     RADOSCB *event_rcb;
90 } BDRVRBDState;
91 
92 static void rbd_aio_bh_cb(void *opaque);
93 
94 static int qemu_rbd_next_tok(char *dst, int dst_len,
95                              char *src, char delim,
96                              const char *name,
97                              char **p)
98 {
99     int l;
100     char *end;
101 
102     *p = NULL;
103 
104     if (delim != '\0') {
105         for (end = src; *end; ++end) {
106             if (*end == delim) {
107                 break;
108             }
109             if (*end == '\\' && end[1] != '\0') {
110                 end++;
111             }
112         }
113         if (*end == delim) {
114             *p = end + 1;
115             *end = '\0';
116         }
117     }
118     l = strlen(src);
119     if (l >= dst_len) {
120         error_report("%s too long", name);
121         return -EINVAL;
122     } else if (l == 0) {
123         error_report("%s too short", name);
124         return -EINVAL;
125     }
126 
127     pstrcpy(dst, dst_len, src);
128 
129     return 0;
130 }
131 
132 static void qemu_rbd_unescape(char *src)
133 {
134     char *p;
135 
136     for (p = src; *src; ++src, ++p) {
137         if (*src == '\\' && src[1] != '\0') {
138             src++;
139         }
140         *p = *src;
141     }
142     *p = '\0';
143 }
144 
145 static int qemu_rbd_parsename(const char *filename,
146                               char *pool, int pool_len,
147                               char *snap, int snap_len,
148                               char *name, int name_len,
149                               char *conf, int conf_len)
150 {
151     const char *start;
152     char *p, *buf;
153     int ret;
154 
155     if (!strstart(filename, "rbd:", &start)) {
156         return -EINVAL;
157     }
158 
159     buf = g_strdup(start);
160     p = buf;
161     *snap = '\0';
162     *conf = '\0';
163 
164     ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
165     if (ret < 0 || !p) {
166         ret = -EINVAL;
167         goto done;
168     }
169     qemu_rbd_unescape(pool);
170 
171     if (strchr(p, '@')) {
172         ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
173         if (ret < 0) {
174             goto done;
175         }
176         ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
177         qemu_rbd_unescape(snap);
178     } else {
179         ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
180     }
181     qemu_rbd_unescape(name);
182     if (ret < 0 || !p) {
183         goto done;
184     }
185 
186     ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
187 
188 done:
189     g_free(buf);
190     return ret;
191 }
192 
193 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
194 {
195     const char *p = conf;
196 
197     while (*p) {
198         int len;
199         const char *end = strchr(p, ':');
200 
201         if (end) {
202             len = end - p;
203         } else {
204             len = strlen(p);
205         }
206 
207         if (strncmp(p, "id=", 3) == 0) {
208             len -= 3;
209             strncpy(clientname, p + 3, len);
210             clientname[len] = '\0';
211             return clientname;
212         }
213         if (end == NULL) {
214             break;
215         }
216         p = end + 1;
217     }
218     return NULL;
219 }
220 
221 static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
222 {
223     char *p, *buf;
224     char name[RBD_MAX_CONF_NAME_SIZE];
225     char value[RBD_MAX_CONF_VAL_SIZE];
226     int ret = 0;
227 
228     buf = g_strdup(conf);
229     p = buf;
230 
231     while (p) {
232         ret = qemu_rbd_next_tok(name, sizeof(name), p,
233                                 '=', "conf option name", &p);
234         if (ret < 0) {
235             break;
236         }
237         qemu_rbd_unescape(name);
238 
239         if (!p) {
240             error_report("conf option %s has no value", name);
241             ret = -EINVAL;
242             break;
243         }
244 
245         ret = qemu_rbd_next_tok(value, sizeof(value), p,
246                                 ':', "conf option value", &p);
247         if (ret < 0) {
248             break;
249         }
250         qemu_rbd_unescape(value);
251 
252         if (strcmp(name, "conf") == 0) {
253             ret = rados_conf_read_file(cluster, value);
254             if (ret < 0) {
255                 error_report("error reading conf file %s", value);
256                 break;
257             }
258         } else if (strcmp(name, "id") == 0) {
259             /* ignore, this is parsed by qemu_rbd_parse_clientname() */
260         } else {
261             ret = rados_conf_set(cluster, name, value);
262             if (ret < 0) {
263                 error_report("invalid conf option %s", name);
264                 ret = -EINVAL;
265                 break;
266             }
267         }
268     }
269 
270     g_free(buf);
271     return ret;
272 }
273 
274 static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
275 {
276     int64_t bytes = 0;
277     int64_t objsize;
278     int obj_order = 0;
279     char pool[RBD_MAX_POOL_NAME_SIZE];
280     char name[RBD_MAX_IMAGE_NAME_SIZE];
281     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
282     char conf[RBD_MAX_CONF_SIZE];
283     char clientname_buf[RBD_MAX_CONF_SIZE];
284     char *clientname;
285     rados_t cluster;
286     rados_ioctx_t io_ctx;
287     int ret;
288 
289     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
290                            snap_buf, sizeof(snap_buf),
291                            name, sizeof(name),
292                            conf, sizeof(conf)) < 0) {
293         return -EINVAL;
294     }
295 
296     /* Read out options */
297     while (options && options->name) {
298         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
299             bytes = options->value.n;
300         } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
301             if (options->value.n) {
302                 objsize = options->value.n;
303                 if ((objsize - 1) & objsize) {    /* not a power of 2? */
304                     error_report("obj size needs to be power of 2");
305                     return -EINVAL;
306                 }
307                 if (objsize < 4096) {
308                     error_report("obj size too small");
309                     return -EINVAL;
310                 }
311                 obj_order = ffs(objsize) - 1;
312             }
313         }
314         options++;
315     }
316 
317     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
318     if (rados_create(&cluster, clientname) < 0) {
319         error_report("error initializing");
320         return -EIO;
321     }
322 
323     if (strstr(conf, "conf=") == NULL) {
324         /* try default location, but ignore failure */
325         rados_conf_read_file(cluster, NULL);
326     }
327 
328     if (conf[0] != '\0' &&
329         qemu_rbd_set_conf(cluster, conf) < 0) {
330         error_report("error setting config options");
331         rados_shutdown(cluster);
332         return -EIO;
333     }
334 
335     if (rados_connect(cluster) < 0) {
336         error_report("error connecting");
337         rados_shutdown(cluster);
338         return -EIO;
339     }
340 
341     if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
342         error_report("error opening pool %s", pool);
343         rados_shutdown(cluster);
344         return -EIO;
345     }
346 
347     ret = rbd_create(io_ctx, name, bytes, &obj_order);
348     rados_ioctx_destroy(io_ctx);
349     rados_shutdown(cluster);
350 
351     return ret;
352 }
353 
354 /*
355  * This aio completion is being called from qemu_rbd_aio_event_reader()
356  * and runs in qemu context. It schedules a bh, but just in case the aio
357  * was not cancelled before.
358  */
359 static void qemu_rbd_complete_aio(RADOSCB *rcb)
360 {
361     RBDAIOCB *acb = rcb->acb;
362     int64_t r;
363 
364     if (acb->cancelled) {
365         qemu_vfree(acb->bounce);
366         qemu_aio_release(acb);
367         goto done;
368     }
369 
370     r = rcb->ret;
371 
372     if (acb->write) {
373         if (r < 0) {
374             acb->ret = r;
375             acb->error = 1;
376         } else if (!acb->error) {
377             acb->ret = rcb->size;
378         }
379     } else {
380         if (r < 0) {
381             memset(rcb->buf, 0, rcb->size);
382             acb->ret = r;
383             acb->error = 1;
384         } else if (r < rcb->size) {
385             memset(rcb->buf + r, 0, rcb->size - r);
386             if (!acb->error) {
387                 acb->ret = rcb->size;
388             }
389         } else if (!acb->error) {
390             acb->ret = r;
391         }
392     }
393     /* Note that acb->bh can be NULL in case where the aio was cancelled */
394     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
395     qemu_bh_schedule(acb->bh);
396 done:
397     g_free(rcb);
398 }
399 
400 /*
401  * aio fd read handler. It runs in the qemu context and calls the
402  * completion handling of completed rados aio operations.
403  */
404 static void qemu_rbd_aio_event_reader(void *opaque)
405 {
406     BDRVRBDState *s = opaque;
407 
408     ssize_t ret;
409 
410     do {
411         char *p = (char *)&s->event_rcb;
412 
413         /* now read the rcb pointer that was sent from a non qemu thread */
414         ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
415                    sizeof(s->event_rcb) - s->event_reader_pos);
416         if (ret > 0) {
417             s->event_reader_pos += ret;
418             if (s->event_reader_pos == sizeof(s->event_rcb)) {
419                 s->event_reader_pos = 0;
420                 qemu_rbd_complete_aio(s->event_rcb);
421                 s->qemu_aio_count--;
422             }
423         }
424     } while (ret < 0 && errno == EINTR);
425 }
426 
427 static int qemu_rbd_aio_flush_cb(void *opaque)
428 {
429     BDRVRBDState *s = opaque;
430 
431     return (s->qemu_aio_count > 0);
432 }
433 
434 static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
435 {
436     BDRVRBDState *s = bs->opaque;
437     char pool[RBD_MAX_POOL_NAME_SIZE];
438     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
439     char conf[RBD_MAX_CONF_SIZE];
440     char clientname_buf[RBD_MAX_CONF_SIZE];
441     char *clientname;
442     int r;
443 
444     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
445                            snap_buf, sizeof(snap_buf),
446                            s->name, sizeof(s->name),
447                            conf, sizeof(conf)) < 0) {
448         return -EINVAL;
449     }
450 
451     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
452     r = rados_create(&s->cluster, clientname);
453     if (r < 0) {
454         error_report("error initializing");
455         return r;
456     }
457 
458     s->snap = NULL;
459     if (snap_buf[0] != '\0') {
460         s->snap = g_strdup(snap_buf);
461     }
462 
463     if (strstr(conf, "conf=") == NULL) {
464         /* try default location, but ignore failure */
465         rados_conf_read_file(s->cluster, NULL);
466     }
467 
468     if (conf[0] != '\0') {
469         r = qemu_rbd_set_conf(s->cluster, conf);
470         if (r < 0) {
471             error_report("error setting config options");
472             goto failed_shutdown;
473         }
474     }
475 
476     r = rados_connect(s->cluster);
477     if (r < 0) {
478         error_report("error connecting");
479         goto failed_shutdown;
480     }
481 
482     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
483     if (r < 0) {
484         error_report("error opening pool %s", pool);
485         goto failed_shutdown;
486     }
487 
488     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
489     if (r < 0) {
490         error_report("error reading header from %s", s->name);
491         goto failed_open;
492     }
493 
494     bs->read_only = (s->snap != NULL);
495 
496     s->event_reader_pos = 0;
497     r = qemu_pipe(s->fds);
498     if (r < 0) {
499         error_report("error opening eventfd");
500         goto failed;
501     }
502     fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
503     fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
504     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
505                             NULL, qemu_rbd_aio_flush_cb, NULL, s);
506 
507 
508     return 0;
509 
510 failed:
511     rbd_close(s->image);
512 failed_open:
513     rados_ioctx_destroy(s->io_ctx);
514 failed_shutdown:
515     rados_shutdown(s->cluster);
516     g_free(s->snap);
517     return r;
518 }
519 
520 static void qemu_rbd_close(BlockDriverState *bs)
521 {
522     BDRVRBDState *s = bs->opaque;
523 
524     close(s->fds[0]);
525     close(s->fds[1]);
526     qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
527         NULL);
528 
529     rbd_close(s->image);
530     rados_ioctx_destroy(s->io_ctx);
531     g_free(s->snap);
532     rados_shutdown(s->cluster);
533 }
534 
535 /*
536  * Cancel aio. Since we don't reference acb in a non qemu threads,
537  * it is safe to access it here.
538  */
539 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
540 {
541     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
542     acb->cancelled = 1;
543 }
544 
545 static AIOPool rbd_aio_pool = {
546     .aiocb_size = sizeof(RBDAIOCB),
547     .cancel = qemu_rbd_aio_cancel,
548 };
549 
550 static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
551 {
552     int ret = 0;
553     while (1) {
554         fd_set wfd;
555         int fd = s->fds[RBD_FD_WRITE];
556 
557         /* send the op pointer to the qemu thread that is responsible
558            for the aio/op completion. Must do it in a qemu thread context */
559         ret = write(fd, (void *)&rcb, sizeof(rcb));
560         if (ret >= 0) {
561             break;
562         }
563         if (errno == EINTR) {
564             continue;
565         }
566         if (errno != EAGAIN) {
567             break;
568         }
569 
570         FD_ZERO(&wfd);
571         FD_SET(fd, &wfd);
572         do {
573             ret = select(fd + 1, NULL, &wfd, NULL, NULL);
574         } while (ret < 0 && errno == EINTR);
575     }
576 
577     return ret;
578 }
579 
580 /*
581  * This is the callback function for rbd_aio_read and _write
582  *
583  * Note: this function is being called from a non qemu thread so
584  * we need to be careful about what we do here. Generally we only
585  * write to the block notification pipe, and do the rest of the
586  * io completion handling from qemu_rbd_aio_event_reader() which
587  * runs in a qemu context.
588  */
589 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
590 {
591     int ret;
592     rcb->ret = rbd_aio_get_return_value(c);
593     rbd_aio_release(c);
594     ret = qemu_rbd_send_pipe(rcb->s, rcb);
595     if (ret < 0) {
596         error_report("failed writing to acb->s->fds");
597         g_free(rcb);
598     }
599 }
600 
601 /* Callback when all queued rbd_aio requests are complete */
602 
603 static void rbd_aio_bh_cb(void *opaque)
604 {
605     RBDAIOCB *acb = opaque;
606 
607     if (!acb->write) {
608         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
609     }
610     qemu_vfree(acb->bounce);
611     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
612     qemu_bh_delete(acb->bh);
613     acb->bh = NULL;
614 
615     qemu_aio_release(acb);
616 }
617 
618 static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
619                                            int64_t sector_num,
620                                            QEMUIOVector *qiov,
621                                            int nb_sectors,
622                                            BlockDriverCompletionFunc *cb,
623                                            void *opaque, int write)
624 {
625     RBDAIOCB *acb;
626     RADOSCB *rcb;
627     rbd_completion_t c;
628     int64_t off, size;
629     char *buf;
630     int r;
631 
632     BDRVRBDState *s = bs->opaque;
633 
634     acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
635     if (!acb) {
636         return NULL;
637     }
638     acb->write = write;
639     acb->qiov = qiov;
640     acb->bounce = qemu_blockalign(bs, qiov->size);
641     acb->ret = 0;
642     acb->error = 0;
643     acb->s = s;
644     acb->cancelled = 0;
645     acb->bh = NULL;
646 
647     if (write) {
648         qemu_iovec_to_buffer(acb->qiov, acb->bounce);
649     }
650 
651     buf = acb->bounce;
652 
653     off = sector_num * BDRV_SECTOR_SIZE;
654     size = nb_sectors * BDRV_SECTOR_SIZE;
655 
656     s->qemu_aio_count++; /* All the RADOSCB */
657 
658     rcb = g_malloc(sizeof(RADOSCB));
659     rcb->done = 0;
660     rcb->acb = acb;
661     rcb->buf = buf;
662     rcb->s = acb->s;
663     rcb->size = size;
664     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
665     if (r < 0) {
666         goto failed;
667     }
668 
669     if (write) {
670         r = rbd_aio_write(s->image, off, size, buf, c);
671     } else {
672         r = rbd_aio_read(s->image, off, size, buf, c);
673     }
674 
675     if (r < 0) {
676         goto failed;
677     }
678 
679     return &acb->common;
680 
681 failed:
682     g_free(rcb);
683     s->qemu_aio_count--;
684     qemu_aio_release(acb);
685     return NULL;
686 }
687 
688 static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
689                                             int64_t sector_num,
690                                             QEMUIOVector *qiov,
691                                             int nb_sectors,
692                                             BlockDriverCompletionFunc *cb,
693                                             void *opaque)
694 {
695     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
696 }
697 
698 static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
699                                              int64_t sector_num,
700                                              QEMUIOVector *qiov,
701                                              int nb_sectors,
702                                              BlockDriverCompletionFunc *cb,
703                                              void *opaque)
704 {
705     return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
706 }
707 
708 static int qemu_rbd_flush(BlockDriverState *bs)
709 {
710 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
711     /* rbd_flush added in 0.1.1 */
712     BDRVRBDState *s = bs->opaque;
713     return rbd_flush(s->image);
714 #else
715     return 0;
716 #endif
717 }
718 
719 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
720 {
721     BDRVRBDState *s = bs->opaque;
722     rbd_image_info_t info;
723     int r;
724 
725     r = rbd_stat(s->image, &info, sizeof(info));
726     if (r < 0) {
727         return r;
728     }
729 
730     bdi->cluster_size = info.obj_size;
731     return 0;
732 }
733 
734 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
735 {
736     BDRVRBDState *s = bs->opaque;
737     rbd_image_info_t info;
738     int r;
739 
740     r = rbd_stat(s->image, &info, sizeof(info));
741     if (r < 0) {
742         return r;
743     }
744 
745     return info.size;
746 }
747 
748 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
749 {
750     BDRVRBDState *s = bs->opaque;
751     int r;
752 
753     r = rbd_resize(s->image, offset);
754     if (r < 0) {
755         return r;
756     }
757 
758     return 0;
759 }
760 
761 static int qemu_rbd_snap_create(BlockDriverState *bs,
762                                 QEMUSnapshotInfo *sn_info)
763 {
764     BDRVRBDState *s = bs->opaque;
765     int r;
766 
767     if (sn_info->name[0] == '\0') {
768         return -EINVAL; /* we need a name for rbd snapshots */
769     }
770 
771     /*
772      * rbd snapshots are using the name as the user controlled unique identifier
773      * we can't use the rbd snapid for that purpose, as it can't be set
774      */
775     if (sn_info->id_str[0] != '\0' &&
776         strcmp(sn_info->id_str, sn_info->name) != 0) {
777         return -EINVAL;
778     }
779 
780     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
781         return -ERANGE;
782     }
783 
784     r = rbd_snap_create(s->image, sn_info->name);
785     if (r < 0) {
786         error_report("failed to create snap: %s", strerror(-r));
787         return r;
788     }
789 
790     return 0;
791 }
792 
793 static int qemu_rbd_snap_list(BlockDriverState *bs,
794                               QEMUSnapshotInfo **psn_tab)
795 {
796     BDRVRBDState *s = bs->opaque;
797     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
798     int i, snap_count;
799     rbd_snap_info_t *snaps;
800     int max_snaps = RBD_MAX_SNAPS;
801 
802     do {
803         snaps = g_malloc(sizeof(*snaps) * max_snaps);
804         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
805         if (snap_count < 0) {
806             g_free(snaps);
807         }
808     } while (snap_count == -ERANGE);
809 
810     if (snap_count <= 0) {
811         return snap_count;
812     }
813 
814     sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
815 
816     for (i = 0; i < snap_count; i++) {
817         const char *snap_name = snaps[i].name;
818 
819         sn_info = sn_tab + i;
820         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
821         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
822 
823         sn_info->vm_state_size = snaps[i].size;
824         sn_info->date_sec = 0;
825         sn_info->date_nsec = 0;
826         sn_info->vm_clock_nsec = 0;
827     }
828     rbd_snap_list_end(snaps);
829 
830     *psn_tab = sn_tab;
831     return snap_count;
832 }
833 
834 static QEMUOptionParameter qemu_rbd_create_options[] = {
835     {
836      .name = BLOCK_OPT_SIZE,
837      .type = OPT_SIZE,
838      .help = "Virtual disk size"
839     },
840     {
841      .name = BLOCK_OPT_CLUSTER_SIZE,
842      .type = OPT_SIZE,
843      .help = "RBD object size"
844     },
845     {NULL}
846 };
847 
848 static BlockDriver bdrv_rbd = {
849     .format_name        = "rbd",
850     .instance_size      = sizeof(BDRVRBDState),
851     .bdrv_file_open     = qemu_rbd_open,
852     .bdrv_close         = qemu_rbd_close,
853     .bdrv_create        = qemu_rbd_create,
854     .bdrv_flush         = qemu_rbd_flush,
855     .bdrv_get_info      = qemu_rbd_getinfo,
856     .create_options     = qemu_rbd_create_options,
857     .bdrv_getlength     = qemu_rbd_getlength,
858     .bdrv_truncate      = qemu_rbd_truncate,
859     .protocol_name      = "rbd",
860 
861     .bdrv_aio_readv     = qemu_rbd_aio_readv,
862     .bdrv_aio_writev    = qemu_rbd_aio_writev,
863 
864     .bdrv_snapshot_create = qemu_rbd_snap_create,
865     .bdrv_snapshot_list = qemu_rbd_snap_list,
866 };
867 
868 static void bdrv_rbd_init(void)
869 {
870     bdrv_register(&bdrv_rbd);
871 }
872 
873 block_init(bdrv_rbd_init);
874