1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/module.h" 26 #include "qemu/iov.h" 27 28 bool qio_channel_has_feature(QIOChannel *ioc, 29 QIOChannelFeature feature) 30 { 31 return ioc->features & (1 << feature); 32 } 33 34 35 void qio_channel_set_feature(QIOChannel *ioc, 36 QIOChannelFeature feature) 37 { 38 ioc->features |= (1 << feature); 39 } 40 41 42 void qio_channel_set_name(QIOChannel *ioc, 43 const char *name) 44 { 45 g_free(ioc->name); 46 ioc->name = g_strdup(name); 47 } 48 49 50 ssize_t qio_channel_readv_full(QIOChannel *ioc, 51 const struct iovec *iov, 52 size_t niov, 53 int **fds, 54 size_t *nfds, 55 Error **errp) 56 { 57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 58 59 if ((fds || nfds) && 60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 61 error_setg_errno(errp, EINVAL, 62 "Channel does not support file descriptor passing"); 63 return -1; 64 } 65 66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 67 } 68 69 70 ssize_t qio_channel_writev_full(QIOChannel *ioc, 71 const struct iovec *iov, 72 size_t niov, 73 int *fds, 74 size_t nfds, 75 Error **errp) 76 { 77 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 78 79 if ((fds || nfds) && 80 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 81 error_setg_errno(errp, EINVAL, 82 "Channel does not support file descriptor passing"); 83 return -1; 84 } 85 86 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 87 } 88 89 90 int qio_channel_readv_all_eof(QIOChannel *ioc, 91 const struct iovec *iov, 92 size_t niov, 93 Error **errp) 94 { 95 int ret = -1; 96 struct iovec *local_iov = g_new(struct iovec, niov); 97 struct iovec *local_iov_head = local_iov; 98 unsigned int nlocal_iov = niov; 99 bool partial = false; 100 101 nlocal_iov = iov_copy(local_iov, nlocal_iov, 102 iov, niov, 103 0, iov_size(iov, niov)); 104 105 while (nlocal_iov > 0) { 106 ssize_t len; 107 len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp); 108 if (len == QIO_CHANNEL_ERR_BLOCK) { 109 if (qemu_in_coroutine()) { 110 qio_channel_yield(ioc, G_IO_IN); 111 } else { 112 qio_channel_wait(ioc, G_IO_IN); 113 } 114 continue; 115 } else if (len < 0) { 116 goto cleanup; 117 } else if (len == 0) { 118 if (partial) { 119 error_setg(errp, 120 "Unexpected end-of-file before all bytes were read"); 121 } else { 122 ret = 0; 123 } 124 goto cleanup; 125 } 126 127 partial = true; 128 iov_discard_front(&local_iov, &nlocal_iov, len); 129 } 130 131 ret = 1; 132 133 cleanup: 134 g_free(local_iov_head); 135 return ret; 136 } 137 138 int qio_channel_readv_all(QIOChannel *ioc, 139 const struct iovec *iov, 140 size_t niov, 141 Error **errp) 142 { 143 int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp); 144 145 if (ret == 0) { 146 ret = -1; 147 error_setg(errp, 148 "Unexpected end-of-file before all bytes were read"); 149 } else if (ret == 1) { 150 ret = 0; 151 } 152 return ret; 153 } 154 155 int qio_channel_writev_all(QIOChannel *ioc, 156 const struct iovec *iov, 157 size_t niov, 158 Error **errp) 159 { 160 int ret = -1; 161 struct iovec *local_iov = g_new(struct iovec, niov); 162 struct iovec *local_iov_head = local_iov; 163 unsigned int nlocal_iov = niov; 164 165 nlocal_iov = iov_copy(local_iov, nlocal_iov, 166 iov, niov, 167 0, iov_size(iov, niov)); 168 169 while (nlocal_iov > 0) { 170 ssize_t len; 171 len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp); 172 if (len == QIO_CHANNEL_ERR_BLOCK) { 173 if (qemu_in_coroutine()) { 174 qio_channel_yield(ioc, G_IO_OUT); 175 } else { 176 qio_channel_wait(ioc, G_IO_OUT); 177 } 178 continue; 179 } 180 if (len < 0) { 181 goto cleanup; 182 } 183 184 iov_discard_front(&local_iov, &nlocal_iov, len); 185 } 186 187 ret = 0; 188 cleanup: 189 g_free(local_iov_head); 190 return ret; 191 } 192 193 ssize_t qio_channel_readv(QIOChannel *ioc, 194 const struct iovec *iov, 195 size_t niov, 196 Error **errp) 197 { 198 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 199 } 200 201 202 ssize_t qio_channel_writev(QIOChannel *ioc, 203 const struct iovec *iov, 204 size_t niov, 205 Error **errp) 206 { 207 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 208 } 209 210 211 ssize_t qio_channel_read(QIOChannel *ioc, 212 char *buf, 213 size_t buflen, 214 Error **errp) 215 { 216 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 217 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 218 } 219 220 221 ssize_t qio_channel_write(QIOChannel *ioc, 222 const char *buf, 223 size_t buflen, 224 Error **errp) 225 { 226 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 227 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 228 } 229 230 231 int qio_channel_read_all_eof(QIOChannel *ioc, 232 char *buf, 233 size_t buflen, 234 Error **errp) 235 { 236 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 237 return qio_channel_readv_all_eof(ioc, &iov, 1, errp); 238 } 239 240 241 int qio_channel_read_all(QIOChannel *ioc, 242 char *buf, 243 size_t buflen, 244 Error **errp) 245 { 246 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 247 return qio_channel_readv_all(ioc, &iov, 1, errp); 248 } 249 250 251 int qio_channel_write_all(QIOChannel *ioc, 252 const char *buf, 253 size_t buflen, 254 Error **errp) 255 { 256 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 257 return qio_channel_writev_all(ioc, &iov, 1, errp); 258 } 259 260 261 int qio_channel_set_blocking(QIOChannel *ioc, 262 bool enabled, 263 Error **errp) 264 { 265 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 266 return klass->io_set_blocking(ioc, enabled, errp); 267 } 268 269 270 int qio_channel_close(QIOChannel *ioc, 271 Error **errp) 272 { 273 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 274 return klass->io_close(ioc, errp); 275 } 276 277 278 GSource *qio_channel_create_watch(QIOChannel *ioc, 279 GIOCondition condition) 280 { 281 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 282 GSource *ret = klass->io_create_watch(ioc, condition); 283 284 if (ioc->name) { 285 g_source_set_name(ret, ioc->name); 286 } 287 288 return ret; 289 } 290 291 292 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 293 AioContext *ctx, 294 IOHandler *io_read, 295 IOHandler *io_write, 296 void *opaque) 297 { 298 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 299 300 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 301 } 302 303 guint qio_channel_add_watch_full(QIOChannel *ioc, 304 GIOCondition condition, 305 QIOChannelFunc func, 306 gpointer user_data, 307 GDestroyNotify notify, 308 GMainContext *context) 309 { 310 GSource *source; 311 guint id; 312 313 source = qio_channel_create_watch(ioc, condition); 314 315 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 316 317 id = g_source_attach(source, context); 318 g_source_unref(source); 319 320 return id; 321 } 322 323 guint qio_channel_add_watch(QIOChannel *ioc, 324 GIOCondition condition, 325 QIOChannelFunc func, 326 gpointer user_data, 327 GDestroyNotify notify) 328 { 329 return qio_channel_add_watch_full(ioc, condition, func, 330 user_data, notify, NULL); 331 } 332 333 GSource *qio_channel_add_watch_source(QIOChannel *ioc, 334 GIOCondition condition, 335 QIOChannelFunc func, 336 gpointer user_data, 337 GDestroyNotify notify, 338 GMainContext *context) 339 { 340 GSource *source; 341 guint id; 342 343 id = qio_channel_add_watch_full(ioc, condition, func, 344 user_data, notify, context); 345 source = g_main_context_find_source_by_id(context, id); 346 g_source_ref(source); 347 return source; 348 } 349 350 351 int qio_channel_shutdown(QIOChannel *ioc, 352 QIOChannelShutdown how, 353 Error **errp) 354 { 355 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 356 357 if (!klass->io_shutdown) { 358 error_setg(errp, "Data path shutdown not supported"); 359 return -1; 360 } 361 362 return klass->io_shutdown(ioc, how, errp); 363 } 364 365 366 void qio_channel_set_delay(QIOChannel *ioc, 367 bool enabled) 368 { 369 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 370 371 if (klass->io_set_delay) { 372 klass->io_set_delay(ioc, enabled); 373 } 374 } 375 376 377 void qio_channel_set_cork(QIOChannel *ioc, 378 bool enabled) 379 { 380 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 381 382 if (klass->io_set_cork) { 383 klass->io_set_cork(ioc, enabled); 384 } 385 } 386 387 388 off_t qio_channel_io_seek(QIOChannel *ioc, 389 off_t offset, 390 int whence, 391 Error **errp) 392 { 393 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 394 395 if (!klass->io_seek) { 396 error_setg(errp, "Channel does not support random access"); 397 return -1; 398 } 399 400 return klass->io_seek(ioc, offset, whence, errp); 401 } 402 403 404 static void qio_channel_restart_read(void *opaque) 405 { 406 QIOChannel *ioc = opaque; 407 Coroutine *co = ioc->read_coroutine; 408 409 /* Assert that aio_co_wake() reenters the coroutine directly */ 410 assert(qemu_get_current_aio_context() == 411 qemu_coroutine_get_aio_context(co)); 412 aio_co_wake(co); 413 } 414 415 static void qio_channel_restart_write(void *opaque) 416 { 417 QIOChannel *ioc = opaque; 418 Coroutine *co = ioc->write_coroutine; 419 420 /* Assert that aio_co_wake() reenters the coroutine directly */ 421 assert(qemu_get_current_aio_context() == 422 qemu_coroutine_get_aio_context(co)); 423 aio_co_wake(co); 424 } 425 426 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 427 { 428 IOHandler *rd_handler = NULL, *wr_handler = NULL; 429 AioContext *ctx; 430 431 if (ioc->read_coroutine) { 432 rd_handler = qio_channel_restart_read; 433 } 434 if (ioc->write_coroutine) { 435 wr_handler = qio_channel_restart_write; 436 } 437 438 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 439 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 440 } 441 442 void qio_channel_attach_aio_context(QIOChannel *ioc, 443 AioContext *ctx) 444 { 445 assert(!ioc->read_coroutine); 446 assert(!ioc->write_coroutine); 447 ioc->ctx = ctx; 448 } 449 450 void qio_channel_detach_aio_context(QIOChannel *ioc) 451 { 452 ioc->read_coroutine = NULL; 453 ioc->write_coroutine = NULL; 454 qio_channel_set_aio_fd_handlers(ioc); 455 ioc->ctx = NULL; 456 } 457 458 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 459 GIOCondition condition) 460 { 461 assert(qemu_in_coroutine()); 462 if (condition == G_IO_IN) { 463 assert(!ioc->read_coroutine); 464 ioc->read_coroutine = qemu_coroutine_self(); 465 } else if (condition == G_IO_OUT) { 466 assert(!ioc->write_coroutine); 467 ioc->write_coroutine = qemu_coroutine_self(); 468 } else { 469 abort(); 470 } 471 qio_channel_set_aio_fd_handlers(ioc); 472 qemu_coroutine_yield(); 473 474 /* Allow interrupting the operation by reentering the coroutine other than 475 * through the aio_fd_handlers. */ 476 if (condition == G_IO_IN && ioc->read_coroutine) { 477 ioc->read_coroutine = NULL; 478 qio_channel_set_aio_fd_handlers(ioc); 479 } else if (condition == G_IO_OUT && ioc->write_coroutine) { 480 ioc->write_coroutine = NULL; 481 qio_channel_set_aio_fd_handlers(ioc); 482 } 483 } 484 485 486 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 487 GIOCondition condition, 488 gpointer opaque) 489 { 490 GMainLoop *loop = opaque; 491 492 g_main_loop_quit(loop); 493 return FALSE; 494 } 495 496 497 void qio_channel_wait(QIOChannel *ioc, 498 GIOCondition condition) 499 { 500 GMainContext *ctxt = g_main_context_new(); 501 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 502 GSource *source; 503 504 source = qio_channel_create_watch(ioc, condition); 505 506 g_source_set_callback(source, 507 (GSourceFunc)qio_channel_wait_complete, 508 loop, 509 NULL); 510 511 g_source_attach(source, ctxt); 512 513 g_main_loop_run(loop); 514 515 g_source_unref(source); 516 g_main_loop_unref(loop); 517 g_main_context_unref(ctxt); 518 } 519 520 521 static void qio_channel_finalize(Object *obj) 522 { 523 QIOChannel *ioc = QIO_CHANNEL(obj); 524 525 g_free(ioc->name); 526 527 #ifdef _WIN32 528 if (ioc->event) { 529 CloseHandle(ioc->event); 530 } 531 #endif 532 } 533 534 static const TypeInfo qio_channel_info = { 535 .parent = TYPE_OBJECT, 536 .name = TYPE_QIO_CHANNEL, 537 .instance_size = sizeof(QIOChannel), 538 .instance_finalize = qio_channel_finalize, 539 .abstract = true, 540 .class_size = sizeof(QIOChannelClass), 541 }; 542 543 544 static void qio_channel_register_types(void) 545 { 546 type_register_static(&qio_channel_info); 547 } 548 549 550 type_init(qio_channel_register_types); 551