/*
 * QEMU I/O channels
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "qemu/osdep.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
#include "qemu/iov.h"

bool qio_channel_has_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    return ioc->features & (1 << feature);
}


void qio_channel_set_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    ioc->features |= (1 << feature);
}


void qio_channel_set_name(QIOChannel *ioc,
                          const char *name)
{
    g_free(ioc->name);
    ioc->name = g_strdup(name);
}


ssize_t qio_channel_readv_full(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds,
                               size_t *nfds,
                               Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
}


ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds,
                                size_t nfds,
                                Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
}


int qio_channel_readv_all_eof(QIOChannel *ioc,
                              const struct iovec *iov,
                              size_t niov,
                              Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    bool partial = false;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        } else if (len < 0) {
            goto cleanup;
        } else if (len == 0) {
            if (partial) {
                error_setg(errp,
                           "Unexpected end-of-file before all bytes were read");
            } else {
                ret = 0;
            }
            goto cleanup;
        }

        partial = true;
        iov_discard_front(&local_iov, &nlocal_iov, len);
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}

int qio_channel_readv_all(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);

    if (ret == 0) {
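        /*
         * readv_all_eof() returned 0: EOF was seen before any data was
         * read, which the _all variant reports as an error.
         */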
        ret = -1;
        error_setg(errp,
                   "Unexpected end-of-file before all bytes were read");
    } else if (ret == 1) {
        ret = 0;
    }
    return ret;
}

int qio_channel_writev_all(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        iov_discard_front(&local_iov, &nlocal_iov, len);
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}

ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
}


ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
}


ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
}


ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
}


int qio_channel_read_all_eof(QIOChannel *ioc,
                             char *buf,
                             size_t buflen,
                             Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}


int qio_channel_read_all(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}


int qio_channel_write_all(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}


int qio_channel_set_blocking(QIOChannel *ioc,
                             bool enabled,
                             Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}


int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}


GSource *qio_channel_create_watch(QIOChannel *ioc,
                                  GIOCondition condition)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    GSource *ret = klass->io_create_watch(ioc, condition);

    if (ioc->name) {
        g_source_set_name(ret, ioc->name);
    }

    return ret;
}


void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *ctx,
                                    IOHandler *io_read,
                                    IOHandler *io_write,
                                    void *opaque)
{
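    /*
     * Delegate to the concrete channel implementation, which registers
     * the given read/write handlers with @ctx in whatever way suits its
     * underlying transport.
     */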
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
}

guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, NULL);
    g_source_unref(source);

    return id;
}


int qio_channel_shutdown(QIOChannel *ioc,
                         QIOChannelShutdown how,
                         Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_shutdown) {
        error_setg(errp, "Data path shutdown not supported");
        return -1;
    }

    return klass->io_shutdown(ioc, how, errp);
}


void qio_channel_set_delay(QIOChannel *ioc,
                           bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_delay) {
        klass->io_set_delay(ioc, enabled);
    }
}


void qio_channel_set_cork(QIOChannel *ioc,
                          bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_cork) {
        klass->io_set_cork(ioc, enabled);
    }
}


off_t qio_channel_io_seek(QIOChannel *ioc,
                          off_t offset,
                          int whence,
                          Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_seek) {
        error_setg(errp, "Channel does not support random access");
        return -1;
    }

    return klass->io_seek(ioc, offset, whence, errp);
}


static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);

static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->read_coroutine;

    ioc->read_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    aio_co_wake(co);
}

static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->write_coroutine;

    ioc->write_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    aio_co_wake(co);
}

static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
{
    IOHandler *rd_handler = NULL, *wr_handler = NULL;
    AioContext *ctx;

    if (ioc->read_coroutine) {
        rd_handler = qio_channel_restart_read;
    }
    if (ioc->write_coroutine) {
        wr_handler = qio_channel_restart_write;
    }

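    /*
     * Register with the AioContext the channel was attached to, if any,
     * otherwise fall back to the main loop's iohandler context.
     */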
    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
}

void qio_channel_attach_aio_context(QIOChannel *ioc,
                                    AioContext *ctx)
{
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);
    ioc->ctx = ctx;
}

void qio_channel_detach_aio_context(QIOChannel *ioc)
{
    ioc->read_coroutine = NULL;
    ioc->write_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    ioc->ctx = NULL;
}

void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    assert(qemu_in_coroutine());
    if (condition == G_IO_IN) {
        assert(!ioc->read_coroutine);
        ioc->read_coroutine = qemu_coroutine_self();
    } else if (condition == G_IO_OUT) {
        assert(!ioc->write_coroutine);
        ioc->write_coroutine = qemu_coroutine_self();
    } else {
        abort();
    }
    qio_channel_set_aio_fd_handlers(ioc);
    qemu_coroutine_yield();
}


static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                          GIOCondition condition,
                                          gpointer opaque)
{
    GMainLoop *loop = opaque;

    g_main_loop_quit(loop);
    return FALSE;
}


void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}


static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}

static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};


static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}


type_init(qio_channel_register_types);
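
/*
 * Illustrative sketch only (not part of the upstream file): one way a
 * caller might combine the helpers above to exchange a fixed-size
 * message over an already-established channel.  The function name,
 * message contents and buffer sizes are assumptions invented purely for
 * this example.
 */
static int qio_channel_example_exchange(QIOChannel *ioc, Error **errp)
{
    char request[64] = "example request";
    char reply[64];

    /* Send the whole buffer, retrying short or blocked writes until done. */
    if (qio_channel_write_all(ioc, request, sizeof(request), errp) < 0) {
        return -1;
    }

    /* Read exactly sizeof(reply) bytes; EOF part-way through is an error. */
    if (qio_channel_read_all(ioc, reply, sizeof(reply), errp) < 0) {
        return -1;
    }

    return 0;
}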