xref: /openbmc/qemu/block/gluster.c (revision 1fd6bb44)
1 /*
2  * GlusterFS backend for QEMU
3  *
4  * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
5  *
6  * Pipe handling mechanism in AIO implementation is derived from
7  * block/rbd.c. Hence,
8  *
9  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
10  *                         Josh Durgin <josh.durgin@dreamhost.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2.  See
13  * the COPYING file in the top-level directory.
14  *
15  * Contributions after 2012-01-13 are licensed under the terms of the
16  * GNU GPL, version 2 or (at your option) any later version.
17  */
18 #include <glusterfs/api/glfs.h>
19 #include "block/block_int.h"
20 #include "qemu/sockets.h"
21 #include "qemu/uri.h"
22 
23 typedef struct GlusterAIOCB {
24     BlockDriverAIOCB common;
25     int64_t size;
26     int ret;
27     bool *finished;
28     QEMUBH *bh;
29 } GlusterAIOCB;
30 
31 typedef struct BDRVGlusterState {
32     struct glfs *glfs;
33     int fds[2];
34     struct glfs_fd *fd;
35     int qemu_aio_count;
36     int event_reader_pos;
37     GlusterAIOCB *event_acb;
38 } BDRVGlusterState;
39 
/* Indices into BDRVGlusterState.fds[] for the two ends of the pipe. */
enum {
    GLUSTER_FD_READ  = 0,
    GLUSTER_FD_WRITE = 1,
};
42 
/*
 * Connection parameters extracted from a gluster URI.  All strings are
 * heap-allocated and released by qemu_gluster_gconf_free().
 */
