1 /* 2 * GlusterFS backend for QEMU 3 * 4 * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com> 5 * 6 * Pipe handling mechanism in AIO implementation is derived from 7 * block/rbd.c. Hence, 8 * 9 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 10 * Josh Durgin <josh.durgin@dreamhost.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2. See 13 * the COPYING file in the top-level directory. 14 * 15 * Contributions after 2012-01-13 are licensed under the terms of the 16 * GNU GPL, version 2 or (at your option) any later version. 17 */ 18 #include <glusterfs/api/glfs.h> 19 #include "block/block_int.h" 20 #include "qemu/sockets.h" 21 #include "qemu/uri.h" 22 23 typedef struct GlusterAIOCB { 24 BlockDriverAIOCB common; 25 int64_t size; 26 int ret; 27 bool *finished; 28 QEMUBH *bh; 29 } GlusterAIOCB; 30 31 typedef struct BDRVGlusterState { 32 struct glfs *glfs; 33 int fds[2]; 34 struct glfs_fd *fd; 35 int qemu_aio_count; 36 int event_reader_pos; 37 GlusterAIOCB *event_acb; 38 } BDRVGlusterState; 39 40 #define GLUSTER_FD_READ 0 41 #define GLUSTER_FD_WRITE 1 42 43 typedef struct GlusterConf { 44 char *server; 45 int port; 46 char *volname; 47 char *image; 48 char *transport; 49 } GlusterConf; 50 51 static void qemu_gluster_gconf_free(GlusterConf *gconf) 52 { 53 g_free(gconf->server); 54 g_free(gconf->volname); 55 g_free(gconf->image); 56 g_free(gconf->transport); 57 g_free(gconf); 58 } 59 60 static int parse_volume_options(GlusterConf *gconf, char *path) 61 { 62 char *p, *q; 63 64 if (!path) { 65 return -EINVAL; 66 } 67 68 /* volume */ 69 p = q = path + strspn(path, "/"); 70 p += strcspn(p, "/"); 71 if (*p == '\0') { 72 return -EINVAL; 73 } 74 gconf->volname = g_strndup(q, p - q); 75 76 /* image */ 77 p += strspn(p, "/"); 78 if (*p == '\0') { 79 return -EINVAL; 80 } 81 gconf->image = g_strdup(p); 82 return 0; 83 } 84 85 /* 86 * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...] 87 * 88 * 'gluster' is the protocol. 89 * 90 * 'transport' specifies the transport type used to connect to gluster 91 * management daemon (glusterd). Valid transport types are 92 * tcp, unix and rdma. If a transport type isn't specified, then tcp 93 * type is assumed. 94 * 95 * 'server' specifies the server where the volume file specification for 96 * the given volume resides. This can be either hostname, ipv4 address 97 * or ipv6 address. ipv6 address needs to be within square brackets [ ]. 98 * If transport type is 'unix', then 'server' field should not be specifed. 99 * The 'socket' field needs to be populated with the path to unix domain 100 * socket. 101 * 102 * 'port' is the port number on which glusterd is listening. This is optional 103 * and if not specified, QEMU will send 0 which will make gluster to use the 104 * default port. If the transport type is unix, then 'port' should not be 105 * specified. 106 * 107 * 'volname' is the name of the gluster volume which contains the VM image. 108 * 109 * 'image' is the path to the actual VM image that resides on gluster volume. 110 * 111 * Examples: 112 * 113 * file=gluster://1.2.3.4/testvol/a.img 114 * file=gluster+tcp://1.2.3.4/testvol/a.img 115 * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img 116 * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img 117 * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img 118 * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img 119 * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket 120 * file=gluster+rdma://1.2.3.4:24007/testvol/a.img 121 */ 122 static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename) 123 { 124 URI *uri; 125 QueryParams *qp = NULL; 126 bool is_unix = false; 127 int ret = 0; 128 129 uri = uri_parse(filename); 130 if (!uri) { 131 return -EINVAL; 132 } 133 134 /* transport */ 135 if (!strcmp(uri->scheme, "gluster")) { 136 gconf->transport = g_strdup("tcp"); 137 } else if (!strcmp(uri->scheme, "gluster+tcp")) { 138 gconf->transport = g_strdup("tcp"); 139 } else if (!strcmp(uri->scheme, "gluster+unix")) { 140 gconf->transport = g_strdup("unix"); 141 is_unix = true; 142 } else if (!strcmp(uri->scheme, "gluster+rdma")) { 143 gconf->transport = g_strdup("rdma"); 144 } else { 145 ret = -EINVAL; 146 goto out; 147 } 148 149 ret = parse_volume_options(gconf, uri->path); 150 if (ret < 0) { 151 goto out; 152 } 153 154 qp = query_params_parse(uri->query); 155 if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) { 156 ret = -EINVAL; 157 goto out; 158 } 159 160 if (is_unix) { 161 if (uri->server || uri->port) { 162 ret = -EINVAL; 163 goto out; 164 } 165 if (strcmp(qp->p[0].name, "socket")) { 166 ret = -EINVAL; 167 goto out; 168 } 169 gconf->server = g_strdup(qp->p[0].value); 170 } else { 171 gconf->server = g_strdup(uri->server); 172 gconf->port = uri->port; 173 } 174 175 out: 176 if (qp) { 177 query_params_free(qp); 178 } 179 uri_free(uri); 180 return ret; 181 } 182 183 static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename) 184 { 185 struct glfs *glfs = NULL; 186 int ret; 187 int old_errno; 188 189 ret = qemu_gluster_parseuri(gconf, filename); 190 if (ret < 0) { 191 error_report("Usage: file=gluster[+transport]://[server[:port]]/" 192 "volname/image[?socket=...]"); 193 errno = -ret; 194 goto out; 195 } 196 197 glfs = glfs_new(gconf->volname); 198 if (!glfs) { 199 goto out; 200 } 201 202 ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server, 203 gconf->port); 204 if (ret < 0) { 205 goto out; 206 } 207 208 /* 209 * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when 210 * GlusterFS makes GF_LOG_* macros available to libgfapi users. 211 */ 212 ret = glfs_set_logging(glfs, "-", 4); 213 if (ret < 0) { 214 goto out; 215 } 216 217 ret = glfs_init(glfs); 218 if (ret) { 219 error_report("Gluster connection failed for server=%s port=%d " 220 "volume=%s image=%s transport=%s", gconf->server, gconf->port, 221 gconf->volname, gconf->image, gconf->transport); 222 goto out; 223 } 224 return glfs; 225 226 out: 227 if (glfs) { 228 old_errno = errno; 229 glfs_fini(glfs); 230 errno = old_errno; 231 } 232 return NULL; 233 } 234 235 static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s) 236 { 237 int ret; 238 bool *finished = acb->finished; 239 BlockDriverCompletionFunc *cb = acb->common.cb; 240 void *opaque = acb->common.opaque; 241 242 if (!acb->ret || acb->ret == acb->size) { 243 ret = 0; /* Success */ 244 } else if (acb->ret < 0) { 245 ret = acb->ret; /* Read/Write failed */ 246 } else { 247 ret = -EIO; /* Partial read/write - fail it */ 248 } 249 250 s->qemu_aio_count--; 251 qemu_aio_release(acb); 252 cb(opaque, ret); 253 if (finished) { 254 *finished = true; 255 } 256 } 257 258 static void qemu_gluster_aio_event_reader(void *opaque) 259 { 260 BDRVGlusterState *s = opaque; 261 ssize_t ret; 262 263 do { 264 char *p = (char *)&s->event_acb; 265 266 ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos, 267 sizeof(s->event_acb) - s->event_reader_pos); 268 if (ret > 0) { 269 s->event_reader_pos += ret; 270 if (s->event_reader_pos == sizeof(s->event_acb)) { 271 s->event_reader_pos = 0; 272 qemu_gluster_complete_aio(s->event_acb, s); 273 } 274 } 275 } while (ret < 0 && errno == EINTR); 276 } 277 278 static int qemu_gluster_aio_flush_cb(void *opaque) 279 { 280 BDRVGlusterState *s = opaque; 281 282 return (s->qemu_aio_count > 0); 283 } 284 285 static int qemu_gluster_open(BlockDriverState *bs, const char *filename, 286 QDict *options, int bdrv_flags) 287 { 288 BDRVGlusterState *s = bs->opaque; 289 int open_flags = O_BINARY; 290 int ret = 0; 291 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 292 293 s->glfs = qemu_gluster_init(gconf, filename); 294 if (!s->glfs) { 295 ret = -errno; 296 goto out; 297 } 298 299 if (bdrv_flags & BDRV_O_RDWR) { 300 open_flags |= O_RDWR; 301 } else { 302 open_flags |= O_RDONLY; 303 } 304 305 if ((bdrv_flags & BDRV_O_NOCACHE)) { 306 open_flags |= O_DIRECT; 307 } 308 309 s->fd = glfs_open(s->glfs, gconf->image, open_flags); 310 if (!s->fd) { 311 ret = -errno; 312 goto out; 313 } 314 315 ret = qemu_pipe(s->fds); 316 if (ret < 0) { 317 ret = -errno; 318 goto out; 319 } 320 fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK); 321 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], 322 qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s); 323 324 out: 325 qemu_gluster_gconf_free(gconf); 326 if (!ret) { 327 return ret; 328 } 329 if (s->fd) { 330 glfs_close(s->fd); 331 } 332 if (s->glfs) { 333 glfs_fini(s->glfs); 334 } 335 return ret; 336 } 337 338 static int qemu_gluster_create(const char *filename, 339 QEMUOptionParameter *options) 340 { 341 struct glfs *glfs; 342 struct glfs_fd *fd; 343 int ret = 0; 344 int64_t total_size = 0; 345 GlusterConf *gconf = g_malloc0(sizeof(GlusterConf)); 346 347 glfs = qemu_gluster_init(gconf, filename); 348 if (!glfs) { 349 ret = -errno; 350 goto out; 351 } 352 353 while (options && options->name) { 354 if (!strcmp(options->name, BLOCK_OPT_SIZE)) { 355 total_size = options->value.n / BDRV_SECTOR_SIZE; 356 } 357 options++; 358 } 359 360 fd = glfs_creat(glfs, gconf->image, 361 O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR); 362 if (!fd) { 363 ret = -errno; 364 } else { 365 if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) { 366 ret = -errno; 367 } 368 if (glfs_close(fd) != 0) { 369 ret = -errno; 370 } 371 } 372 out: 373 qemu_gluster_gconf_free(gconf); 374 if (glfs) { 375 glfs_fini(glfs); 376 } 377 return ret; 378 } 379 380 static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb) 381 { 382 GlusterAIOCB *acb = (GlusterAIOCB *)blockacb; 383 bool finished = false; 384 385 acb->finished = &finished; 386 while (!finished) { 387 qemu_aio_wait(); 388 } 389 } 390 391 static const AIOCBInfo gluster_aiocb_info = { 392 .aiocb_size = sizeof(GlusterAIOCB), 393 .cancel = qemu_gluster_aio_cancel, 394 }; 395 396 static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg) 397 { 398 GlusterAIOCB *acb = (GlusterAIOCB *)arg; 399 BlockDriverState *bs = acb->common.bs; 400 BDRVGlusterState *s = bs->opaque; 401 int retval; 402 403 acb->ret = ret; 404 retval = qemu_write_full(s->fds[GLUSTER_FD_WRITE], &acb, sizeof(acb)); 405 if (retval != sizeof(acb)) { 406 /* 407 * Gluster AIO callback thread failed to notify the waiting 408 * QEMU thread about IO completion. 409 * 410 * Complete this IO request and make the disk inaccessible for 411 * subsequent reads and writes. 412 */ 413 error_report("Gluster failed to notify QEMU about IO completion"); 414 415 qemu_mutex_lock_iothread(); /* We are in gluster thread context */ 416 acb->common.cb(acb->common.opaque, -EIO); 417 qemu_aio_release(acb); 418 s->qemu_aio_count--; 419 close(s->fds[GLUSTER_FD_READ]); 420 close(s->fds[GLUSTER_FD_WRITE]); 421 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, 422 NULL); 423 bs->drv = NULL; /* Make the disk inaccessible */ 424 qemu_mutex_unlock_iothread(); 425 } 426 } 427 428 static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs, 429 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 430 BlockDriverCompletionFunc *cb, void *opaque, int write) 431 { 432 int ret; 433 GlusterAIOCB *acb; 434 BDRVGlusterState *s = bs->opaque; 435 size_t size; 436 off_t offset; 437 438 offset = sector_num * BDRV_SECTOR_SIZE; 439 size = nb_sectors * BDRV_SECTOR_SIZE; 440 s->qemu_aio_count++; 441 442 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 443 acb->size = size; 444 acb->ret = 0; 445 acb->finished = NULL; 446 447 if (write) { 448 ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0, 449 &gluster_finish_aiocb, acb); 450 } else { 451 ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0, 452 &gluster_finish_aiocb, acb); 453 } 454 455 if (ret < 0) { 456 goto out; 457 } 458 return &acb->common; 459 460 out: 461 s->qemu_aio_count--; 462 qemu_aio_release(acb); 463 return NULL; 464 } 465 466 static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs, 467 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 468 BlockDriverCompletionFunc *cb, void *opaque) 469 { 470 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); 471 } 472 473 static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs, 474 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 475 BlockDriverCompletionFunc *cb, void *opaque) 476 { 477 return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); 478 } 479 480 static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs, 481 BlockDriverCompletionFunc *cb, void *opaque) 482 { 483 int ret; 484 GlusterAIOCB *acb; 485 BDRVGlusterState *s = bs->opaque; 486 487 acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque); 488 acb->size = 0; 489 acb->ret = 0; 490 acb->finished = NULL; 491 s->qemu_aio_count++; 492 493 ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb); 494 if (ret < 0) { 495 goto out; 496 } 497 return &acb->common; 498 499 out: 500 s->qemu_aio_count--; 501 qemu_aio_release(acb); 502 return NULL; 503 } 504 505 static int64_t qemu_gluster_getlength(BlockDriverState *bs) 506 { 507 BDRVGlusterState *s = bs->opaque; 508 int64_t ret; 509 510 ret = glfs_lseek(s->fd, 0, SEEK_END); 511 if (ret < 0) { 512 return -errno; 513 } else { 514 return ret; 515 } 516 } 517 518 static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs) 519 { 520 BDRVGlusterState *s = bs->opaque; 521 struct stat st; 522 int ret; 523 524 ret = glfs_fstat(s->fd, &st); 525 if (ret < 0) { 526 return -errno; 527 } else { 528 return st.st_blocks * 512; 529 } 530 } 531 532 static void qemu_gluster_close(BlockDriverState *bs) 533 { 534 BDRVGlusterState *s = bs->opaque; 535 536 close(s->fds[GLUSTER_FD_READ]); 537 close(s->fds[GLUSTER_FD_WRITE]); 538 qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, NULL); 539 540 if (s->fd) { 541 glfs_close(s->fd); 542 s->fd = NULL; 543 } 544 glfs_fini(s->glfs); 545 } 546 547 static QEMUOptionParameter qemu_gluster_create_options[] = { 548 { 549 .name = BLOCK_OPT_SIZE, 550 .type = OPT_SIZE, 551 .help = "Virtual disk size" 552 }, 553 { NULL } 554 }; 555 556 static BlockDriver bdrv_gluster = { 557 .format_name = "gluster", 558 .protocol_name = "gluster", 559 .instance_size = sizeof(BDRVGlusterState), 560 .bdrv_file_open = qemu_gluster_open, 561 .bdrv_close = qemu_gluster_close, 562 .bdrv_create = qemu_gluster_create, 563 .bdrv_getlength = qemu_gluster_getlength, 564 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 565 .bdrv_aio_readv = qemu_gluster_aio_readv, 566 .bdrv_aio_writev = qemu_gluster_aio_writev, 567 .bdrv_aio_flush = qemu_gluster_aio_flush, 568 .create_options = qemu_gluster_create_options, 569 }; 570 571 static BlockDriver bdrv_gluster_tcp = { 572 .format_name = "gluster", 573 .protocol_name = "gluster+tcp", 574 .instance_size = sizeof(BDRVGlusterState), 575 .bdrv_file_open = qemu_gluster_open, 576 .bdrv_close = qemu_gluster_close, 577 .bdrv_create = qemu_gluster_create, 578 .bdrv_getlength = qemu_gluster_getlength, 579 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 580 .bdrv_aio_readv = qemu_gluster_aio_readv, 581 .bdrv_aio_writev = qemu_gluster_aio_writev, 582 .bdrv_aio_flush = qemu_gluster_aio_flush, 583 .create_options = qemu_gluster_create_options, 584 }; 585 586 static BlockDriver bdrv_gluster_unix = { 587 .format_name = "gluster", 588 .protocol_name = "gluster+unix", 589 .instance_size = sizeof(BDRVGlusterState), 590 .bdrv_file_open = qemu_gluster_open, 591 .bdrv_close = qemu_gluster_close, 592 .bdrv_create = qemu_gluster_create, 593 .bdrv_getlength = qemu_gluster_getlength, 594 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 595 .bdrv_aio_readv = qemu_gluster_aio_readv, 596 .bdrv_aio_writev = qemu_gluster_aio_writev, 597 .bdrv_aio_flush = qemu_gluster_aio_flush, 598 .create_options = qemu_gluster_create_options, 599 }; 600 601 static BlockDriver bdrv_gluster_rdma = { 602 .format_name = "gluster", 603 .protocol_name = "gluster+rdma", 604 .instance_size = sizeof(BDRVGlusterState), 605 .bdrv_file_open = qemu_gluster_open, 606 .bdrv_close = qemu_gluster_close, 607 .bdrv_create = qemu_gluster_create, 608 .bdrv_getlength = qemu_gluster_getlength, 609 .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size, 610 .bdrv_aio_readv = qemu_gluster_aio_readv, 611 .bdrv_aio_writev = qemu_gluster_aio_writev, 612 .bdrv_aio_flush = qemu_gluster_aio_flush, 613 .create_options = qemu_gluster_create_options, 614 }; 615 616 static void bdrv_gluster_init(void) 617 { 618 bdrv_register(&bdrv_gluster_rdma); 619 bdrv_register(&bdrv_gluster_unix); 620 bdrv_register(&bdrv_gluster_tcp); 621 bdrv_register(&bdrv_gluster); 622 } 623 624 block_init(bdrv_gluster_init); 625