1 /* 2 * GlusterFS backend for QEMU 3 * 4 * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com> 5 * 6 * Pipe handling mechanism in AIO implementation is derived from 7 * block/rbd.c. Hence, 8 * 9 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 10 * Josh Durgin <josh.durgin@dreamhost.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2. See 13 * the COPYING file in the top-level directory. 14 * 15 * Contributions after 2012-01-13 are licensed under the terms of the 16 * GNU GPL, version 2 or (at your option) any later version. 17 */ 18 #include <glusterfs/api/glfs.h> 19 #include "block/block_int.h" 20 #include "qemu/sockets.h" 21 #include "qemu/uri.h" 22 23 typedef struct GlusterAIOCB { 24 BlockDriverAIOCB common; 25 int64_t size; 26 int ret; 27 bool *finished; 28 QEMUBH *bh; 29 } GlusterAIOCB; 30 31 typedef struct BDRVGlusterState { 32 struct glfs *glfs; 33 int fds[2]; 34 struct glfs_fd *fd; 35 int qemu_aio_count; 36 int event_reader_pos; 37 GlusterAIOCB *event_acb; 38 } BDRVGlusterState; 39 40 #define GLUSTER_FD_READ 0 41 #define GLUSTER_FD_WRITE 1 42 43 typedef struct GlusterConf { 44 char *server; 45 int port; 46 char *volname; 47 char *image; 48 char *transport; 49 } GlusterConf; 50 51 static void qemu_gluster_gconf_free(GlusterConf *gconf) 52 { 53 g_free(gconf->server); 54 g_free(gconf->volname); 55 g_free(gconf->image); 56 g_free(gconf->transport); 57 g_free(gconf); 58 } 59 60 static int parse_volume_options(GlusterConf *gconf, char *path) 61 { 62 char *p, *q; 63 64 if (!path) { 65 return -EINVAL; 66 } 67 68 /* volume */ 69 p = q = path + strspn(path, "/"); 70 p += strcspn(p, "/"); 71 if (*p == '\0') { 72 return -EINVAL; 73 } 74 gconf->volname = g_strndup(q, p - q); 75 76 /* image */ 77 p += strspn(p, "/"); 78 if (*p == '\0') { 79 return -EINVAL; 80 } 81 gconf->image = g_strdup(p); 82 return 0; 83 } 84 85 /* 86 * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...] 87 * 88 * 'gluster' is the protocol. 89 * 90 * 'transport' specifies the transport type used to connect to gluster 91 * management daemon (glusterd). Valid transport types are 92 * tcp, unix and rdma. If a transport type isn't specified, then tcp 93 * type is assumed. 94 * 95 * 'server' specifies the server where the volume file specification for 96 * the given volume resides. This can be either hostname, ipv4 address 97 * or ipv6 address. ipv6 address needs to be within square brackets [ ]. 98 * If transport type is 'unix', then 'server' field should not be specifed. 99 * The 'socket' field needs to be populated with the path to unix domain 100 * socket. 101 * 102 * 'port' is the port number on which glusterd is listening. This is optional 103 * and if not specified, QEMU will send 0 which will make gluster to use the 104 * default port. If the transport type is unix, then 'port' should not be 105 * specified. 106 * 107 * 'volname' is the name of the gluster volume which contains the VM image. 108 * 109 * 'image' is the path to the actual VM image that resides on gluster volume. 110 * 111 * Examples: 112 * 113 * file=gluster://1.2.3.4/testvol/a.img 114 * file=gluster+tcp://1.2.3.4/testvol/a.img 115 * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img 116 * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img 117 * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img 118 * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img 119 * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket 120 * file=gluster+rdma://1.2.3.4:24007/testvol/a.img 121 */ 122 static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename) 123 { 124 URI *uri; 125 QueryParams *qp = NULL; 126 bool is_unix = false; 127 int ret = 0; 128 129 uri = uri_parse(filename); 130 if (!uri) { 131 return -EINVAL; 132 } 133 134 /* transport */ 135 if (!strcmp(uri->scheme, "gluster")) { 136 gconf->transport = g_strdup("tcp"); 137 } else if (!strcmp(uri->scheme, "gluster+tcp")) { 138 gconf->transport = g_strdup("tcp"); 139 } else if (!strcmp(uri->scheme, "gluster+unix")) { 140 gconf->transport = g_strdup("unix"); 141 is_unix = true; 142 } else if (!strcmp(uri->scheme, "gluster+rdma")) { 143 gconf->transport = g_strdup("rdma"); 144 } else { 145 ret = -EINVAL; 146 goto out; 147 } 148 149 ret = parse_volume_options(gconf, uri->path); 150 if (ret < 0) { 151 goto out; 152 } 153 154 qp = query_params_parse(uri->query); 155 if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) { 156 ret = -EINVAL; 157 goto out; 158 } 159 160 if (is_unix) { 161 if (uri->server || uri->port) { 162 ret = -EINVAL; 163 goto out; 164 } 165 if (strcmp(qp->p[0].name, "socket")) { 166 ret = -EINVAL; 167 goto out; 168 } 169 gconf->server = g_strdup(qp->p[0].value); 170 } else { 171 gconf->server = g_strdup(uri->server); 172 gconf->port = uri->port; 173 } 174 175 out: 176 if (qp) { 177 query_params_free(qp); 178 } 179 uri_free(uri); 180 return ret; 181 } 182 183 static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename) 184 { 185 struct glfs *glfs = NULL; 186 int ret; 187 int old_errno; 188 189 ret = qemu_gluster_parseuri(gconf, filename); 190 if (ret < 0) { 191 error_report("Usage: file=gluster[+transport]://[server[:port]]/" 192 "volname/image[?socket=...]"); 193 errno = -ret; 194 goto out; 195 } 196 197 glfs = glfs_new(gconf->volname); 198 if (!glfs) { 199 goto out; 200 } 201 202 ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server, 203 gconf->port); 204 if (ret < 0) { 205 goto out; 206 } 207 208 /* 209 * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when 210 * GlusterFS makes GF_LOG_* macros available to libgfapi users. 211 */ 212 ret = glfs_set_logging(glfs, "-", 4); 213 if (ret < 0) { 214 goto out; 215 } 216 217 ret = glfs_init(glfs); 218 if (ret) { 219 error_report("Gluster connection failed for server=%s port=%d " 220 "volume=%s image=%s transport=%s", gconf->server, gconf->port, 221 gconf->volname, gconf->image, gconf->transport); 222 goto out; 223 } 224 return glfs; 225 226 out: 227 if (glfs) { 228 old_errno = errno; 229 glfs_fini(glfs); 230 errno = old_errno; 231 } 232 return NULL; 233 } 234 235 static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s) 236 { 237 int ret; 238 bool *finished = acb->finished; 239 BlockDriverCompletionFunc *cb = acb->common.cb; 240 void *opaque = acb->common.opaque; 241 242 if (!acb->ret || acb->ret == acb->size) { 243 ret = 0; /* Success */ 244 } else if (acb->ret < 0) { 245 ret = acb->ret; /* Read/Write failed */ 246 } else { 247 ret = -EIO; /* Partial read/write - fail it */ 248 } 249 250 s->qemu_aio_count--; 251 qemu_aio_release(acb); 252 cb(opaque, ret); 253 if (finished) { 254 *finished = true; 255 } 256 } 257 258 static void qemu_gluster_aio_event_reader(void *opaque) 259 { 260 BDRVGlusterState *s = opaque; 261 ssize_t ret; 262 263 do { 264 char *p = (char *)&s->event_acb; 265 266 ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos, 267 sizeof(s->event_acb) - s->event_reader_pos); 268 if (ret > 0) { 269 s->event_reader_pos += ret; 270 if (s->event_reader_pos == sizeof(s->event_acb)) { 271 s->event_reader_pos = 0; 272 qemu_gluster_complete_aio(s->event_acb, s); 273 } 274 } 275 } while (ret < 0 && errno == EINTR); 276 } 277 278 static int qemu_gluster_aio_flush_cb(void *opaque) 279 { 280 BDRVGlusterState *s = opaque; 281 282 return (s->qemu_aio_count > 0); 283 } 284 285 /* TODO Convert to fine grained options */ 286 static QemuOptsList runtime_opts = { 287 .name = "gluster", 288 .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head), 289 .desc = { 290 { 291 .name = "filename", 292 .type = QEMU_OPT_STRING, 293 .help = "URL to the gluster image", 294 }, 295 { /* end of list */ } 296 }, 297 }; 298 299 static int qemu_gluster_open(BlockDriverState *bs, QDict *options, 300 int bdrv_flags) 301 { 302 BDRVGlusterState *s = bs->opaque; 303 int open_flags = O_BINARY; 304 int ret = 0; 305 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 306 QemuOpts *opts; 307 Error *local_err = NULL; 308 const char *filename; 309 310 opts = qemu_opts_create_nofail(&runtime_opts); 311 qemu_opts_absorb_qdict(opts, options, &local_err); 312 if (error_is_set(&local_err)) { 313 qerror_report_err(local_err); 314 error_free(local_err); 315 ret = -EINVAL; 316 goto out; 317 } 318 319 filename = qemu_opt_get(opts, "filename"); 320 321 322 s->glfs = qemu_gluster_init(gconf, filename); 323 if (!s->glfs) { 324 ret = -errno; 325 goto out; 326 } 327 328 if (bdrv_flags & BDRV_O_RDWR) { 329 open_flags |= O_RDWR; 330 } else { 331 open_flags |= O_RDONLY; 332 } 333 334 if ((bdrv_flags & BDRV_O_NOCACHE)) { 335 open_flags |= O_DIRECT; 336 } 337 338 s->fd = glfs_open(s->glfs, gconf->image, open_flags); 339 if (!s->fd) { 340 ret = -errno; 341 goto out; 342 } 343 344 ret = qemu_pipe(s->fds); 345 if (ret < 0) { 346 ret = -errno; 347 goto out; 348 } 349 fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK); 350 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], 351 qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s); 352 353 out: 354 qemu_opts_del(opts); 355 qemu_gluster_gconf_free(gconf); 356 if (!ret) { 357 return ret; 358 } 359 if (s->fd) { 360 glfs_close(s->fd); 361 } 362 if (s->glfs) { 363 glfs_fini(s->glfs); 364 } 365 return ret; 366 } 367 368 static int qemu_gluster_create(const char *filename, 369 QEMUOptionParameter *options) 370 { 371 struct glfs *glfs; 372 struct glfs_fd *fd; 373 int ret = 0; 374 int64_t total_size = 0; 375 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 376 377 glfs = qemu_gluster_init(gconf, filename); 378 if (!glfs) { 379 ret = -errno; 380 goto out; 381 } 382 383 while (options && options->name) { 384 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 385 total_size = options->value.n / BDRV_SECTOR_SIZE; 386 } 387 options++; 388 } 389 390 fd = glfs_creat(glfs, gconf->image, 391 O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR); 392 if (!fd) { 393 ret = -errno; 394 } else { 395 if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) { 396 ret = -errno; 397 } 398 if (glfs_close(fd) != 0) { 399 ret = -errno; 400 } 401 } 402 out: 403 qemu_gluster_gconf_free(gconf); 404 if (glfs) { 405 glfs_fini(glfs); 406 } 407 return ret; 408 } 409 410 static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb) 411 { 412 GlusterAIOCB *acb = (GlusterAIOCB *)blockacb; 413 bool finished = false; 414 415 acb->finished = &finished; 416 while (!finished) { 417 qemu_aio_wait(); 418 } 419 } 420 421 static const AIOCBInfo gluster_aiocb_info = { 422 .aiocb_size = sizeof(GlusterAIOCB), 423 .cancel = qemu_gluster_aio_cancel, 424 }; 425 426 static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg) 427 { 428 GlusterAIOCB *acb = (GlusterAIOCB *)arg; 429 BlockDriverState *bs = acb->common.bs; 430 BDRVGlusterState *s = bs->opaque; 431 int retval; 432 433 acb->ret = ret; 434 retval = qemu_write_full(s->fds[GLUSTER_FD_WRITE], &acb, sizeof(acb)); 435 if (retval != sizeof(acb)) { 436 /* 437 * Gluster AIO callback thread failed to notify the waiting 438 * QEMU thread about IO completion. 439 * 440 * Complete this IO request and make the disk inaccessible for 441 * subsequent reads and writes. 442 */ 443 error_report("Gluster failed to notify QEMU about IO completion"); 444 445 qemu_mutex_lock_iothread(); /* We are in gluster thread context */ 446 acb->common.cb(acb->common.opaque, -EIO); 447 qemu_aio_release(acb); 448 s->qemu_aio_count--; 449 close(s->fds[GLUSTER_FD_READ]); 450 close(s->fds[GLUSTER_FD_WRITE]); 451 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, 452 NULL); 453 bs->drv = NULL; /* Make the disk inaccessible */ 454 qemu_mutex_unlock_iothread(); 455 } 456 } 457 458 static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs, 459 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 460 BlockDriverCompletionFunc *cb, void *opaque, int write) 461 { 462 int ret; 463 GlusterAIOCB *acb; 464 BDRVGlusterState *s = bs->opaque; 465 size_t size; 466 off_t offset; 467 468 offset = sector_num * BDRV_SECTOR_SIZE; 469 size = nb_sectors * BDRV_SECTOR_SIZE; 470 s->qemu_aio_count++; 471 472 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 473 acb->size = size; 474 acb->ret = 0; 475 acb->finished = NULL; 476 477 if (write) { 478 ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0, 479 &gluster_finish_aiocb, acb); 480 } else { 481 ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0, 482 &gluster_finish_aiocb, acb); 483 } 484 485 if (ret < 0) { 486 goto out; 487 } 488 return &acb->common; 489 490 out: 491 s->qemu_aio_count--; 492 qemu_aio_release(acb); 493 return NULL; 494 } 495 496 static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs, 497 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 498 BlockDriverCompletionFunc *cb, void *opaque) 499 { 500 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); 501 } 502 503 static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs, 504 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 505 BlockDriverCompletionFunc *cb, void *opaque) 506 { 507 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); 508 } 509 510 static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs, 511 BlockDriverCompletionFunc *cb, void *opaque) 512 { 513 int ret; 514 GlusterAIOCB *acb; 515 BDRVGlusterState *s = bs->opaque; 516 517 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 518 acb->size = 0; 519 acb->ret = 0; 520 acb->finished = NULL; 521 s->qemu_aio_count++; 522 523 ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb); 524 if (ret < 0) { 525 goto out; 526 } 527 return &acb->common; 528 529 out: 530 s->qemu_aio_count--; 531 qemu_aio_release(acb); 532 return NULL; 533 } 534 535 static int64_t qemu_gluster_getlength(BlockDriverState *bs) 536 { 537 BDRVGlusterState *s = bs->opaque; 538 int64_t ret; 539 540 ret = glfs_lseek(s->fd, 0, SEEK_END); 541 if (ret < 0) { 542 return -errno; 543 } else { 544 return ret; 545 } 546 } 547 548 static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs) 549 { 550 BDRVGlusterState *s = bs->opaque; 551 struct stat st; 552 int ret; 553 554 ret = glfs_fstat(s->fd, &st); 555 if (ret < 0) { 556 return -errno; 557 } else { 558 return st.st_blocks * 512; 559 } 560 } 561 562 static void qemu_gluster_close(BlockDriverState *bs) 563 { 564 BDRVGlusterState *s = bs->opaque; 565 566 close(s->fds[GLUSTER_FD_READ]); 567 close(s->fds[GLUSTER_FD_WRITE]); 568 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, NULL); 569 570 if (s->fd) { 571 glfs_close(s->fd); 572 s->fd = NULL; 573 } 574 glfs_fini(s->glfs); 575 } 576 577 static QEMUOptionParameter qemu_gluster_create_options[] = { 578 { 579 .name = BLOCK_OPT_SIZE, 580 .type = OPT_SIZE, 581 .help = "Virtual disk size" 582 }, 583 { NULL } 584 }; 585 586 static BlockDriver bdrv_gluster = { 587 .format_name = "gluster", 588 .protocol_name = "gluster", 589 .instance_size = sizeof(BDRVGlusterState), 590 .bdrv_file_open = qemu_gluster_open, 591 .bdrv_close = qemu_gluster_close, 592 .bdrv_create = qemu_gluster_create, 593 .bdrv_getlength = qemu_gluster_getlength, 594 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 595 .bdrv_aio_readv = qemu_gluster_aio_readv, 596 .bdrv_aio_writev = qemu_gluster_aio_writev, 597 .bdrv_aio_flush = qemu_gluster_aio_flush, 598 .create_options = qemu_gluster_create_options, 599 }; 600 601 static BlockDriver bdrv_gluster_tcp = { 602 .format_name = "gluster", 603 .protocol_name = "gluster+tcp", 604 .instance_size = sizeof(BDRVGlusterState), 605 .bdrv_file_open = qemu_gluster_open, 606 .bdrv_close = qemu_gluster_close, 607 .bdrv_create = qemu_gluster_create, 608 .bdrv_getlength = qemu_gluster_getlength, 609 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 610 .bdrv_aio_readv = qemu_gluster_aio_readv, 611 .bdrv_aio_writev = qemu_gluster_aio_writev, 612 .bdrv_aio_flush = qemu_gluster_aio_flush, 613 .create_options = qemu_gluster_create_options, 614 }; 615 616 static BlockDriver bdrv_gluster_unix = { 617 .format_name = "gluster", 618 .protocol_name = "gluster+unix", 619 .instance_size = sizeof(BDRVGlusterState), 620 .bdrv_file_open = qemu_gluster_open, 621 .bdrv_close = qemu_gluster_close, 622 .bdrv_create = qemu_gluster_create, 623 .bdrv_getlength = qemu_gluster_getlength, 624 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 625 .bdrv_aio_readv = qemu_gluster_aio_readv, 626 .bdrv_aio_writev = qemu_gluster_aio_writev, 627 .bdrv_aio_flush = qemu_gluster_aio_flush, 628 .create_options = qemu_gluster_create_options, 629 }; 630 631 static BlockDriver bdrv_gluster_rdma = { 632 .format_name = "gluster", 633 .protocol_name = "gluster+rdma", 634 .instance_size = sizeof(BDRVGlusterState), 635 .bdrv_file_open = qemu_gluster_open, 636 .bdrv_close = qemu_gluster_close, 637 .bdrv_create = qemu_gluster_create, 638 .bdrv_getlength = qemu_gluster_getlength, 639 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 640 .bdrv_aio_readv = qemu_gluster_aio_readv, 641 .bdrv_aio_writev = qemu_gluster_aio_writev, 642 .bdrv_aio_flush = qemu_gluster_aio_flush, 643 .create_options = qemu_gluster_create_options, 644 }; 645 646 static void bdrv_gluster_init(void) 647 { 648 bdrv_register(&bdrv_gluster_rdma); 649 bdrv_register(&bdrv_gluster_unix); 650 bdrv_register(&bdrv_gluster_tcp); 651 bdrv_register(&bdrv_gluster); 652 } 653 654 block_init(bdrv_gluster_init); 655