typedef struct GlusterConf {
    /* Host name/address, or the unix socket path for the unix transport. */
    char *server;
    /* Port taken from the URI (left at 0 for the unix transport). */
    int port;
    /* Name of the gluster volume. */
    char *volname;
    /* Path of the image inside the volume. */
    char *image;
    /* One of "tcp", "unix" or "rdma". */
    char *transport;
} GlusterConf;
50 
51 static void qemu_gluster_gconf_free(GlusterConf *gconf)
52 {
53     g_free(gconf->server);
54     g_free(gconf->volname);
55     g_free(gconf->image);
56     g_free(gconf->transport);
57     g_free(gconf);
58 }
59 
60 static int parse_volume_options(GlusterConf *gconf, char *path)
61 {
62     char *p, *q;
63 
64     if (!path) {
65         return -EINVAL;
66     }
67 
68     /* volume */
69     p = q = path + strspn(path, "/");
70     p += strcspn(p, "/");
71     if (*p == '\0') {
72         return -EINVAL;
73     }
74     gconf->volname = g_strndup(q, p - q);
75 
76     /* image */
77     p += strspn(p, "/");
78     if (*p == '\0') {
79         return -EINVAL;
80     }
81     gconf->image = g_strdup(p);
82     return 0;
83 }
84 
85 /*
86  * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
87  *
88  * 'gluster' is the protocol.
89  *
90  * 'transport' specifies the transport type used to connect to gluster
91  * management daemon (glusterd). Valid transport types are
92  * tcp, unix and rdma. If a transport type isn't specified, then tcp
93  * type is assumed.
94  *
95  * 'server' specifies the server where the volume file specification for
96  * the given volume resides. This can be either hostname, ipv4 address
97  * or ipv6 address. ipv6 address needs to be within square brackets [ ].
98  * If transport type is 'unix', then 'server' field should not be specified.
99  * The 'socket' field needs to be populated with the path to unix domain
100  * socket.
101  *
102  * 'port' is the port number on which glusterd is listening. This is optional
103  * and if not specified, QEMU will send 0, which makes gluster use the
104  * default port. If the transport type is unix, then 'port' should not be
105  * specified.
106  *
107  * 'volname' is the name of the gluster volume which contains the VM image.
108  *
109  * 'image' is the path to the actual VM image that resides on gluster volume.
110  *
111  * Examples:
112  *
113  * file=gluster://1.2.3.4/testvol/a.img
114  * file=gluster+tcp://1.2.3.4/testvol/a.img
115  * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
116  * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
117  * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
118  * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
119  * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
120  * file=gluster+rdma://1.2.3.4:24007/testvol/a.img
121  */
122 static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
123 {
124     URI *uri;
125     QueryParams *qp = NULL;
126     bool is_unix = false;
127     int ret = 0;
128 
129     uri = uri_parse(filename);
130     if (!uri) {
131         return -EINVAL;
132     }
133 
134     /* transport */
135     if (!strcmp(uri->scheme, "gluster")) {
136         gconf->transport = g_strdup("tcp");
137     } else if (!strcmp(uri->scheme, "gluster+tcp")) {
138         gconf->transport = g_strdup("tcp");
139     } else if (!strcmp(uri->scheme, "gluster+unix")) {
140         gconf->transport = g_strdup("unix");
141         is_unix = true;
142     } else if (!strcmp(uri->scheme, "gluster+rdma")) {
143         gconf->transport = g_strdup("rdma");
144     } else {
145         ret = -EINVAL;
146         goto out;
147     }
148 
149     ret = parse_volume_options(gconf, uri->path);
150     if (ret < 0) {
151         goto out;
152     }
153 
154     qp = query_params_parse(uri->query);
155     if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
156         ret = -EINVAL;
157         goto out;
158     }
159 
160     if (is_unix) {
161         if (uri->server || uri->port) {
162             ret = -EINVAL;
163             goto out;
164         }
165         if (strcmp(qp->p[0].name, "socket")) {
166             ret = -EINVAL;
167             goto out;
168         }
169         gconf->server = g_strdup(qp->p[0].value);
170     } else {
171         gconf->server = g_strdup(uri->server);
172         gconf->port = uri->port;
173     }
174 
175 out:
176     if (qp) {
177         query_params_free(qp);
178     }
179     uri_free(uri);
180     return ret;
181 }
182 
183 static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
184 {
185     struct glfs *glfs = NULL;
186     int ret;
187     int old_errno;
188 
189     ret = qemu_gluster_parseuri(gconf, filename);
190     if (ret < 0) {
191         error_report("Usage: file=gluster[+transport]://[server[:port]]/"
192             "volname/image[?socket=...]");
193         errno = -ret;
194         goto out;
195     }
196 
197     glfs = glfs_new(gconf->volname);
198     if (!glfs) {
199         goto out;
200     }
201 
202     ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
203             gconf->port);
204     if (ret < 0) {
205         goto out;
206     }
207 
208     /*
209      * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
210      * GlusterFS makes GF_LOG_* macros available to libgfapi users.
211      */
212     ret = glfs_set_logging(glfs, "-", 4);
213     if (ret < 0) {
214         goto out;
215     }
216 
217     ret = glfs_init(glfs);
218     if (ret) {
219         error_report("Gluster connection failed for server=%s port=%d "
220              "volume=%s image=%s transport=%s", gconf->server, gconf->port,
221              gconf->volname, gconf->image, gconf->transport);
222         goto out;
223     }
224     return glfs;
225 
226 out:
227     if (glfs) {
228         old_errno = errno;
229         glfs_fini(glfs);
230         errno = old_errno;
231     }
232     return NULL;
233 }
234 
235 static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s)
236 {
237     int ret;
238     bool *finished = acb->finished;
239     BlockDriverCompletionFunc *cb = acb->common.cb;
240     void *opaque = acb->common.opaque;
241 
242     if (!acb->ret || acb->ret == acb->size) {
243         ret = 0; /* Success */
244     } else if (acb->ret < 0) {
245         ret = acb->ret; /* Read/Write failed */
246     } else {
247         ret = -EIO; /* Partial read/write - fail it */
248     }
249 
250     s->qemu_aio_count--;
251     qemu_aio_release(acb);
252     cb(opaque, ret);
253     if (finished) {
254         *finished = true;
255     }
256 }
257 
258 static void qemu_gluster_aio_event_reader(void *opaque)
259 {
260     BDRVGlusterState *s = opaque;
261     ssize_t ret;
262 
263     do {
264         char *p = (char *)&s->event_acb;
265 
266         ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos,
267                    sizeof(s->event_acb) - s->event_reader_pos);
268         if (ret > 0) {
269             s->event_reader_pos += ret;
270             if (s->event_reader_pos == sizeof(s->event_acb)) {
271                 s->event_reader_pos = 0;
272                 qemu_gluster_complete_aio(s->event_acb, s);
273             }
274         }
275     } while (ret < 0 && errno == EINTR);
276 }
277 
278 static int qemu_gluster_aio_flush_cb(void *opaque)
279 {
280     BDRVGlusterState *s = opaque;
281 
282     return (s->qemu_aio_count > 0);
283 }
284 
285 static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
286     QDict *options, int bdrv_flags)
287 {
288     BDRVGlusterState *s = bs->opaque;
289     int open_flags = O_BINARY;
290     int ret = 0;
291     GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
292 
293     s->glfs = qemu_gluster_init(gconf, filename);
294     if (!s->glfs) {
295         ret = -errno;
296         goto out;
297     }
298 
299     if (bdrv_flags & BDRV_O_RDWR) {
300         open_flags |= O_RDWR;
301     } else {
302         open_flags |= O_RDONLY;
303     }
304 
305     if ((bdrv_flags & BDRV_O_NOCACHE)) {
306         open_flags |= O_DIRECT;
307     }
308 
309     s->fd = glfs_open(s->glfs, gconf->image, open_flags);
310     if (!s->fd) {
311         ret = -errno;
312         goto out;
313     }
314 
315     ret = qemu_pipe(s->fds);
316     if (ret < 0) {
317         ret = -errno;
318         goto out;
319     }
320     fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK);
321     qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
322         qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
323 
324 out:
325     qemu_gluster_gconf_free(gconf);
326     if (!ret) {
327         return ret;
328     }
329     if (s->fd) {
330         glfs_close(s->fd);
331     }
332     if (s->glfs) {
333         glfs_fini(s->glfs);
334     }
335     return ret;
336 }
337 
338 static int qemu_gluster_create(const char *filename,
339         QEMUOptionParameter *options)
340 {
341     struct glfs *glfs;
342     struct glfs_fd *fd;
343     int ret = 0;
344     int64_t total_size = 0;
345     GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
346 
347     glfs = qemu_gluster_init(gconf, filename);
348     if (!glfs) {
349         ret = -errno;
350         goto out;
351     }
352 
353     while (options && options->name) {
354         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
355             total_size = options->value.n / BDRV_SECTOR_SIZE;
356         }
357         options++;
358     }
359 
360     fd = glfs_creat(glfs, gconf->image,
361         O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
362     if (!fd) {
363         ret = -errno;
364     } else {
365         if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
366             ret = -errno;
367         }
368         if (glfs_close(fd) != 0) {
369             ret = -errno;
370         }
371     }
372 out:
373     qemu_gluster_gconf_free(gconf);
374     if (glfs) {
375         glfs_fini(glfs);
376     }
377     return ret;
378 }
379 
380 static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
381 {
382     GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
383     bool finished = false;
384 
385     acb->finished = &finished;
386     while (!finished) {
387         qemu_aio_wait();
388     }
389 }
390 
391 static const AIOCBInfo gluster_aiocb_info = {
392     .aiocb_size = sizeof(GlusterAIOCB),
393     .cancel = qemu_gluster_aio_cancel,
394 };
395 
396 static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
397 {
398     GlusterAIOCB *acb = (GlusterAIOCB *)arg;
399     BlockDriverState *bs = acb->common.bs;
400     BDRVGlusterState *s = bs->opaque;
401     int retval;
402 
403     acb->ret = ret;
404     retval = qemu_write_full(s->fds[GLUSTER_FD_WRITE], &acb, sizeof(acb));
405     if (retval != sizeof(acb)) {
406         /*
407          * Gluster AIO callback thread failed to notify the waiting
408          * QEMU thread about IO completion.
409          *
410          * Complete this IO request and make the disk inaccessible for
411          * subsequent reads and writes.
412          */
413         error_report("Gluster failed to notify QEMU about IO completion");
414 
415         qemu_mutex_lock_iothread(); /* We are in gluster thread context */
416         acb->common.cb(acb->common.opaque, -EIO);
417         qemu_aio_release(acb);
418         s->qemu_aio_count--;
419         close(s->fds[GLUSTER_FD_READ]);
420         close(s->fds[GLUSTER_FD_WRITE]);
421         qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL,
422             NULL);
423         bs->drv = NULL; /* Make the disk inaccessible */
424         qemu_mutex_unlock_iothread();
425     }
426 }
427 
428 static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
429         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
430         BlockDriverCompletionFunc *cb, void *opaque, int write)
431 {
432     int ret;
433     GlusterAIOCB *acb;
434     BDRVGlusterState *s = bs->opaque;
435     size_t size;
436     off_t offset;
437 
438     offset = sector_num * BDRV_SECTOR_SIZE;
439     size = nb_sectors * BDRV_SECTOR_SIZE;
440     s->qemu_aio_count++;
441 
442     acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque);
443     acb->size = size;
444     acb->ret = 0;
445     acb->finished = NULL;
446 
447     if (write) {
448         ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
449             &gluster_finish_aiocb, acb);
450     } else {
451         ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
452             &gluster_finish_aiocb, acb);
453     }
454 
455     if (ret < 0) {
456         goto out;
457     }
458     return &acb->common;
459 
460 out:
461     s->qemu_aio_count--;
462     qemu_aio_release(acb);
463     return NULL;
464 }
465 
466 static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
467         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
468         BlockDriverCompletionFunc *cb, void *opaque)
469 {
470     return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
471 }
472 
473 static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
474         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
475         BlockDriverCompletionFunc *cb, void *opaque)
476 {
477     return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
478 }
479 
480 static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
481         BlockDriverCompletionFunc *cb, void *opaque)
482 {
483     int ret;
484     GlusterAIOCB *acb;
485     BDRVGlusterState *s = bs->opaque;
486 
487     acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque);
488     acb->size = 0;
489     acb->ret = 0;
490     acb->finished = NULL;
491     s->qemu_aio_count++;
492 
493     ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
494     if (ret < 0) {
495         goto out;
496     }
497     return &acb->common;
498 
499 out:
500     s->qemu_aio_count--;
501     qemu_aio_release(acb);
502     return NULL;
503 }
504 
505 static int64_t qemu_gluster_getlength(BlockDriverState *bs)
506 {
507     BDRVGlusterState *s = bs->opaque;
508     int64_t ret;
509 
510     ret = glfs_lseek(s->fd, 0, SEEK_END);
511     if (ret < 0) {
512         return -errno;
513     } else {
514         return ret;
515     }
516 }
517 
518 static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs)
519 {
520     BDRVGlusterState *s = bs->opaque;
521     struct stat st;
522     int ret;
523 
524     ret = glfs_fstat(s->fd, &st);
525     if (ret < 0) {
526         return -errno;
527     } else {
528         return st.st_blocks * 512;
529     }
530 }
531 
532 static void qemu_gluster_close(BlockDriverState *bs)
533 {
534     BDRVGlusterState *s = bs->opaque;
535 
536     close(s->fds[GLUSTER_FD_READ]);
537     close(s->fds[GLUSTER_FD_WRITE]);
538     qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, NULL);
539 
540     if (s->fd) {
541         glfs_close(s->fd);
542         s->fd = NULL;
543     }
544     glfs_fini(s->glfs);
545 }
546 
547 static QEMUOptionParameter qemu_gluster_create_options[] = {
548     {
549         .name = BLOCK_OPT_SIZE,
550         .type = OPT_SIZE,
551         .help = "Virtual disk size"
552     },
553     { NULL }
554 };
555 
556 static BlockDriver bdrv_gluster = {
557     .format_name                  = "gluster",
558     .protocol_name                = "gluster",
559     .instance_size                = sizeof(BDRVGlusterState),
560     .bdrv_file_open               = qemu_gluster_open,
561     .bdrv_close                   = qemu_gluster_close,
562     .bdrv_create                  = qemu_gluster_create,
563     .bdrv_getlength               = qemu_gluster_getlength,
564     .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
565     .bdrv_aio_readv               = qemu_gluster_aio_readv,
566     .bdrv_aio_writev              = qemu_gluster_aio_writev,
567     .bdrv_aio_flush               = qemu_gluster_aio_flush,
568     .create_options               = qemu_gluster_create_options,
569 };
570 
571 static BlockDriver bdrv_gluster_tcp = {
572     .format_name                  = "gluster",
573     .protocol_name                = "gluster+tcp",
574     .instance_size                = sizeof(BDRVGlusterState),
575     .bdrv_file_open               = qemu_gluster_open,
576     .bdrv_close                   = qemu_gluster_close,
577     .bdrv_create                  = qemu_gluster_create,
578     .bdrv_getlength               = qemu_gluster_getlength,
579     .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
580     .bdrv_aio_readv               = qemu_gluster_aio_readv,
581     .bdrv_aio_writev              = qemu_gluster_aio_writev,
582     .bdrv_aio_flush               = qemu_gluster_aio_flush,
583     .create_options               = qemu_gluster_create_options,
584 };
585 
586 static BlockDriver bdrv_gluster_unix = {
587     .format_name                  = "gluster",
588     .protocol_name                = "gluster+unix",
589     .instance_size                = sizeof(BDRVGlusterState),
590     .bdrv_file_open               = qemu_gluster_open,
591     .bdrv_close                   = qemu_gluster_close,
592     .bdrv_create                  = qemu_gluster_create,
593     .bdrv_getlength               = qemu_gluster_getlength,
594     .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
595     .bdrv_aio_readv               = qemu_gluster_aio_readv,
596     .bdrv_aio_writev              = qemu_gluster_aio_writev,
597     .bdrv_aio_flush               = qemu_gluster_aio_flush,
598     .create_options               = qemu_gluster_create_options,
599 };
600 
601 static BlockDriver bdrv_gluster_rdma = {
602     .format_name                  = "gluster",
603     .protocol_name                = "gluster+rdma",
604     .instance_size                = sizeof(BDRVGlusterState),
605     .bdrv_file_open               = qemu_gluster_open,
606     .bdrv_close                   = qemu_gluster_close,
607     .bdrv_create                  = qemu_gluster_create,
608     .bdrv_getlength               = qemu_gluster_getlength,
609     .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
610     .bdrv_aio_readv               = qemu_gluster_aio_readv,
611     .bdrv_aio_writev              = qemu_gluster_aio_writev,
612     .bdrv_aio_flush               = qemu_gluster_aio_flush,
613     .create_options               = qemu_gluster_create_options,
614 };
615 
616 static void bdrv_gluster_init(void)
617 {
618     bdrv_register(&bdrv_gluster_rdma);
619     bdrv_register(&bdrv_gluster_unix);
620     bdrv_register(&bdrv_gluster_tcp);
621     bdrv_register(&bdrv_gluster);
622 }
623 
624 block_init(bdrv_gluster_init);
625