1 /* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel.h" 23 #include "qapi/error.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/iov.h" 26 27 bool qio_channel_has_feature(QIOChannel *ioc, 28 QIOChannelFeature feature) 29 { 30 return ioc->features & (1 << feature); 31 } 32 33 34 void qio_channel_set_feature(QIOChannel *ioc, 35 QIOChannelFeature feature) 36 { 37 ioc->features |= (1 << feature); 38 } 39 40 41 void qio_channel_set_name(QIOChannel *ioc, 42 const char *name) 43 { 44 g_free(ioc->name); 45 ioc->name = g_strdup(name); 46 } 47 48 49 ssize_t qio_channel_readv_full(QIOChannel *ioc, 50 const struct iovec *iov, 51 size_t niov, 52 int **fds, 53 size_t *nfds, 54 Error **errp) 55 { 56 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 57 58 if ((fds || nfds) && 59 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 60 error_setg_errno(errp, EINVAL, 61 "Channel does not support file descriptor passing"); 62 return -1; 63 } 64 65 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 66 } 67 68 69 ssize_t qio_channel_writev_full(QIOChannel *ioc, 70 const struct iovec *iov, 71 size_t niov, 72 int *fds, 73 size_t nfds, 74 Error **errp) 75 { 76 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 77 78 if ((fds || nfds) && 79 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 80 error_setg_errno(errp, EINVAL, 81 "Channel does not support file descriptor passing"); 82 return -1; 83 } 84 85 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 86 } 87 88 89 90 int qio_channel_readv_all(QIOChannel *ioc, 91 const struct iovec *iov, 92 size_t niov, 93 Error **errp) 94 { 95 int ret = -1; 96 struct iovec *local_iov = g_new(struct iovec, niov); 97 struct iovec *local_iov_head = local_iov; 98 unsigned int nlocal_iov = niov; 99 100 nlocal_iov = iov_copy(local_iov, nlocal_iov, 101 iov, niov, 102 0, iov_size(iov, niov)); 103 104 while (nlocal_iov > 0) { 105 ssize_t len; 106 len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp); 107 if (len == QIO_CHANNEL_ERR_BLOCK) { 108 qio_channel_wait(ioc, G_IO_IN); 109 continue; 110 } else if (len < 0) { 111 goto cleanup; 112 } else if (len == 0) { 113 error_setg(errp, 114 "Unexpected end-of-file before all bytes were read"); 115 goto cleanup; 116 } 117 118 iov_discard_front(&local_iov, &nlocal_iov, len); 119 } 120 121 ret = 0; 122 123 cleanup: 124 g_free(local_iov_head); 125 return ret; 126 } 127 128 int qio_channel_writev_all(QIOChannel *ioc, 129 const struct iovec *iov, 130 size_t niov, 131 Error **errp) 132 { 133 int ret = -1; 134 struct iovec *local_iov = g_new(struct iovec, niov); 135 struct iovec *local_iov_head = local_iov; 136 unsigned int nlocal_iov = niov; 137 138 nlocal_iov = iov_copy(local_iov, nlocal_iov, 139 iov, niov, 140 0, iov_size(iov, niov)); 141 142 while (nlocal_iov > 0) { 143 ssize_t len; 144 len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp); 145 if (len == QIO_CHANNEL_ERR_BLOCK) { 146 qio_channel_wait(ioc, G_IO_OUT); 147 continue; 148 } 149 if (len < 0) { 150 goto cleanup; 151 } 152 153 iov_discard_front(&local_iov, &nlocal_iov, len); 154 } 155 156 ret = 0; 157 cleanup: 158 g_free(local_iov_head); 159 return ret; 160 } 161 162 ssize_t qio_channel_readv(QIOChannel *ioc, 163 const struct iovec *iov, 164 size_t niov, 165 Error **errp) 166 { 167 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 168 } 169 170 171 ssize_t qio_channel_writev(QIOChannel *ioc, 172 const struct iovec *iov, 173 size_t niov, 174 Error **errp) 175 { 176 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 177 } 178 179 180 ssize_t qio_channel_read(QIOChannel *ioc, 181 char *buf, 182 size_t buflen, 183 Error **errp) 184 { 185 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 186 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 187 } 188 189 190 ssize_t qio_channel_write(QIOChannel *ioc, 191 const char *buf, 192 size_t buflen, 193 Error **errp) 194 { 195 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 196 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 197 } 198 199 200 int qio_channel_read_all(QIOChannel *ioc, 201 char *buf, 202 size_t buflen, 203 Error **errp) 204 { 205 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 206 return qio_channel_readv_all(ioc, &iov, 1, errp); 207 } 208 209 210 int qio_channel_write_all(QIOChannel *ioc, 211 const char *buf, 212 size_t buflen, 213 Error **errp) 214 { 215 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 216 return qio_channel_writev_all(ioc, &iov, 1, errp); 217 } 218 219 220 int qio_channel_set_blocking(QIOChannel *ioc, 221 bool enabled, 222 Error **errp) 223 { 224 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 225 return klass->io_set_blocking(ioc, enabled, errp); 226 } 227 228 229 int qio_channel_close(QIOChannel *ioc, 230 Error **errp) 231 { 232 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 233 return klass->io_close(ioc, errp); 234 } 235 236 237 GSource *qio_channel_create_watch(QIOChannel *ioc, 238 GIOCondition condition) 239 { 240 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 241 GSource *ret = klass->io_create_watch(ioc, condition); 242 243 if (ioc->name) { 244 g_source_set_name(ret, ioc->name); 245 } 246 247 return ret; 248 } 249 250 251 void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 252 AioContext *ctx, 253 IOHandler *io_read, 254 IOHandler *io_write, 255 void *opaque) 256 { 257 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 258 259 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 260 } 261 262 guint qio_channel_add_watch(QIOChannel *ioc, 263 GIOCondition condition, 264 QIOChannelFunc func, 265 gpointer user_data, 266 GDestroyNotify notify) 267 { 268 GSource *source; 269 guint id; 270 271 source = qio_channel_create_watch(ioc, condition); 272 273 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 274 275 id = g_source_attach(source, NULL); 276 g_source_unref(source); 277 278 return id; 279 } 280 281 282 int qio_channel_shutdown(QIOChannel *ioc, 283 QIOChannelShutdown how, 284 Error **errp) 285 { 286 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 287 288 if (!klass->io_shutdown) { 289 error_setg(errp, "Data path shutdown not supported"); 290 return -1; 291 } 292 293 return klass->io_shutdown(ioc, how, errp); 294 } 295 296 297 void qio_channel_set_delay(QIOChannel *ioc, 298 bool enabled) 299 { 300 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 301 302 if (klass->io_set_delay) { 303 klass->io_set_delay(ioc, enabled); 304 } 305 } 306 307 308 void qio_channel_set_cork(QIOChannel *ioc, 309 bool enabled) 310 { 311 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 312 313 if (klass->io_set_cork) { 314 klass->io_set_cork(ioc, enabled); 315 } 316 } 317 318 319 off_t qio_channel_io_seek(QIOChannel *ioc, 320 off_t offset, 321 int whence, 322 Error **errp) 323 { 324 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 325 326 if (!klass->io_seek) { 327 error_setg(errp, "Channel does not support random access"); 328 return -1; 329 } 330 331 return klass->io_seek(ioc, offset, whence, errp); 332 } 333 334 335 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc); 336 337 static void qio_channel_restart_read(void *opaque) 338 { 339 QIOChannel *ioc = opaque; 340 Coroutine *co = ioc->read_coroutine; 341 342 ioc->read_coroutine = NULL; 343 qio_channel_set_aio_fd_handlers(ioc); 344 aio_co_wake(co); 345 } 346 347 static void qio_channel_restart_write(void *opaque) 348 { 349 QIOChannel *ioc = opaque; 350 Coroutine *co = ioc->write_coroutine; 351 352 ioc->write_coroutine = NULL; 353 qio_channel_set_aio_fd_handlers(ioc); 354 aio_co_wake(co); 355 } 356 357 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 358 { 359 IOHandler *rd_handler = NULL, *wr_handler = NULL; 360 AioContext *ctx; 361 362 if (ioc->read_coroutine) { 363 rd_handler = qio_channel_restart_read; 364 } 365 if (ioc->write_coroutine) { 366 wr_handler = qio_channel_restart_write; 367 } 368 369 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 370 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 371 } 372 373 void qio_channel_attach_aio_context(QIOChannel *ioc, 374 AioContext *ctx) 375 { 376 assert(!ioc->read_coroutine); 377 assert(!ioc->write_coroutine); 378 ioc->ctx = ctx; 379 } 380 381 void qio_channel_detach_aio_context(QIOChannel *ioc) 382 { 383 ioc->read_coroutine = NULL; 384 ioc->write_coroutine = NULL; 385 qio_channel_set_aio_fd_handlers(ioc); 386 ioc->ctx = NULL; 387 } 388 389 void coroutine_fn qio_channel_yield(QIOChannel *ioc, 390 GIOCondition condition) 391 { 392 assert(qemu_in_coroutine()); 393 if (condition == G_IO_IN) { 394 assert(!ioc->read_coroutine); 395 ioc->read_coroutine = qemu_coroutine_self(); 396 } else if (condition == G_IO_OUT) { 397 assert(!ioc->write_coroutine); 398 ioc->write_coroutine = qemu_coroutine_self(); 399 } else { 400 abort(); 401 } 402 qio_channel_set_aio_fd_handlers(ioc); 403 qemu_coroutine_yield(); 404 } 405 406 407 static gboolean qio_channel_wait_complete(QIOChannel *ioc, 408 GIOCondition condition, 409 gpointer opaque) 410 { 411 GMainLoop *loop = opaque; 412 413 g_main_loop_quit(loop); 414 return FALSE; 415 } 416 417 418 void qio_channel_wait(QIOChannel *ioc, 419 GIOCondition condition) 420 { 421 GMainContext *ctxt = g_main_context_new(); 422 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 423 GSource *source; 424 425 source = qio_channel_create_watch(ioc, condition); 426 427 g_source_set_callback(source, 428 (GSourceFunc)qio_channel_wait_complete, 429 loop, 430 NULL); 431 432 g_source_attach(source, ctxt); 433 434 g_main_loop_run(loop); 435 436 g_source_unref(source); 437 g_main_loop_unref(loop); 438 g_main_context_unref(ctxt); 439 } 440 441 442 static void qio_channel_finalize(Object *obj) 443 { 444 QIOChannel *ioc = QIO_CHANNEL(obj); 445 446 g_free(ioc->name); 447 448 #ifdef _WIN32 449 if (ioc->event) { 450 CloseHandle(ioc->event); 451 } 452 #endif 453 } 454 455 static const TypeInfo qio_channel_info = { 456 .parent = TYPE_OBJECT, 457 .name = TYPE_QIO_CHANNEL, 458 .instance_size = sizeof(QIOChannel), 459 .instance_finalize = qio_channel_finalize, 460 .abstract = true, 461 .class_size = sizeof(QIOChannelClass), 462 }; 463 464 465 static void qio_channel_register_types(void) 466 { 467 type_register_static(&qio_channel_info); 468 } 469 470 471 type_init(qio_channel_register_types); 472