1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/module.h" 26 #include "qemu/iov.h" 27 28 bool qio_channel_has_feature(QIOChannel *ioc, 29 QIOChannelFeature feature) 30 { 31 return ioc->features & (1 << feature); 32 } 33 34 35 void qio_channel_set_feature(QIOChannel *ioc, 36 QIOChannelFeature feature) 37 { 38 ioc->features |= (1 << feature); 39 } 40 41 42 void qio_channel_set_name(QIOChannel *ioc, 43 const char *name) 44 { 45 g_free(ioc->name); 46 ioc->name = g_strdup(name); 47 } 48 49 50 ssize_t qio_channel_readv_full(QIOChannel *ioc, 51 const struct iovec *iov, 52 size_t niov, 53 int **fds, 54 size_t *nfds, 55 Error **errp) 56 { 57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 58 59 if ((fds || nfds) && 60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 61 error_setg_errno(errp, EINVAL, 62 "Channel does not support file descriptor passing"); 63 return -1; 64 } 65 66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 67 } 68 69 70 ssize_t qio_channel_writev_full(QIOChannel *ioc, 71 const struct iovec *iov, 72 size_t niov, 73 int *fds, 74 size_t nfds, 75 int flags, 76 Error **errp) 77 { 78 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 79 80 if (fds || nfds) { 81 if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 82 error_setg_errno(errp, EINVAL, 83 "Channel does not support file descriptor passing"); 84 return -1; 85 } 86 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 87 error_setg_errno(errp, EINVAL, 88 "Zero Copy does not support file descriptor passing"); 89 return -1; 90 } 91 } 92 93 if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) && 94 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) { 95 error_setg_errno(errp, EINVAL, 96 "Requested Zero Copy feature is not available"); 97 return -1; 98 } 99 100 return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp); 101 } 102 103 104 int qio_channel_readv_all_eof(QIOChannel *ioc, 105 const struct iovec *iov, 106 size_t niov, 107 Error **errp) 108 { 109 return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp); 110 } 111 112 int qio_channel_readv_all(QIOChannel *ioc, 113 const struct iovec *iov, 114 size_t niov, 115 Error **errp) 116 { 117 return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp); 118 } 119 120 int qio_channel_readv_full_all_eof(QIOChannel *ioc, 121 const struct iovec *iov, 122 size_t niov, 123 int **fds, size_t *nfds, 124 Error **errp) 125 { 126 int ret = -1; 127 struct iovec *local_iov = g_new(struct iovec, niov); 128 struct iovec *local_iov_head = local_iov; 129 unsigned int nlocal_iov = niov; 130 int **local_fds = fds; 131 size_t *local_nfds = nfds; 132 bool partial = false; 133 134 if (nfds) { 135 *nfds = 0; 136 } 137 138 if (fds) { 139 *fds = NULL; 140 } 141 142 nlocal_iov = iov_copy(local_iov, nlocal_iov, 143 iov, niov, 144 0, iov_size(iov, niov)); 145 146 while ((nlocal_iov > 0) || local_fds) { 147 ssize_t len; 148 len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds, 149 local_nfds, errp); 150 if (len == QIO_CHANNEL_ERR_BLOCK) { 151 if (qemu_in_coroutine()) { 152 qio_channel_yield(ioc, G_IO_IN); 153 } else { 154 qio_channel_wait(ioc, G_IO_IN); 155 } 156 continue; 157 } 158 159 if (len == 0) { 160 if (local_nfds && *local_nfds) { 161 /* 162 * Got some FDs, but no data yet. This isn't an EOF 163 * scenario (yet), so carry on to try to read data 164 * on next loop iteration 165 */ 166 goto next_iter; 167 } else if (!partial) { 168 /* No fds and no data - EOF before any data read */ 169 ret = 0; 170 goto cleanup; 171 } else { 172 len = -1; 173 error_setg(errp, 174 "Unexpected end-of-file before all data were read"); 175 /* Fallthrough into len < 0 handling */ 176 } 177 } 178 179 if (len < 0) { 180 /* Close any FDs we previously received */ 181 if (nfds && fds) { 182 size_t i; 183 for (i = 0; i < (*nfds); i++) { 184 close((*fds)[i]); 185 } 186 g_free(*fds); 187 *fds = NULL; 188 *nfds = 0; 189 } 190 goto cleanup; 191 } 192 193 if (nlocal_iov) { 194 iov_discard_front(&local_iov, &nlocal_iov, len); 195 } 196 197 next_iter: 198 partial = true; 199 local_fds = NULL; 200 local_nfds = NULL; 201 } 202 203 ret = 1; 204 205 cleanup: 206 g_free(local_iov_head); 207 return ret; 208 } 209 210 int qio_channel_readv_full_all(QIOChannel *ioc, 211 const struct iovec *iov, 212 size_t niov, 213 int **fds, size_t *nfds, 214 Error **errp) 215 { 216 int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp); 217 218 if (ret == 0) { 219 error_setg(errp, "Unexpected end-of-file before all data were read"); 220 return -1; 221 } 222 if (ret == 1) { 223 return 0; 224 } 225 226 return ret; 227 } 228 229 int qio_channel_writev_all(QIOChannel *ioc, 230 const struct iovec *iov, 231 size_t niov, 232 Error **errp) 233 { 234 return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp); 235 } 236 237 int qio_channel_writev_full_all(QIOChannel *ioc, 238 const struct iovec *iov, 239 size_t niov, 240 int *fds, size_t nfds, 241 int flags, Error **errp) 242 { 243 int ret = -1; 244 struct iovec *local_iov = g_new(struct iovec, niov); 245 struct iovec *local_iov_head = local_iov; 246 unsigned int nlocal_iov = niov; 247 248 nlocal_iov = iov_copy(local_iov, nlocal_iov, 249 iov, niov, 250 0, iov_size(iov, niov)); 251 252 while (nlocal_iov > 0) { 253 ssize_t len; 254 255 len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, 256 nfds, flags, errp); 257 258 if (len == QIO_CHANNEL_ERR_BLOCK) { 259 if (qemu_in_coroutine()) { 260 qio_channel_yield(ioc, G_IO_OUT); 261 } else { 262 qio_channel_wait(ioc, G_IO_OUT); 263 } 264 continue; 265 } 266 if (len < 0) { 267 goto cleanup; 268 } 269 270 iov_discard_front(&local_iov, &nlocal_iov, len); 271 272 fds = NULL; 273 nfds = 0; 274 } 275 276 ret = 0; 277 cleanup: 278 g_free(local_iov_head); 279 return ret; 280 } 281 282 ssize_t qio_channel_readv(QIOChannel *ioc, 283 const struct iovec *iov, 284 size_t niov, 285 Error **errp) 286 { 287 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 288 } 289 290 291 ssize_t qio_channel_writev(QIOChannel *ioc, 292 const struct iovec *iov, 293 size_t niov, 294 Error **errp) 295 { 296 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp); 297 } 298 299 300 ssize_t qio_channel_read(QIOChannel *ioc, 301 char *buf, 302 size_t buflen, 303 Error **errp) 304 { 305 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 306 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 307 } 308 309 310 ssize_t qio_channel_write(QIOChannel *ioc, 311 const char *buf, 312 size_t buflen, 313 Error **errp) 314 { 315 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 316 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp); 317 } 318 319 320 int qio_channel_read_all_eof(QIOChannel *ioc, 321 char *buf, 322 size_t buflen, 323 Error **errp) 324 { 325 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 326 return qio_channel_readv_all_eof(ioc, &iov, 1, errp); 327 } 328 329 330 int qio_channel_read_all(QIOChannel *ioc, 331 char *buf, 332 size_t buflen, 333 Error **errp) 334 { 335 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 336 return qio_channel_readv_all(ioc, &iov, 1, errp); 337 } 338 339 340 int qio_channel_write_all(QIOChannel *ioc, 341 const char *buf, 342 size_t buflen, 343 Error **errp) 344 { 345 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 346 return qio_channel_writev_all(ioc, &iov, 1, errp); 347 } 348 349 350 int qio_channel_set_blocking(QIOChannel *ioc, 351 bool enabled, 352 Error **errp) 353 { 354 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 355 return klass->io_set_blocking(ioc, enabled, errp); 356 } 357 358 359 int qio_channel_close(QIOChannel *ioc, 360 Error **errp) 361 { 362 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 363 return klass->io_close(ioc, errp); 364 } 365 366 367 GSource *qio_channel_create_watch(QIOChannel *ioc, 368 GIOCondition condition) 369 { 370 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 371 GSource *ret = klass->io_create_watch(ioc, condition); 372 373 if (ioc->name) { 374 g_source_set_name(ret, ioc->name); 375 } 376 377 return ret; 378 } 379 380 381 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 382 AioContext *ctx, 383 IOHandler *io_read, 384 IOHandler *io_write, 385 void *opaque) 386 { 387 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 388 389 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 390 } 391 392 guint qio_channel_add_watch_full(QIOChannel *ioc, 393 GIOCondition condition, 394 QIOChannelFunc func, 395 gpointer user_data, 396 GDestroyNotify notify, 397 GMainContext *context) 398 { 399 GSource *source; 400 guint id; 401 402 source = qio_channel_create_watch(ioc, condition); 403 404 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 405 406 id = g_source_attach(source, context); 407 g_source_unref(source); 408 409 return id; 410 } 411 412 guint qio_channel_add_watch(QIOChannel *ioc, 413 GIOCondition condition, 414 QIOChannelFunc func, 415 gpointer user_data, 416 GDestroyNotify notify) 417 { 418 return qio_channel_add_watch_full(ioc, condition, func, 419 user_data, notify, NULL); 420 } 421 422 GSource *qio_channel_add_watch_source(QIOChannel *ioc, 423 GIOCondition condition, 424 QIOChannelFunc func, 425 gpointer user_data, 426 GDestroyNotify notify, 427 GMainContext *context) 428 { 429 GSource *source; 430 guint id; 431 432 id = qio_channel_add_watch_full(ioc, condition, func, 433 user_data, notify, context); 434 source = g_main_context_find_source_by_id(context, id); 435 g_source_ref(source); 436 return source; 437 } 438 439 440 int qio_channel_shutdown(QIOChannel *ioc, 441 QIOChannelShutdown how, 442 Error **errp) 443 { 444 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 445 446 if (!klass->io_shutdown) { 447 error_setg(errp, "Data path shutdown not supported"); 448 return -1; 449 } 450 451 return klass->io_shutdown(ioc, how, errp); 452 } 453 454 455 void qio_channel_set_delay(QIOChannel *ioc, 456 bool enabled) 457 { 458 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 459 460 if (klass->io_set_delay) { 461 klass->io_set_delay(ioc, enabled); 462 } 463 } 464 465 466 void qio_channel_set_cork(QIOChannel *ioc, 467 bool enabled) 468 { 469 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 470 471 if (klass->io_set_cork) { 472 klass->io_set_cork(ioc, enabled); 473 } 474 } 475 476 477 off_t qio_channel_io_seek(QIOChannel *ioc, 478 off_t offset, 479 int whence, 480 Error **errp) 481 { 482 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 483 484 if (!klass->io_seek) { 485 error_setg(errp, "Channel does not support random access"); 486 return -1; 487 } 488 489 return klass->io_seek(ioc, offset, whence, errp); 490 } 491 492 int qio_channel_flush(QIOChannel *ioc, 493 Error **errp) 494 { 495 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 496 497 if (!klass->io_flush || 498 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) { 499 return 0; 500 } 501 502 return klass->io_flush(ioc, errp); 503 } 504 505 506 static void qio_channel_restart_read(void *opaque) 507 { 508 QIOChannel *ioc = opaque; 509 Coroutine *co = ioc->read_coroutine; 510 511 /* Assert that aio_co_wake() reenters the coroutine directly */ 512 assert(qemu_get_current_aio_context() == 513 qemu_coroutine_get_aio_context(co)); 514 aio_co_wake(co); 515 } 516 517 static void qio_channel_restart_write(void *opaque) 518 { 519 QIOChannel *ioc = opaque; 520 Coroutine *co = ioc->write_coroutine; 521 522 /* Assert that aio_co_wake() reenters the coroutine directly */ 523 assert(qemu_get_current_aio_context() == 524 qemu_coroutine_get_aio_context(co)); 525 aio_co_wake(co); 526 } 527 528 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 529 { 530 IOHandler *rd_handler = NULL, *wr_handler = NULL; 531 AioContext *ctx; 532 533 if (ioc->read_coroutine) { 534 rd_handler = qio_channel_restart_read; 535 } 536 if (ioc->write_coroutine) { 537 wr_handler = qio_channel_restart_write; 538 } 539 540 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 541 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 542 } 543 544 void qio_channel_attach_aio_context(QIOChannel *ioc, 545 AioContext *ctx) 546 { 547 assert(!ioc->read_coroutine); 548 assert(!ioc->write_coroutine); 549 ioc->ctx = ctx; 550 } 551 552 void qio_channel_detach_aio_context(QIOChannel *ioc) 553 { 554 ioc->read_coroutine = NULL; 555 ioc->write_coroutine = NULL; 556 qio_channel_set_aio_fd_handlers(ioc); 557 ioc->ctx = NULL; 558 } 559 560 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 561 GIOCondition condition) 562 { 563 assert(qemu_in_coroutine()); 564 if (condition == G_IO_IN) { 565 assert(!ioc->read_coroutine); 566 ioc->read_coroutine = qemu_coroutine_self(); 567 } else if (condition == G_IO_OUT) { 568 assert(!ioc->write_coroutine); 569 ioc->write_coroutine = qemu_coroutine_self(); 570 } else { 571 abort(); 572 } 573 qio_channel_set_aio_fd_handlers(ioc); 574 qemu_coroutine_yield(); 575 576 /* Allow interrupting the operation by reentering the coroutine other than 577 * through the aio_fd_handlers. */ 578 if (condition == G_IO_IN && ioc->read_coroutine) { 579 ioc->read_coroutine = NULL; 580 qio_channel_set_aio_fd_handlers(ioc); 581 } else if (condition == G_IO_OUT && ioc->write_coroutine) { 582 ioc->write_coroutine = NULL; 583 qio_channel_set_aio_fd_handlers(ioc); 584 } 585 } 586 587 588 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 589 GIOCondition condition, 590 gpointer opaque) 591 { 592 GMainLoop *loop = opaque; 593 594 g_main_loop_quit(loop); 595 return FALSE; 596 } 597 598 599 void qio_channel_wait(QIOChannel *ioc, 600 GIOCondition condition) 601 { 602 GMainContext *ctxt = g_main_context_new(); 603 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 604 GSource *source; 605 606 source = qio_channel_create_watch(ioc, condition); 607 608 g_source_set_callback(source, 609 (GSourceFunc)qio_channel_wait_complete, 610 loop, 611 NULL); 612 613 g_source_attach(source, ctxt); 614 615 g_main_loop_run(loop); 616 617 g_source_unref(source); 618 g_main_loop_unref(loop); 619 g_main_context_unref(ctxt); 620 } 621 622 623 static void qio_channel_finalize(Object *obj) 624 { 625 QIOChannel *ioc = QIO_CHANNEL(obj); 626 627 g_free(ioc->name); 628 629 #ifdef _WIN32 630 if (ioc->event) { 631 CloseHandle(ioc->event); 632 } 633 #endif 634 } 635 636 static const TypeInfo qio_channel_info = { 637 .parent = TYPE_OBJECT, 638 .name = TYPE_QIO_CHANNEL, 639 .instance_size = sizeof(QIOChannel), 640 .instance_finalize = qio_channel_finalize, 641 .abstract = true, 642 .class_size = sizeof(QIOChannelClass), 643 }; 644 645 646 static void qio_channel_register_types(void) 647 { 648 type_register_static(&qio_channel_info); 649 } 650 651 652 type_init(qio_channel_register_types); 653