/*
 * QEMU I/O channels
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "qemu/osdep.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/iov.h"

bool qio_channel_has_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    return ioc->features & (1 << feature);
}


void qio_channel_set_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    ioc->features |= (1 << feature);
}


void qio_channel_set_name(QIOChannel *ioc,
                          const char *name)
{
    g_free(ioc->name);
    ioc->name = g_strdup(name);
}


ssize_t qio_channel_readv_full(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds,
                               size_t *nfds,
                               int flags,
                               Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support peek read");
        return -1;
    }

    return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
}


ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds,
                                size_t nfds,
                                int flags,
                                Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (fds || nfds) {
        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
            error_setg_errno(errp, EINVAL,
                             "Channel does not support file descriptor passing");
            return -1;
        }
        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
            error_setg_errno(errp, EINVAL,
                             "Zero Copy does not support file descriptor passing");
            return -1;
        }
    }

    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        error_setg_errno(errp, EINVAL,
                         "Requested Zero Copy feature is not available");
        return -1;
    }

    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}


int coroutine_mixed_fn qio_channel_readv_all_eof(QIOChannel *ioc,
                                                 const struct iovec *iov,
                                                 size_t niov,
                                                 Error **errp)
{
    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
}

int coroutine_mixed_fn qio_channel_readv_all(QIOChannel *ioc,
                                             const struct iovec *iov,
                                             size_t niov,
                                             Error **errp)
{
    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
}
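
/*
 * Illustrative sketch, not part of the implementation: reading an
 * exact number of bytes with the *_all helpers, retrying internally
 * until the buffer is full or an error/EOF occurs.  "err" is a
 * hypothetical caller-side name.
 *
 *   char buf[128];
 *   Error *err = NULL;
 *   if (qio_channel_read_all(ioc, buf, sizeof(buf), &err) < 0) {
 *       ... report err ...
 *   }
 */
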
int coroutine_mixed_fn qio_channel_readv_full_all_eof(QIOChannel *ioc,
                                                      const struct iovec *iov,
                                                      size_t niov,
                                                      int **fds, size_t *nfds,
                                                      Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    int **local_fds = fds;
    size_t *local_nfds = nfds;
    bool partial = false;

    if (nfds) {
        *nfds = 0;
    }

    if (fds) {
        *fds = NULL;
    }

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while ((nlocal_iov > 0) || local_fds) {
        ssize_t len;
        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
                                     local_nfds, 0, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        }

        if (len == 0) {
            if (local_nfds && *local_nfds) {
                /*
                 * Got some FDs, but no data yet. This isn't an EOF
                 * scenario (yet), so carry on to try to read data
                 * on next loop iteration
                 */
                goto next_iter;
            } else if (!partial) {
                /* No fds and no data - EOF before any data read */
                ret = 0;
                goto cleanup;
            } else {
                len = -1;
                error_setg(errp,
                           "Unexpected end-of-file before all data were read");
                /* Fallthrough into len < 0 handling */
            }
        }

        if (len < 0) {
            /* Close any FDs we previously received */
            if (nfds && fds) {
                size_t i;
                for (i = 0; i < (*nfds); i++) {
                    close((*fds)[i]);
                }
                g_free(*fds);
                *fds = NULL;
                *nfds = 0;
            }
            goto cleanup;
        }

        if (nlocal_iov) {
            iov_discard_front(&local_iov, &nlocal_iov, len);
        }

 next_iter:
        partial = true;
        /* FDs are only collected on the first read; later iterations
         * just pull in the remaining data */
        local_fds = NULL;
        local_nfds = NULL;
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}

int coroutine_mixed_fn qio_channel_readv_full_all(QIOChannel *ioc,
                                                  const struct iovec *iov,
                                                  size_t niov,
                                                  int **fds, size_t *nfds,
                                                  Error **errp)
{
    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);

    if (ret == 0) {
        error_setg(errp, "Unexpected end-of-file before all data were read");
        return -1;
    }
    if (ret == 1) {
        return 0;
    }

    return ret;
}

int coroutine_mixed_fn qio_channel_writev_all(QIOChannel *ioc,
                                              const struct iovec *iov,
                                              size_t niov,
                                              Error **errp)
{
    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
}

int coroutine_mixed_fn qio_channel_writev_full_all(QIOChannel *ioc,
                                                   const struct iovec *iov,
                                                   size_t niov,
                                                   int *fds, size_t nfds,
                                                   int flags, Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;

        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
                                      nfds, flags, errp);

        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        iov_discard_front(&local_iov, &nlocal_iov, len);

        /* FDs are sent with the first chunk only; clear them so they
         * are not resent with any remaining data */
        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}
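
/*
 * Illustrative sketch, not part of the implementation: sending a
 * buffer together with a file descriptor over a channel that has
 * QIO_CHANNEL_FEATURE_FD_PASS (e.g. a UNIX domain socket channel).
 * "fd" and "err" are hypothetical caller-side names.
 *
 *   struct iovec iov = { .iov_base = (char *)"hello", .iov_len = 5 };
 *   int fds[] = { fd };
 *   Error *err = NULL;
 *   if (qio_channel_writev_full_all(ioc, &iov, 1, fds,
 *                                   G_N_ELEMENTS(fds), 0, &err) < 0) {
 *       ... report err ...
 *   }
 */
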
ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
}


ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
}


ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
}


ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
}


int coroutine_mixed_fn qio_channel_read_all_eof(QIOChannel *ioc,
                                                char *buf,
                                                size_t buflen,
                                                Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}


int coroutine_mixed_fn qio_channel_read_all(QIOChannel *ioc,
                                            char *buf,
                                            size_t buflen,
                                            Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}


int coroutine_mixed_fn qio_channel_write_all(QIOChannel *ioc,
                                             const char *buf,
                                             size_t buflen,
                                             Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}


int qio_channel_set_blocking(QIOChannel *ioc,
                             bool enabled,
                             Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}


int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}


GSource *qio_channel_create_watch(QIOChannel *ioc,
                                  GIOCondition condition)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    GSource *ret = klass->io_create_watch(ioc, condition);

    if (ioc->name) {
        g_source_set_name(ret, ioc->name);
    }

    return ret;
}


void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *ctx,
                                    IOHandler *io_read,
                                    IOHandler *io_write,
                                    void *opaque)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
}

guint qio_channel_add_watch_full(QIOChannel *ioc,
                                 GIOCondition condition,
                                 QIOChannelFunc func,
                                 gpointer user_data,
                                 GDestroyNotify notify,
                                 GMainContext *context)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}

guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    return qio_channel_add_watch_full(ioc, condition, func,
                                      user_data, notify, NULL);
}
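
/*
 * Illustrative sketch, not part of the implementation: watching a
 * channel for readability from the default GLib main context.
 * "my_can_read" is a hypothetical QIOChannelFunc.
 *
 *   static gboolean my_can_read(QIOChannel *ioc,
 *                               GIOCondition condition,
 *                               gpointer opaque)
 *   {
 *       ... read from ioc ...
 *       return G_SOURCE_CONTINUE;
 *   }
 *
 *   qio_channel_add_watch(ioc, G_IO_IN, my_can_read, NULL, NULL);
 */
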
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
                                      GIOCondition condition,
                                      QIOChannelFunc func,
                                      gpointer user_data,
                                      GDestroyNotify notify,
                                      GMainContext *context)
{
    GSource *source;
    guint id;

    id = qio_channel_add_watch_full(ioc, condition, func,
                                    user_data, notify, context);
    source = g_main_context_find_source_by_id(context, id);
    g_source_ref(source);
    return source;
}


int qio_channel_shutdown(QIOChannel *ioc,
                         QIOChannelShutdown how,
                         Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_shutdown) {
        error_setg(errp, "Data path shutdown not supported");
        return -1;
    }

    return klass->io_shutdown(ioc, how, errp);
}


void qio_channel_set_delay(QIOChannel *ioc,
                           bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_delay) {
        klass->io_set_delay(ioc, enabled);
    }
}


void qio_channel_set_cork(QIOChannel *ioc,
                          bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_cork) {
        klass->io_set_cork(ioc, enabled);
    }
}


off_t qio_channel_io_seek(QIOChannel *ioc,
                          off_t offset,
                          int whence,
                          Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_seek) {
        error_setg(errp, "Channel does not support random access");
        return -1;
    }

    return klass->io_seek(ioc, offset, whence, errp);
}

int qio_channel_flush(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_flush ||
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
        return 0;
    }

    return klass->io_flush(ioc, errp);
}
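
/*
 * Illustrative sketch, not part of the implementation: a zero-copy
 * write should be followed by a flush before the caller reuses or
 * frees the buffers, since the kernel may still reference them.
 * "err" is a hypothetical caller-side name.
 *
 *   if (qio_channel_writev_full_all(ioc, iov, niov, NULL, 0,
 *                                   QIO_CHANNEL_WRITE_FLAG_ZERO_COPY,
 *                                   &err) < 0) {
 *       ... report err ...
 *   }
 *   if (qio_channel_flush(ioc, &err) < 0) {
 *       ... report err ...
 *   }
 */
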
static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->read_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->write_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
{
    IOHandler *rd_handler = NULL, *wr_handler = NULL;
    AioContext *ctx;

    if (ioc->read_coroutine) {
        rd_handler = qio_channel_restart_read;
    }
    if (ioc->write_coroutine) {
        wr_handler = qio_channel_restart_write;
    }

    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
}

void qio_channel_attach_aio_context(QIOChannel *ioc,
                                    AioContext *ctx)
{
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);
    ioc->ctx = ctx;
}

void qio_channel_detach_aio_context(QIOChannel *ioc)
{
    ioc->read_coroutine = NULL;
    ioc->write_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    ioc->ctx = NULL;
}

void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    assert(qemu_in_coroutine());
    if (condition == G_IO_IN) {
        assert(!ioc->read_coroutine);
        ioc->read_coroutine = qemu_coroutine_self();
    } else if (condition == G_IO_OUT) {
        assert(!ioc->write_coroutine);
        ioc->write_coroutine = qemu_coroutine_self();
    } else {
        abort();
    }
    qio_channel_set_aio_fd_handlers(ioc);
    qemu_coroutine_yield();

    /*
     * Allow interrupting the operation by reentering the coroutine other
     * than through the aio_fd_handlers.
     */
    if (condition == G_IO_IN && ioc->read_coroutine) {
        ioc->read_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
        ioc->write_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    }
}


static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                          GIOCondition condition,
                                          gpointer opaque)
{
    GMainLoop *loop = opaque;

    g_main_loop_quit(loop);
    return FALSE;
}


void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}


static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}

static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};


static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}


type_init(qio_channel_register_types);