1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/module.h" 26 #include "qemu/iov.h" 27 28 bool qio_channel_has_feature(QIOChannel *ioc, 29 QIOChannelFeature feature) 30 { 31 return ioc->features & (1 << feature); 32 } 33 34 35 void qio_channel_set_feature(QIOChannel *ioc, 36 QIOChannelFeature feature) 37 { 38 ioc->features |= (1 << feature); 39 } 40 41 42 void qio_channel_set_name(QIOChannel *ioc, 43 const char *name) 44 { 45 g_free(ioc->name); 46 ioc->name = g_strdup(name); 47 } 48 49 50 ssize_t qio_channel_readv_full(QIOChannel *ioc, 51 const struct iovec *iov, 52 size_t niov, 53 int **fds, 54 size_t *nfds, 55 Error **errp) 56 { 57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 58 59 if ((fds || nfds) && 60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 61 error_setg_errno(errp, EINVAL, 62 "Channel does not support file descriptor passing"); 63 return -1; 64 } 65 66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 67 } 68 69 70 ssize_t qio_channel_writev_full(QIOChannel *ioc, 71 const struct iovec *iov, 72 size_t niov, 73 int *fds, 74 size_t nfds, 75 Error **errp) 76 { 77 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 78 79 if ((fds || nfds) && 80 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 81 error_setg_errno(errp, EINVAL, 82 "Channel does not support file descriptor passing"); 83 return -1; 84 } 85 86 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 87 } 88 89 90 int qio_channel_readv_all_eof(QIOChannel *ioc, 91 const struct iovec *iov, 92 size_t niov, 93 Error **errp) 94 { 95 return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp); 96 } 97 98 int qio_channel_readv_all(QIOChannel *ioc, 99 const struct iovec *iov, 100 size_t niov, 101 Error **errp) 102 { 103 return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp); 104 } 105 106 int qio_channel_readv_full_all_eof(QIOChannel *ioc, 107 const struct iovec *iov, 108 size_t niov, 109 int **fds, size_t *nfds, 110 Error **errp) 111 { 112 int ret = -1; 113 struct iovec *local_iov = g_new(struct iovec, niov); 114 struct iovec *local_iov_head = local_iov; 115 unsigned int nlocal_iov = niov; 116 int **local_fds = fds; 117 size_t *local_nfds = nfds; 118 bool partial = false; 119 120 if (nfds) { 121 *nfds = 0; 122 } 123 124 if (fds) { 125 *fds = NULL; 126 } 127 128 nlocal_iov = iov_copy(local_iov, nlocal_iov, 129 iov, niov, 130 0, iov_size(iov, niov)); 131 132 while ((nlocal_iov > 0) || local_fds) { 133 ssize_t len; 134 len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds, 135 local_nfds, errp); 136 if (len == QIO_CHANNEL_ERR_BLOCK) { 137 if (qemu_in_coroutine()) { 138 qio_channel_yield(ioc, G_IO_IN); 139 } else { 140 qio_channel_wait(ioc, G_IO_IN); 141 } 142 continue; 143 } 144 145 if (len == 0) { 146 if (local_nfds && *local_nfds) { 147 /* 148 * Got some FDs, but no data yet. This isn't an EOF 149 * scenario (yet), so carry on to try to read data 150 * on next loop iteration 151 */ 152 goto next_iter; 153 } else if (!partial) { 154 /* No fds and no data - EOF before any data read */ 155 ret = 0; 156 goto cleanup; 157 } else { 158 len = -1; 159 error_setg(errp, 160 "Unexpected end-of-file before all data were read"); 161 /* Fallthrough into len < 0 handling */ 162 } 163 } 164 165 if (len < 0) { 166 /* Close any FDs we previously received */ 167 if (nfds && fds) { 168 size_t i; 169 for (i = 0; i < (*nfds); i++) { 170 close((*fds)[i]); 171 } 172 g_free(*fds); 173 *fds = NULL; 174 *nfds = 0; 175 } 176 goto cleanup; 177 } 178 179 if (nlocal_iov) { 180 iov_discard_front(&local_iov, &nlocal_iov, len); 181 } 182 183 next_iter: 184 partial = true; 185 local_fds = NULL; 186 local_nfds = NULL; 187 } 188 189 ret = 1; 190 191 cleanup: 192 g_free(local_iov_head); 193 return ret; 194 } 195 196 int qio_channel_readv_full_all(QIOChannel *ioc, 197 const struct iovec *iov, 198 size_t niov, 199 int **fds, size_t *nfds, 200 Error **errp) 201 { 202 int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp); 203 204 if (ret == 0) { 205 error_prepend(errp, 206 "Unexpected end-of-file before all data were read."); 207 return -1; 208 } 209 if (ret == 1) { 210 return 0; 211 } 212 213 return ret; 214 } 215 216 int qio_channel_writev_all(QIOChannel *ioc, 217 const struct iovec *iov, 218 size_t niov, 219 Error **errp) 220 { 221 return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp); 222 } 223 224 int qio_channel_writev_full_all(QIOChannel *ioc, 225 const struct iovec *iov, 226 size_t niov, 227 int *fds, size_t nfds, 228 Error **errp) 229 { 230 int ret = -1; 231 struct iovec *local_iov = g_new(struct iovec, niov); 232 struct iovec *local_iov_head = local_iov; 233 unsigned int nlocal_iov = niov; 234 235 nlocal_iov = iov_copy(local_iov, nlocal_iov, 236 iov, niov, 237 0, iov_size(iov, niov)); 238 239 while (nlocal_iov > 0) { 240 ssize_t len; 241 len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds, 242 errp); 243 if (len == QIO_CHANNEL_ERR_BLOCK) { 244 if (qemu_in_coroutine()) { 245 qio_channel_yield(ioc, G_IO_OUT); 246 } else { 247 qio_channel_wait(ioc, G_IO_OUT); 248 } 249 continue; 250 } 251 if (len < 0) { 252 goto cleanup; 253 } 254 255 iov_discard_front(&local_iov, &nlocal_iov, len); 256 257 fds = NULL; 258 nfds = 0; 259 } 260 261 ret = 0; 262 cleanup: 263 g_free(local_iov_head); 264 return ret; 265 } 266 267 ssize_t qio_channel_readv(QIOChannel *ioc, 268 const struct iovec *iov, 269 size_t niov, 270 Error **errp) 271 { 272 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 273 } 274 275 276 ssize_t qio_channel_writev(QIOChannel *ioc, 277 const struct iovec *iov, 278 size_t niov, 279 Error **errp) 280 { 281 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 282 } 283 284 285 ssize_t qio_channel_read(QIOChannel *ioc, 286 char *buf, 287 size_t buflen, 288 Error **errp) 289 { 290 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 291 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 292 } 293 294 295 ssize_t qio_channel_write(QIOChannel *ioc, 296 const char *buf, 297 size_t buflen, 298 Error **errp) 299 { 300 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 301 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 302 } 303 304 305 int qio_channel_read_all_eof(QIOChannel *ioc, 306 char *buf, 307 size_t buflen, 308 Error **errp) 309 { 310 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 311 return qio_channel_readv_all_eof(ioc, &iov, 1, errp); 312 } 313 314 315 int qio_channel_read_all(QIOChannel *ioc, 316 char *buf, 317 size_t buflen, 318 Error **errp) 319 { 320 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 321 return qio_channel_readv_all(ioc, &iov, 1, errp); 322 } 323 324 325 int qio_channel_write_all(QIOChannel *ioc, 326 const char *buf, 327 size_t buflen, 328 Error **errp) 329 { 330 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 331 return qio_channel_writev_all(ioc, &iov, 1, errp); 332 } 333 334 335 int qio_channel_set_blocking(QIOChannel *ioc, 336 bool enabled, 337 Error **errp) 338 { 339 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 340 return klass->io_set_blocking(ioc, enabled, errp); 341 } 342 343 344 int qio_channel_close(QIOChannel *ioc, 345 Error **errp) 346 { 347 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 348 return klass->io_close(ioc, errp); 349 } 350 351 352 GSource *qio_channel_create_watch(QIOChannel *ioc, 353 GIOCondition condition) 354 { 355 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 356 GSource *ret = klass->io_create_watch(ioc, condition); 357 358 if (ioc->name) { 359 g_source_set_name(ret, ioc->name); 360 } 361 362 return ret; 363 } 364 365 366 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 367 AioContext *ctx, 368 IOHandler *io_read, 369 IOHandler *io_write, 370 void *opaque) 371 { 372 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 373 374 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 375 } 376 377 guint qio_channel_add_watch_full(QIOChannel *ioc, 378 GIOCondition condition, 379 QIOChannelFunc func, 380 gpointer user_data, 381 GDestroyNotify notify, 382 GMainContext *context) 383 { 384 GSource *source; 385 guint id; 386 387 source = qio_channel_create_watch(ioc, condition); 388 389 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 390 391 id = g_source_attach(source, context); 392 g_source_unref(source); 393 394 return id; 395 } 396 397 guint qio_channel_add_watch(QIOChannel *ioc, 398 GIOCondition condition, 399 QIOChannelFunc func, 400 gpointer user_data, 401 GDestroyNotify notify) 402 { 403 return qio_channel_add_watch_full(ioc, condition, func, 404 user_data, notify, NULL); 405 } 406 407 GSource *qio_channel_add_watch_source(QIOChannel *ioc, 408 GIOCondition condition, 409 QIOChannelFunc func, 410 gpointer user_data, 411 GDestroyNotify notify, 412 GMainContext *context) 413 { 414 GSource *source; 415 guint id; 416 417 id = qio_channel_add_watch_full(ioc, condition, func, 418 user_data, notify, context); 419 source = g_main_context_find_source_by_id(context, id); 420 g_source_ref(source); 421 return source; 422 } 423 424 425 int qio_channel_shutdown(QIOChannel *ioc, 426 QIOChannelShutdown how, 427 Error **errp) 428 { 429 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 430 431 if (!klass->io_shutdown) { 432 error_setg(errp, "Data path shutdown not supported"); 433 return -1; 434 } 435 436 return klass->io_shutdown(ioc, how, errp); 437 } 438 439 440 void qio_channel_set_delay(QIOChannel *ioc, 441 bool enabled) 442 { 443 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 444 445 if (klass->io_set_delay) { 446 klass->io_set_delay(ioc, enabled); 447 } 448 } 449 450 451 void qio_channel_set_cork(QIOChannel *ioc, 452 bool enabled) 453 { 454 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 455 456 if (klass->io_set_cork) { 457 klass->io_set_cork(ioc, enabled); 458 } 459 } 460 461 462 off_t qio_channel_io_seek(QIOChannel *ioc, 463 off_t offset, 464 int whence, 465 Error **errp) 466 { 467 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 468 469 if (!klass->io_seek) { 470 error_setg(errp, "Channel does not support random access"); 471 return -1; 472 } 473 474 return klass->io_seek(ioc, offset, whence, errp); 475 } 476 477 478 static void qio_channel_restart_read(void *opaque) 479 { 480 QIOChannel *ioc = opaque; 481 Coroutine *co = ioc->read_coroutine; 482 483 /* Assert that aio_co_wake() reenters the coroutine directly */ 484 assert(qemu_get_current_aio_context() == 485 qemu_coroutine_get_aio_context(co)); 486 aio_co_wake(co); 487 } 488 489 static void qio_channel_restart_write(void *opaque) 490 { 491 QIOChannel *ioc = opaque; 492 Coroutine *co = ioc->write_coroutine; 493 494 /* Assert that aio_co_wake() reenters the coroutine directly */ 495 assert(qemu_get_current_aio_context() == 496 qemu_coroutine_get_aio_context(co)); 497 aio_co_wake(co); 498 } 499 500 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 501 { 502 IOHandler *rd_handler = NULL, *wr_handler = NULL; 503 AioContext *ctx; 504 505 if (ioc->read_coroutine) { 506 rd_handler = qio_channel_restart_read; 507 } 508 if (ioc->write_coroutine) { 509 wr_handler = qio_channel_restart_write; 510 } 511 512 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 513 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 514 } 515 516 void qio_channel_attach_aio_context(QIOChannel *ioc, 517 AioContext *ctx) 518 { 519 assert(!ioc->read_coroutine); 520 assert(!ioc->write_coroutine); 521 ioc->ctx = ctx; 522 } 523 524 void qio_channel_detach_aio_context(QIOChannel *ioc) 525 { 526 ioc->read_coroutine = NULL; 527 ioc->write_coroutine = NULL; 528 qio_channel_set_aio_fd_handlers(ioc); 529 ioc->ctx = NULL; 530 } 531 532 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 533 GIOCondition condition) 534 { 535 assert(qemu_in_coroutine()); 536 if (condition == G_IO_IN) { 537 assert(!ioc->read_coroutine); 538 ioc->read_coroutine = qemu_coroutine_self(); 539 } else if (condition == G_IO_OUT) { 540 assert(!ioc->write_coroutine); 541 ioc->write_coroutine = qemu_coroutine_self(); 542 } else { 543 abort(); 544 } 545 qio_channel_set_aio_fd_handlers(ioc); 546 qemu_coroutine_yield(); 547 548 /* Allow interrupting the operation by reentering the coroutine other than 549 * through the aio_fd_handlers. */ 550 if (condition == G_IO_IN && ioc->read_coroutine) { 551 ioc->read_coroutine = NULL; 552 qio_channel_set_aio_fd_handlers(ioc); 553 } else if (condition == G_IO_OUT && ioc->write_coroutine) { 554 ioc->write_coroutine = NULL; 555 qio_channel_set_aio_fd_handlers(ioc); 556 } 557 } 558 559 560 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 561 GIOCondition condition, 562 gpointer opaque) 563 { 564 GMainLoop *loop = opaque; 565 566 g_main_loop_quit(loop); 567 return FALSE; 568 } 569 570 571 void qio_channel_wait(QIOChannel *ioc, 572 GIOCondition condition) 573 { 574 GMainContext *ctxt = g_main_context_new(); 575 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 576 GSource *source; 577 578 source = qio_channel_create_watch(ioc, condition); 579 580 g_source_set_callback(source, 581 (GSourceFunc)qio_channel_wait_complete, 582 loop, 583 NULL); 584 585 g_source_attach(source, ctxt); 586 587 g_main_loop_run(loop); 588 589 g_source_unref(source); 590 g_main_loop_unref(loop); 591 g_main_context_unref(ctxt); 592 } 593 594 595 static void qio_channel_finalize(Object *obj) 596 { 597 QIOChannel *ioc = QIO_CHANNEL(obj); 598 599 g_free(ioc->name); 600 601 #ifdef _WIN32 602 if (ioc->event) { 603 CloseHandle(ioc->event); 604 } 605 #endif 606 } 607 608 static const TypeInfo qio_channel_info = { 609 .parent = TYPE_OBJECT, 610 .name = TYPE_QIO_CHANNEL, 611 .instance_size = sizeof(QIOChannel), 612 .instance_finalize = qio_channel_finalize, 613 .abstract = true, 614 .class_size = sizeof(QIOChannelClass), 615 }; 616 617 618 static void qio_channel_register_types(void) 619 { 620 type_register_static(&qio_channel_info); 621 } 622 623 624 type_init(qio_channel_register_types); 625