/*
 * QEMU aio implementation
 *
 * Copyright IBM Corp., 2008
 * Copyright Red Hat Inc., 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/main-loop.h"
#include "qemu/lockcnt.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
#include "qemu/error-report.h"

struct AioHandler {
    EventNotifier *e;
    IOHandler *io_read;
    IOHandler *io_write;
    EventNotifierHandler *io_notify;
    GPollFD pfd;
    int deleted;
    void *opaque;
    QLIST_ENTRY(AioHandler) node;
};

static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /*
     * If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If aio_poll is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        node->deleted = 1;
        node->pfd.revents = 0;
    } else {
        /* Otherwise, delete it for real.  We can't just mark it as
         * deleted because deleted nodes are only cleaned up after
         * releasing the list_lock.
         */
        QLIST_REMOVE(node, node);
        g_free(node);
    }
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *old_node;
    AioHandler *node = NULL;
    SOCKET s;

    if (!fd_is_socket(fd)) {
        error_report("fd=%d is not a socket, AIO implementation is missing", fd);
        return;
    }

    s = _get_osfhandle(fd);

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
        if (old_node->pfd.fd == s && !old_node->deleted) {
            break;
        }
    }

    if (io_read || io_write) {
        HANDLE event;
        long bitmask = 0;

        /* Alloc and insert if it's not already there */
        node = g_new0(AioHandler, 1);
        node->pfd.fd = s;

        node->pfd.events = 0;
        if (node->io_read) {
            node->pfd.events |= G_IO_IN;
        }
        if (node->io_write) {
            node->pfd.events |= G_IO_OUT;
        }

        node->e = &ctx->notifier;

        /* Update handler with latest information */
        node->opaque = opaque;
        node->io_read = io_read;
        node->io_write = io_write;

        if (io_read) {
            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
        }

        if (io_write) {
            bitmask |= FD_WRITE | FD_CONNECT;
        }

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
        event = event_notifier_get_handle(&ctx->notifier);
        qemu_socket_select(fd, event, bitmask, NULL);
    }
    if (old_node) {
        aio_remove_fd_handler(ctx, old_node);
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}
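
/*
 * Event notifiers are the second kind of handler on this backend: unlike
 * sockets, their HANDLE is polled directly (it is registered with the GLib
 * GSource here and collected into the WaitForMultipleObjects() array in
 * aio_poll() below).  Passing io_notify == NULL unregisters the notifier.
 *
 * A minimal usage sketch; "notifier" and "my_handler" are placeholders
 * supplied by the caller, not names defined in this file:
 *
 *     event_notifier_init(&notifier, 0);
 *     aio_set_event_notifier(ctx, &notifier, my_handler, NULL, NULL);
 *     ...
 *     aio_set_event_notifier(ctx, &notifier, NULL, NULL, NULL);
 */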

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *e,
                            EventNotifierHandler *io_notify,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    AioHandler *node;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->e == e && !node->deleted) {
            break;
        }
    }

    /* Are we deleting the fd handler? */
    if (!io_notify) {
        if (node) {
            aio_remove_fd_handler(ctx, node);
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->e = e;
            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
            node->pfd.events = G_IO_IN;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_notify = io_notify;
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}

bool aio_prepare(AioContext *ctx)
{
    static struct timeval tv0;
    AioHandler *node;
    bool have_select_revents = false;
    fd_set rfds, wfds;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    /* fill fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->io_read) {
            FD_SET ((SOCKET)node->pfd.fd, &rfds);
        }
        if (node->io_write) {
            FD_SET ((SOCKET)node->pfd.fd, &wfds);
        }
    }

    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
            node->pfd.revents = 0;
            if (FD_ISSET(node->pfd.fd, &rfds)) {
                node->pfd.revents |= G_IO_IN;
                have_select_revents = true;
            }

            if (FD_ISSET(node->pfd.fd, &wfds)) {
                node->pfd.revents |= G_IO_OUT;
                have_select_revents = true;
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return have_select_revents;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.revents && node->io_notify) {
            result = true;
            break;
        }

        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
            result = true;
            break;
        }
        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
            result = true;
            break;
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return result;
}
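
/*
 * Dispatch the handlers that are ready.  "event" is the HANDLE that
 * WaitForMultipleObjects() reported as signaled (or INVALID_HANDLE_VALUE when
 * called from aio_dispatch()).  io_notify runs for the node whose handle
 * matches or whose revents are set; io_read/io_write run according to the
 * revents that aio_prepare() filled in from select().  Nodes that were marked
 * as deleted during the walk are freed only when qemu_lockcnt_dec_if_lock()
 * succeeds, i.e. when no other walker still holds the list.
 */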

static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}
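
/*
 * The polling loop in outline: aio_prepare() pre-computes socket readiness
 * with a zero-timeout select(), the event notifier handles are gathered into
 * events[], and each WaitForMultipleObjects() call reports at most one
 * signaled handle.  That handle is removed from the array and dispatched, and
 * the loop repeats without blocking until nothing more is signaled.  The
 * io_poll/io_poll_ready callbacks accepted by the registration functions
 * above are never invoked on this backend; see aio_context_set_poll_params()
 * at the end of the file.
 */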

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS];
    bool progress, have_select_revents, first;
    unsigned count;
    int timeout;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);
    have_select_revents = aio_prepare(ctx);

    /* fill fd sets */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify) {
            assert(count < MAXIMUM_WAIT_OBJECTS);
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            qatomic_store_release(&ctx->notify_me,
                                  qatomic_read(&ctx->notify_me) - 2);
            aio_notify_accept(ctx);
        }

        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* if we have any signaled events, dispatch event */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            break;
        }

        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}

void aio_context_setup(AioContext *ctx)
{
}

void aio_context_destroy(AioContext *ctx)
{
}

void aio_context_use_g_source(AioContext *ctx)
{
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    if (max_ns) {
        error_setg(errp, "AioContext polling is not implemented on Windows");
    }
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
{
}