1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/iov.h" 26 27 bool qio_channel_has_feature(QIOChannel *ioc, 28 QIOChannelFeature feature) 29 { 30 return ioc->features & (1 << feature); 31 } 32 33 34 void qio_channel_set_feature(QIOChannel *ioc, 35 QIOChannelFeature feature) 36 { 37 ioc->features |= (1 << feature); 38 } 39 40 41 void qio_channel_set_name(QIOChannel *ioc, 42 const char *name) 43 { 44 g_free(ioc->name); 45 ioc->name = g_strdup(name); 46 } 47 48 49 ssize_t qio_channel_readv_full(QIOChannel *ioc, 50 const struct iovec *iov, 51 size_t niov, 52 int **fds, 53 size_t *nfds, 54 Error **errp) 55 { 56 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 57 58 if ((fds || nfds) && 59 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 60 error_setg_errno(errp, EINVAL, 61 "Channel does not support file descriptor passing"); 62 return -1; 63 } 64 65 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 66 } 67 68 69 ssize_t qio_channel_writev_full(QIOChannel *ioc, 70 const struct iovec *iov, 71 size_t niov, 72 int *fds, 73 size_t nfds, 74 Error **errp) 75 { 76 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 77 78 if ((fds || nfds) && 79 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 80 error_setg_errno(errp, EINVAL, 81 "Channel does not support file descriptor passing"); 82 return -1; 83 } 84 85 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 86 } 87 88 89 int qio_channel_readv_all_eof(QIOChannel *ioc, 90 const struct iovec *iov, 91 size_t niov, 92 Error **errp) 93 { 94 int ret = -1; 95 struct iovec *local_iov = g_new(struct iovec, niov); 96 struct iovec *local_iov_head = local_iov; 97 unsigned int nlocal_iov = niov; 98 bool partial = false; 99 100 nlocal_iov = iov_copy(local_iov, nlocal_iov, 101 iov, niov, 102 0, iov_size(iov, niov)); 103 104 while (nlocal_iov > 0) { 105 ssize_t len; 106 len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp); 107 if (len == QIO_CHANNEL_ERR_BLOCK) { 108 if (qemu_in_coroutine()) { 109 qio_channel_yield(ioc, G_IO_IN); 110 } else { 111 qio_channel_wait(ioc, G_IO_IN); 112 } 113 continue; 114 } else if (len < 0) { 115 goto cleanup; 116 } else if (len == 0) { 117 if (partial) { 118 error_setg(errp, 119 "Unexpected end-of-file before all bytes were read"); 120 } else { 121 ret = 0; 122 } 123 goto cleanup; 124 } 125 126 partial = true; 127 iov_discard_front(&local_iov, &nlocal_iov, len); 128 } 129 130 ret = 1; 131 132 cleanup: 133 g_free(local_iov_head); 134 return ret; 135 } 136 137 int qio_channel_readv_all(QIOChannel *ioc, 138 const struct iovec *iov, 139 size_t niov, 140 Error **errp) 141 { 142 int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp); 143 144 if (ret == 0) { 145 ret = -1; 146 error_setg(errp, 147 "Unexpected end-of-file before all bytes were read"); 148 } else if (ret == 1) { 149 ret = 0; 150 } 151 return ret; 152 } 153 154 int qio_channel_writev_all(QIOChannel *ioc, 155 const struct iovec *iov, 156 size_t niov, 157 Error **errp) 158 { 159 int ret = -1; 160 struct iovec *local_iov = g_new(struct iovec, niov); 161 struct iovec *local_iov_head = local_iov; 162 unsigned int nlocal_iov = niov; 163 164 nlocal_iov = iov_copy(local_iov, nlocal_iov, 165 iov, niov, 166 0, iov_size(iov, niov)); 167 168 while (nlocal_iov > 0) { 169 ssize_t len; 170 len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp); 171 if (len == QIO_CHANNEL_ERR_BLOCK) { 172 if (qemu_in_coroutine()) { 173 qio_channel_yield(ioc, G_IO_OUT); 174 } else { 175 qio_channel_wait(ioc, G_IO_OUT); 176 } 177 continue; 178 } 179 if (len < 0) { 180 goto cleanup; 181 } 182 183 iov_discard_front(&local_iov, &nlocal_iov, len); 184 } 185 186 ret = 0; 187 cleanup: 188 g_free(local_iov_head); 189 return ret; 190 } 191 192 ssize_t qio_channel_readv(QIOChannel *ioc, 193 const struct iovec *iov, 194 size_t niov, 195 Error **errp) 196 { 197 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 198 } 199 200 201 ssize_t qio_channel_writev(QIOChannel *ioc, 202 const struct iovec *iov, 203 size_t niov, 204 Error **errp) 205 { 206 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 207 } 208 209 210 ssize_t qio_channel_read(QIOChannel *ioc, 211 char *buf, 212 size_t buflen, 213 Error **errp) 214 { 215 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 216 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 217 } 218 219 220 ssize_t qio_channel_write(QIOChannel *ioc, 221 const char *buf, 222 size_t buflen, 223 Error **errp) 224 { 225 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 226 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 227 } 228 229 230 int qio_channel_read_all_eof(QIOChannel *ioc, 231 char *buf, 232 size_t buflen, 233 Error **errp) 234 { 235 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 236 return qio_channel_readv_all_eof(ioc, &iov, 1, errp); 237 } 238 239 240 int qio_channel_read_all(QIOChannel *ioc, 241 char *buf, 242 size_t buflen, 243 Error **errp) 244 { 245 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 246 return qio_channel_readv_all(ioc, &iov, 1, errp); 247 } 248 249 250 int qio_channel_write_all(QIOChannel *ioc, 251 const char *buf, 252 size_t buflen, 253 Error **errp) 254 { 255 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 256 return qio_channel_writev_all(ioc, &iov, 1, errp); 257 } 258 259 260 int qio_channel_set_blocking(QIOChannel *ioc, 261 bool enabled, 262 Error **errp) 263 { 264 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 265 return klass->io_set_blocking(ioc, enabled, errp); 266 } 267 268 269 int qio_channel_close(QIOChannel *ioc, 270 Error **errp) 271 { 272 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 273 return klass->io_close(ioc, errp); 274 } 275 276 277 GSource *qio_channel_create_watch(QIOChannel *ioc, 278 GIOCondition condition) 279 { 280 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 281 GSource *ret = klass->io_create_watch(ioc, condition); 282 283 if (ioc->name) { 284 g_source_set_name(ret, ioc->name); 285 } 286 287 return ret; 288 } 289 290 291 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 292 AioContext *ctx, 293 IOHandler *io_read, 294 IOHandler *io_write, 295 void *opaque) 296 { 297 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 298 299 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 300 } 301 302 guint qio_channel_add_watch_full(QIOChannel *ioc, 303 GIOCondition condition, 304 QIOChannelFunc func, 305 gpointer user_data, 306 GDestroyNotify notify, 307 GMainContext *context) 308 { 309 GSource *source; 310 guint id; 311 312 source = qio_channel_create_watch(ioc, condition); 313 314 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 315 316 id = g_source_attach(source, context); 317 g_source_unref(source); 318 319 return id; 320 } 321 322 guint qio_channel_add_watch(QIOChannel *ioc, 323 GIOCondition condition, 324 QIOChannelFunc func, 325 gpointer user_data, 326 GDestroyNotify notify) 327 { 328 return qio_channel_add_watch_full(ioc, condition, func, 329 user_data, notify, NULL); 330 } 331 332 GSource *qio_channel_add_watch_source(QIOChannel *ioc, 333 GIOCondition condition, 334 QIOChannelFunc func, 335 gpointer user_data, 336 GDestroyNotify notify, 337 GMainContext *context) 338 { 339 GSource *source; 340 guint id; 341 342 id = qio_channel_add_watch_full(ioc, condition, func, 343 user_data, notify, context); 344 source = g_main_context_find_source_by_id(context, id); 345 g_source_ref(source); 346 return source; 347 } 348 349 350 int qio_channel_shutdown(QIOChannel *ioc, 351 QIOChannelShutdown how, 352 Error **errp) 353 { 354 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 355 356 if (!klass->io_shutdown) { 357 error_setg(errp, "Data path shutdown not supported"); 358 return -1; 359 } 360 361 return klass->io_shutdown(ioc, how, errp); 362 } 363 364 365 void qio_channel_set_delay(QIOChannel *ioc, 366 bool enabled) 367 { 368 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 369 370 if (klass->io_set_delay) { 371 klass->io_set_delay(ioc, enabled); 372 } 373 } 374 375 376 void qio_channel_set_cork(QIOChannel *ioc, 377 bool enabled) 378 { 379 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 380 381 if (klass->io_set_cork) { 382 klass->io_set_cork(ioc, enabled); 383 } 384 } 385 386 387 off_t qio_channel_io_seek(QIOChannel *ioc, 388 off_t offset, 389 int whence, 390 Error **errp) 391 { 392 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 393 394 if (!klass->io_seek) { 395 error_setg(errp, "Channel does not support random access"); 396 return -1; 397 } 398 399 return klass->io_seek(ioc, offset, whence, errp); 400 } 401 402 403 static void qio_channel_restart_read(void *opaque) 404 { 405 QIOChannel *ioc = opaque; 406 Coroutine *co = ioc->read_coroutine; 407 408 /* Assert that aio_co_wake() reenters the coroutine directly */ 409 assert(qemu_get_current_aio_context() == 410 qemu_coroutine_get_aio_context(co)); 411 aio_co_wake(co); 412 } 413 414 static void qio_channel_restart_write(void *opaque) 415 { 416 QIOChannel *ioc = opaque; 417 Coroutine *co = ioc->write_coroutine; 418 419 /* Assert that aio_co_wake() reenters the coroutine directly */ 420 assert(qemu_get_current_aio_context() == 421 qemu_coroutine_get_aio_context(co)); 422 aio_co_wake(co); 423 } 424 425 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 426 { 427 IOHandler *rd_handler = NULL, *wr_handler = NULL; 428 AioContext *ctx; 429 430 if (ioc->read_coroutine) { 431 rd_handler = qio_channel_restart_read; 432 } 433 if (ioc->write_coroutine) { 434 wr_handler = qio_channel_restart_write; 435 } 436 437 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 438 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 439 } 440 441 void qio_channel_attach_aio_context(QIOChannel *ioc, 442 AioContext *ctx) 443 { 444 assert(!ioc->read_coroutine); 445 assert(!ioc->write_coroutine); 446 ioc->ctx = ctx; 447 } 448 449 void qio_channel_detach_aio_context(QIOChannel *ioc) 450 { 451 ioc->read_coroutine = NULL; 452 ioc->write_coroutine = NULL; 453 qio_channel_set_aio_fd_handlers(ioc); 454 ioc->ctx = NULL; 455 } 456 457 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 458 GIOCondition condition) 459 { 460 assert(qemu_in_coroutine()); 461 if (condition == G_IO_IN) { 462 assert(!ioc->read_coroutine); 463 ioc->read_coroutine = qemu_coroutine_self(); 464 } else if (condition == G_IO_OUT) { 465 assert(!ioc->write_coroutine); 466 ioc->write_coroutine = qemu_coroutine_self(); 467 } else { 468 abort(); 469 } 470 qio_channel_set_aio_fd_handlers(ioc); 471 qemu_coroutine_yield(); 472 473 /* Allow interrupting the operation by reentering the coroutine other than 474 * through the aio_fd_handlers. */ 475 if (condition == G_IO_IN && ioc->read_coroutine) { 476 ioc->read_coroutine = NULL; 477 qio_channel_set_aio_fd_handlers(ioc); 478 } else if (condition == G_IO_OUT && ioc->write_coroutine) { 479 ioc->write_coroutine = NULL; 480 qio_channel_set_aio_fd_handlers(ioc); 481 } 482 } 483 484 485 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 486 GIOCondition condition, 487 gpointer opaque) 488 { 489 GMainLoop *loop = opaque; 490 491 g_main_loop_quit(loop); 492 return FALSE; 493 } 494 495 496 void qio_channel_wait(QIOChannel *ioc, 497 GIOCondition condition) 498 { 499 GMainContext *ctxt = g_main_context_new(); 500 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 501 GSource *source; 502 503 source = qio_channel_create_watch(ioc, condition); 504 505 g_source_set_callback(source, 506 (GSourceFunc)qio_channel_wait_complete, 507 loop, 508 NULL); 509 510 g_source_attach(source, ctxt); 511 512 g_main_loop_run(loop); 513 514 g_source_unref(source); 515 g_main_loop_unref(loop); 516 g_main_context_unref(ctxt); 517 } 518 519 520 static void qio_channel_finalize(Object *obj) 521 { 522 QIOChannel *ioc = QIO_CHANNEL(obj); 523 524 g_free(ioc->name); 525 526 #ifdef _WIN32 527 if (ioc->event) { 528 CloseHandle(ioc->event); 529 } 530 #endif 531 } 532 533 static const TypeInfo qio_channel_info = { 534 .parent = TYPE_OBJECT, 535 .name = TYPE_QIO_CHANNEL, 536 .instance_size = sizeof(QIOChannel), 537 .instance_finalize = qio_channel_finalize, 538 .abstract = true, 539 .class_size = sizeof(QIOChannelClass), 540 }; 541 542 543 static void qio_channel_register_types(void) 544 { 545 type_register_static(&qio_channel_info); 546 } 547 548 549 type_init(qio_channel_register_types); 550