1 /* 2 * GlusterFS backend for QEMU 3 * 4 * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com> 5 * 6 * Pipe handling mechanism in AIO implementation is derived from 7 * block/rbd.c. Hence, 8 * 9 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 10 * Josh Durgin <josh.durgin@dreamhost.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2. See 13 * the COPYING file in the top-level directory. 14 * 15 * Contributions after 2012-01-13 are licensed under the terms of the 16 * GNU GPL, version 2 or (at your option) any later version. 17 */ 18 #include <glusterfs/api/glfs.h> 19 #include "block/block_int.h" 20 #include "qemu/sockets.h" 21 #include "qemu/uri.h" 22 23 typedef struct GlusterAIOCB { 24 BlockDriverAIOCB common; 25 int64_t size; 26 int ret; 27 bool *finished; 28 QEMUBH *bh; 29 } GlusterAIOCB; 30 31 typedef struct BDRVGlusterState { 32 struct glfs *glfs; 33 int fds[2]; 34 struct glfs_fd *fd; 35 int event_reader_pos; 36 GlusterAIOCB *event_acb; 37 } BDRVGlusterState; 38 39 #define GLUSTER_FD_READ 0 40 #define GLUSTER_FD_WRITE 1 41 42 typedef struct GlusterConf { 43 char *server; 44 int port; 45 char *volname; 46 char *image; 47 char *transport; 48 } GlusterConf; 49 50 static void qemu_gluster_gconf_free(GlusterConf *gconf) 51 { 52 g_free(gconf->server); 53 g_free(gconf->volname); 54 g_free(gconf->image); 55 g_free(gconf->transport); 56 g_free(gconf); 57 } 58 59 static int parse_volume_options(GlusterConf *gconf, char *path) 60 { 61 char *p, *q; 62 63 if (!path) { 64 return -EINVAL; 65 } 66 67 /* volume */ 68 p = q = path + strspn(path, "/"); 69 p += strcspn(p, "/"); 70 if (*p == '\0') { 71 return -EINVAL; 72 } 73 gconf->volname = g_strndup(q, p - q); 74 75 /* image */ 76 p += strspn(p, "/"); 77 if (*p == '\0') { 78 return -EINVAL; 79 } 80 gconf->image = g_strdup(p); 81 return 0; 82 } 83 84 /* 85 * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...] 86 * 87 * 'gluster' is the protocol. 88 * 89 * 'transport' specifies the transport type used to connect to gluster 90 * management daemon (glusterd). Valid transport types are 91 * tcp, unix and rdma. If a transport type isn't specified, then tcp 92 * type is assumed. 93 * 94 * 'server' specifies the server where the volume file specification for 95 * the given volume resides. This can be either hostname, ipv4 address 96 * or ipv6 address. ipv6 address needs to be within square brackets [ ]. 97 * If transport type is 'unix', then 'server' field should not be specifed. 98 * The 'socket' field needs to be populated with the path to unix domain 99 * socket. 100 * 101 * 'port' is the port number on which glusterd is listening. This is optional 102 * and if not specified, QEMU will send 0 which will make gluster to use the 103 * default port. If the transport type is unix, then 'port' should not be 104 * specified. 105 * 106 * 'volname' is the name of the gluster volume which contains the VM image. 107 * 108 * 'image' is the path to the actual VM image that resides on gluster volume. 109 * 110 * Examples: 111 * 112 * file=gluster://1.2.3.4/testvol/a.img 113 * file=gluster+tcp://1.2.3.4/testvol/a.img 114 * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img 115 * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img 116 * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img 117 * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img 118 * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket 119 * file=gluster+rdma://1.2.3.4:24007/testvol/a.img 120 */ 121 static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename) 122 { 123 URI *uri; 124 QueryParams *qp = NULL; 125 bool is_unix = false; 126 int ret = 0; 127 128 uri = uri_parse(filename); 129 if (!uri) { 130 return -EINVAL; 131 } 132 133 /* transport */ 134 if (!strcmp(uri->scheme, "gluster")) { 135 gconf->transport = g_strdup("tcp"); 136 } else if (!strcmp(uri->scheme, "gluster+tcp")) { 137 gconf->transport = g_strdup("tcp"); 138 } else if (!strcmp(uri->scheme, "gluster+unix")) { 139 gconf->transport = g_strdup("unix"); 140 is_unix = true; 141 } else if (!strcmp(uri->scheme, "gluster+rdma")) { 142 gconf->transport = g_strdup("rdma"); 143 } else { 144 ret = -EINVAL; 145 goto out; 146 } 147 148 ret = parse_volume_options(gconf, uri->path); 149 if (ret < 0) { 150 goto out; 151 } 152 153 qp = query_params_parse(uri->query); 154 if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) { 155 ret = -EINVAL; 156 goto out; 157 } 158 159 if (is_unix) { 160 if (uri->server || uri->port) { 161 ret = -EINVAL; 162 goto out; 163 } 164 if (strcmp(qp->p[0].name, "socket")) { 165 ret = -EINVAL; 166 goto out; 167 } 168 gconf->server = g_strdup(qp->p[0].value); 169 } else { 170 gconf->server = g_strdup(uri->server); 171 gconf->port = uri->port; 172 } 173 174 out: 175 if (qp) { 176 query_params_free(qp); 177 } 178 uri_free(uri); 179 return ret; 180 } 181 182 static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename) 183 { 184 struct glfs *glfs = NULL; 185 int ret; 186 int old_errno; 187 188 ret = qemu_gluster_parseuri(gconf, filename); 189 if (ret < 0) { 190 error_report("Usage: file=gluster[+transport]://[server[:port]]/" 191 "volname/image[?socket=...]"); 192 errno = -ret; 193 goto out; 194 } 195 196 glfs = glfs_new(gconf->volname); 197 if (!glfs) { 198 goto out; 199 } 200 201 ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server, 202 gconf->port); 203 if (ret < 0) { 204 goto out; 205 } 206 207 /* 208 * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when 209 * GlusterFS makes GF_LOG_* macros available to libgfapi users. 210 */ 211 ret = glfs_set_logging(glfs, "-", 4); 212 if (ret < 0) { 213 goto out; 214 } 215 216 ret = glfs_init(glfs); 217 if (ret) { 218 error_report("Gluster connection failed for server=%s port=%d " 219 "volume=%s image=%s transport=%s", gconf->server, gconf->port, 220 gconf->volname, gconf->image, gconf->transport); 221 goto out; 222 } 223 return glfs; 224 225 out: 226 if (glfs) { 227 old_errno = errno; 228 glfs_fini(glfs); 229 errno = old_errno; 230 } 231 return NULL; 232 } 233 234 static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s) 235 { 236 int ret; 237 bool *finished = acb->finished; 238 BlockDriverCompletionFunc *cb = acb->common.cb; 239 void *opaque = acb->common.opaque; 240 241 if (!acb->ret || acb->ret == acb->size) { 242 ret = 0; /* Success */ 243 } else if (acb->ret < 0) { 244 ret = acb->ret; /* Read/Write failed */ 245 } else { 246 ret = -EIO; /* Partial read/write - fail it */ 247 } 248 249 qemu_aio_release(acb); 250 cb(opaque, ret); 251 if (finished) { 252 *finished = true; 253 } 254 } 255 256 static void qemu_gluster_aio_event_reader(void *opaque) 257 { 258 BDRVGlusterState *s = opaque; 259 ssize_t ret; 260 261 do { 262 char *p = (char *)&s->event_acb; 263 264 ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos, 265 sizeof(s->event_acb) - s->event_reader_pos); 266 if (ret > 0) { 267 s->event_reader_pos += ret; 268 if (s->event_reader_pos == sizeof(s->event_acb)) { 269 s->event_reader_pos = 0; 270 qemu_gluster_complete_aio(s->event_acb, s); 271 } 272 } 273 } while (ret < 0 && errno == EINTR); 274 } 275 276 /* TODO Convert to fine grained options */ 277 static QemuOptsList runtime_opts = { 278 .name = "gluster", 279 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 280 .desc = { 281 { 282 .name = "filename", 283 .type = QEMU_OPT_STRING, 284 .help = "URL to the gluster image", 285 }, 286 { /* end of list */ } 287 }, 288 }; 289 290 static int qemu_gluster_open(BlockDriverState *bs, QDict *options, 291 int bdrv_flags) 292 { 293 BDRVGlusterState *s = bs->opaque; 294 int open_flags = O_BINARY; 295 int ret = 0; 296 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 297 QemuOpts *opts; 298 Error *local_err = NULL; 299 const char *filename; 300 301 opts = qemu_opts_create_nofail(&runtime_opts); 302 qemu_opts_absorb_qdict(opts, options, &local_err); 303 if (error_is_set(&local_err)) { 304 qerror_report_err(local_err); 305 error_free(local_err); 306 ret = -EINVAL; 307 goto out; 308 } 309 310 filename = qemu_opt_get(opts, "filename"); 311 312 313 s->glfs = qemu_gluster_init(gconf, filename); 314 if (!s->glfs) { 315 ret = -errno; 316 goto out; 317 } 318 319 if (bdrv_flags & BDRV_O_RDWR) { 320 open_flags |= O_RDWR; 321 } else { 322 open_flags |= O_RDONLY; 323 } 324 325 if ((bdrv_flags & BDRV_O_NOCACHE)) { 326 open_flags |= O_DIRECT; 327 } 328 329 s->fd = glfs_open(s->glfs, gconf->image, open_flags); 330 if (!s->fd) { 331 ret = -errno; 332 goto out; 333 } 334 335 ret = qemu_pipe(s->fds); 336 if (ret < 0) { 337 ret = -errno; 338 goto out; 339 } 340 fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK); 341 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], 342 qemu_gluster_aio_event_reader, NULL, s); 343 344 out: 345 qemu_opts_del(opts); 346 qemu_gluster_gconf_free(gconf); 347 if (!ret) { 348 return ret; 349 } 350 if (s->fd) { 351 glfs_close(s->fd); 352 } 353 if (s->glfs) { 354 glfs_fini(s->glfs); 355 } 356 return ret; 357 } 358 359 static int qemu_gluster_create(const char *filename, 360 QEMUOptionParameter *options) 361 { 362 struct glfs *glfs; 363 struct glfs_fd *fd; 364 int ret = 0; 365 int64_t total_size = 0; 366 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 367 368 glfs = qemu_gluster_init(gconf, filename); 369 if (!glfs) { 370 ret = -errno; 371 goto out; 372 } 373 374 while (options && options->name) { 375 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 376 total_size = options->value.n / BDRV_SECTOR_SIZE; 377 } 378 options++; 379 } 380 381 fd = glfs_creat(glfs, gconf->image, 382 O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR); 383 if (!fd) { 384 ret = -errno; 385 } else { 386 if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) { 387 ret = -errno; 388 } 389 if (glfs_close(fd) != 0) { 390 ret = -errno; 391 } 392 } 393 out: 394 qemu_gluster_gconf_free(gconf); 395 if (glfs) { 396 glfs_fini(glfs); 397 } 398 return ret; 399 } 400 401 static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb) 402 { 403 GlusterAIOCB *acb = (GlusterAIOCB *)blockacb; 404 bool finished = false; 405 406 acb->finished = &finished; 407 while (!finished) { 408 qemu_aio_wait(); 409 } 410 } 411 412 static const AIOCBInfo gluster_aiocb_info = { 413 .aiocb_size = sizeof(GlusterAIOCB), 414 .cancel = qemu_gluster_aio_cancel, 415 }; 416 417 static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg) 418 { 419 GlusterAIOCB *acb = (GlusterAIOCB *)arg; 420 BlockDriverState *bs = acb->common.bs; 421 BDRVGlusterState *s = bs->opaque; 422 int retval; 423 424 acb->ret = ret; 425 retval = qemu_write_full(s->fds[GLUSTER_FD_WRITE], &acb, sizeof(acb)); 426 if (retval != sizeof(acb)) { 427 /* 428 * Gluster AIO callback thread failed to notify the waiting 429 * QEMU thread about IO completion. 430 */ 431 error_report("Gluster AIO completion failed: %s", strerror(errno)); 432 abort(); 433 } 434 } 435 436 static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs, 437 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 438 BlockDriverCompletionFunc *cb, void *opaque, int write) 439 { 440 int ret; 441 GlusterAIOCB *acb; 442 BDRVGlusterState *s = bs->opaque; 443 size_t size; 444 off_t offset; 445 446 offset = sector_num * BDRV_SECTOR_SIZE; 447 size = nb_sectors * BDRV_SECTOR_SIZE; 448 449 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 450 acb->size = size; 451 acb->ret = 0; 452 acb->finished = NULL; 453 454 if (write) { 455 ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0, 456 &gluster_finish_aiocb, acb); 457 } else { 458 ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0, 459 &gluster_finish_aiocb, acb); 460 } 461 462 if (ret < 0) { 463 goto out; 464 } 465 return &acb->common; 466 467 out: 468 qemu_aio_release(acb); 469 return NULL; 470 } 471 472 static int qemu_gluster_truncate(BlockDriverState *bs, int64_t offset) 473 { 474 int ret; 475 BDRVGlusterState *s = bs->opaque; 476 477 ret = glfs_ftruncate(s->fd, offset); 478 if (ret < 0) { 479 return -errno; 480 } 481 482 return 0; 483 } 484 485 static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs, 486 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 487 BlockDriverCompletionFunc *cb, void *opaque) 488 { 489 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); 490 } 491 492 static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs, 493 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 494 BlockDriverCompletionFunc *cb, void *opaque) 495 { 496 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); 497 } 498 499 static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs, 500 BlockDriverCompletionFunc *cb, void *opaque) 501 { 502 int ret; 503 GlusterAIOCB *acb; 504 BDRVGlusterState *s = bs->opaque; 505 506 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 507 acb->size = 0; 508 acb->ret = 0; 509 acb->finished = NULL; 510 511 ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb); 512 if (ret < 0) { 513 goto out; 514 } 515 return &acb->common; 516 517 out: 518 qemu_aio_release(acb); 519 return NULL; 520 } 521 522 #ifdef CONFIG_GLUSTERFS_DISCARD 523 static BlockDriverAIOCB *qemu_gluster_aio_discard(BlockDriverState *bs, 524 int64_t sector_num, int nb_sectors, BlockDriverCompletionFunc *cb, 525 void *opaque) 526 { 527 int ret; 528 GlusterAIOCB *acb; 529 BDRVGlusterState *s = bs->opaque; 530 size_t size; 531 off_t offset; 532 533 offset = sector_num * BDRV_SECTOR_SIZE; 534 size = nb_sectors * BDRV_SECTOR_SIZE; 535 536 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 537 acb->size = 0; 538 acb->ret = 0; 539 acb->finished = NULL; 540 541 ret = glfs_discard_async(s->fd, offset, size, &gluster_finish_aiocb, acb); 542 if (ret < 0) { 543 goto out; 544 } 545 return &acb->common; 546 547 out: 548 qemu_aio_release(acb); 549 return NULL; 550 } 551 #endif 552 553 static int64_t qemu_gluster_getlength(BlockDriverState *bs) 554 { 555 BDRVGlusterState *s = bs->opaque; 556 int64_t ret; 557 558 ret = glfs_lseek(s->fd, 0, SEEK_END); 559 if (ret < 0) { 560 return -errno; 561 } else { 562 return ret; 563 } 564 } 565 566 static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs) 567 { 568 BDRVGlusterState *s = bs->opaque; 569 struct stat st; 570 int ret; 571 572 ret = glfs_fstat(s->fd, &st); 573 if (ret < 0) { 574 return -errno; 575 } else { 576 return st.st_blocks * 512; 577 } 578 } 579 580 static void qemu_gluster_close(BlockDriverState *bs) 581 { 582 BDRVGlusterState *s = bs->opaque; 583 584 close(s->fds[GLUSTER_FD_READ]); 585 close(s->fds[GLUSTER_FD_WRITE]); 586 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL); 587 588 if (s->fd) { 589 glfs_close(s->fd); 590 s->fd = NULL; 591 } 592 glfs_fini(s->glfs); 593 } 594 595 static int qemu_gluster_has_zero_init(BlockDriverState *bs) 596 { 597 /* GlusterFS volume could be backed by a block device */ 598 return 0; 599 } 600 601 static QEMUOptionParameter qemu_gluster_create_options[] = { 602 { 603 .name = BLOCK_OPT_SIZE, 604 .type = OPT_SIZE, 605 .help = "Virtual disk size" 606 }, 607 { NULL } 608 }; 609 610 static BlockDriver bdrv_gluster = { 611 .format_name = "gluster", 612 .protocol_name = "gluster", 613 .instance_size = sizeof(BDRVGlusterState), 614 .bdrv_file_open = qemu_gluster_open, 615 .bdrv_close = qemu_gluster_close, 616 .bdrv_create = qemu_gluster_create, 617 .bdrv_getlength = qemu_gluster_getlength, 618 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 619 .bdrv_truncate = qemu_gluster_truncate, 620 .bdrv_aio_readv = qemu_gluster_aio_readv, 621 .bdrv_aio_writev = qemu_gluster_aio_writev, 622 .bdrv_aio_flush = qemu_gluster_aio_flush, 623 .bdrv_has_zero_init = qemu_gluster_has_zero_init, 624 #ifdef CONFIG_GLUSTERFS_DISCARD 625 .bdrv_aio_discard = qemu_gluster_aio_discard, 626 #endif 627 .create_options = qemu_gluster_create_options, 628 }; 629 630 static BlockDriver bdrv_gluster_tcp = { 631 .format_name = "gluster", 632 .protocol_name = "gluster+tcp", 633 .instance_size = sizeof(BDRVGlusterState), 634 .bdrv_file_open = qemu_gluster_open, 635 .bdrv_close = qemu_gluster_close, 636 .bdrv_create = qemu_gluster_create, 637 .bdrv_getlength = qemu_gluster_getlength, 638 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 639 .bdrv_truncate = qemu_gluster_truncate, 640 .bdrv_aio_readv = qemu_gluster_aio_readv, 641 .bdrv_aio_writev = qemu_gluster_aio_writev, 642 .bdrv_aio_flush = qemu_gluster_aio_flush, 643 .bdrv_has_zero_init = qemu_gluster_has_zero_init, 644 #ifdef CONFIG_GLUSTERFS_DISCARD 645 .bdrv_aio_discard = qemu_gluster_aio_discard, 646 #endif 647 .create_options = qemu_gluster_create_options, 648 }; 649 650 static BlockDriver bdrv_gluster_unix = { 651 .format_name = "gluster", 652 .protocol_name = "gluster+unix", 653 .instance_size = sizeof(BDRVGlusterState), 654 .bdrv_file_open = qemu_gluster_open, 655 .bdrv_close = qemu_gluster_close, 656 .bdrv_create = qemu_gluster_create, 657 .bdrv_getlength = qemu_gluster_getlength, 658 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 659 .bdrv_truncate = qemu_gluster_truncate, 660 .bdrv_aio_readv = qemu_gluster_aio_readv, 661 .bdrv_aio_writev = qemu_gluster_aio_writev, 662 .bdrv_aio_flush = qemu_gluster_aio_flush, 663 .bdrv_has_zero_init = qemu_gluster_has_zero_init, 664 #ifdef CONFIG_GLUSTERFS_DISCARD 665 .bdrv_aio_discard = qemu_gluster_aio_discard, 666 #endif 667 .create_options = qemu_gluster_create_options, 668 }; 669 670 static BlockDriver bdrv_gluster_rdma = { 671 .format_name = "gluster", 672 .protocol_name = "gluster+rdma", 673 .instance_size = sizeof(BDRVGlusterState), 674 .bdrv_file_open = qemu_gluster_open, 675 .bdrv_close = qemu_gluster_close, 676 .bdrv_create = qemu_gluster_create, 677 .bdrv_getlength = qemu_gluster_getlength, 678 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 679 .bdrv_truncate = qemu_gluster_truncate, 680 .bdrv_aio_readv = qemu_gluster_aio_readv, 681 .bdrv_aio_writev = qemu_gluster_aio_writev, 682 .bdrv_aio_flush = qemu_gluster_aio_flush, 683 .bdrv_has_zero_init = qemu_gluster_has_zero_init, 684 #ifdef CONFIG_GLUSTERFS_DISCARD 685 .bdrv_aio_discard = qemu_gluster_aio_discard, 686 #endif 687 .create_options = qemu_gluster_create_options, 688 }; 689 690 static void bdrv_gluster_init(void) 691 { 692 bdrv_register(&bdrv_gluster_rdma); 693 bdrv_register(&bdrv_gluster_unix); 694 bdrv_register(&bdrv_gluster_tcp); 695 bdrv_register(&bdrv_gluster); 696 } 697 698 block_init(bdrv_gluster_init); 699