1 /* 2 * QEMU aio implementation 3 * 4 * Copyright IBM Corp., 2008 5 * Copyright Red Hat Inc., 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 18 #include "qemu/osdep.h" 19 #include "qemu-common.h" 20 #include "block/block.h" 21 #include "qemu/queue.h" 22 #include "qemu/sockets.h" 23 #include "qapi/error.h" 24 #include "qemu/rcu_queue.h" 25 26 struct AioHandler { 27 EventNotifier *e; 28 IOHandler *io_read; 29 IOHandler *io_write; 30 EventNotifierHandler *io_notify; 31 GPollFD pfd; 32 int deleted; 33 void *opaque; 34 bool is_external; 35 QLIST_ENTRY(AioHandler) node; 36 }; 37 38 void aio_set_fd_handler(AioContext *ctx, 39 int fd, 40 bool is_external, 41 IOHandler *io_read, 42 IOHandler *io_write, 43 AioPollFn *io_poll, 44 void *opaque) 45 { 46 /* fd is a SOCKET in our case */ 47 AioHandler *node; 48 49 qemu_lockcnt_lock(&ctx->list_lock); 50 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 51 if (node->pfd.fd == fd && !node->deleted) { 52 break; 53 } 54 } 55 56 /* Are we deleting the fd handler? */ 57 if (!io_read && !io_write) { 58 if (node) { 59 /* If aio_poll is in progress, just mark the node as deleted */ 60 if (qemu_lockcnt_count(&ctx->list_lock)) { 61 node->deleted = 1; 62 node->pfd.revents = 0; 63 } else { 64 /* Otherwise, delete it for real. We can't just mark it as 65 * deleted because deleted nodes are only cleaned up after 66 * releasing the list_lock. 67 */ 68 QLIST_REMOVE(node, node); 69 g_free(node); 70 } 71 } 72 } else { 73 HANDLE event; 74 long bitmask = 0; 75 76 if (node == NULL) { 77 /* Alloc and insert if it's not already there */ 78 node = g_new0(AioHandler, 1); 79 node->pfd.fd = fd; 80 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 81 } 82 83 node->pfd.events = 0; 84 if (node->io_read) { 85 node->pfd.events |= G_IO_IN; 86 } 87 if (node->io_write) { 88 node->pfd.events |= G_IO_OUT; 89 } 90 91 node->e = &ctx->notifier; 92 93 /* Update handler with latest information */ 94 node->opaque = opaque; 95 node->io_read = io_read; 96 node->io_write = io_write; 97 node->is_external = is_external; 98 99 if (io_read) { 100 bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE; 101 } 102 103 if (io_write) { 104 bitmask |= FD_WRITE | FD_CONNECT; 105 } 106 107 event = event_notifier_get_handle(&ctx->notifier); 108 WSAEventSelect(node->pfd.fd, event, bitmask); 109 } 110 111 qemu_lockcnt_unlock(&ctx->list_lock); 112 aio_notify(ctx); 113 } 114 115 void aio_set_fd_poll(AioContext *ctx, int fd, 116 IOHandler *io_poll_begin, 117 IOHandler *io_poll_end) 118 { 119 /* Not implemented */ 120 } 121 122 void aio_set_event_notifier(AioContext *ctx, 123 EventNotifier *e, 124 bool is_external, 125 EventNotifierHandler *io_notify, 126 AioPollFn *io_poll) 127 { 128 AioHandler *node; 129 130 qemu_lockcnt_lock(&ctx->list_lock); 131 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 132 if (node->e == e && !node->deleted) { 133 break; 134 } 135 } 136 137 /* Are we deleting the fd handler? */ 138 if (!io_notify) { 139 if (node) { 140 g_source_remove_poll(&ctx->source, &node->pfd); 141 142 /* aio_poll is in progress, just mark the node as deleted */ 143 if (qemu_lockcnt_count(&ctx->list_lock)) { 144 node->deleted = 1; 145 node->pfd.revents = 0; 146 } else { 147 /* Otherwise, delete it for real. We can't just mark it as 148 * deleted because deleted nodes are only cleaned up after 149 * releasing the list_lock. 150 */ 151 QLIST_REMOVE(node, node); 152 g_free(node); 153 } 154 } 155 } else { 156 if (node == NULL) { 157 /* Alloc and insert if it's not already there */ 158 node = g_new0(AioHandler, 1); 159 node->e = e; 160 node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); 161 node->pfd.events = G_IO_IN; 162 node->is_external = is_external; 163 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 164 165 g_source_add_poll(&ctx->source, &node->pfd); 166 } 167 /* Update handler with latest information */ 168 node->io_notify = io_notify; 169 } 170 171 qemu_lockcnt_unlock(&ctx->list_lock); 172 aio_notify(ctx); 173 } 174 175 void aio_set_event_notifier_poll(AioContext *ctx, 176 EventNotifier *notifier, 177 EventNotifierHandler *io_poll_begin, 178 EventNotifierHandler *io_poll_end) 179 { 180 /* Not implemented */ 181 } 182 183 bool aio_prepare(AioContext *ctx) 184 { 185 static struct timeval tv0; 186 AioHandler *node; 187 bool have_select_revents = false; 188 fd_set rfds, wfds; 189 190 /* 191 * We have to walk very carefully in case aio_set_fd_handler is 192 * called while we're walking. 193 */ 194 qemu_lockcnt_inc(&ctx->list_lock); 195 196 /* fill fd sets */ 197 FD_ZERO(&rfds); 198 FD_ZERO(&wfds); 199 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 200 if (node->io_read) { 201 FD_SET ((SOCKET)node->pfd.fd, &rfds); 202 } 203 if (node->io_write) { 204 FD_SET ((SOCKET)node->pfd.fd, &wfds); 205 } 206 } 207 208 if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { 209 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 210 node->pfd.revents = 0; 211 if (FD_ISSET(node->pfd.fd, &rfds)) { 212 node->pfd.revents |= G_IO_IN; 213 have_select_revents = true; 214 } 215 216 if (FD_ISSET(node->pfd.fd, &wfds)) { 217 node->pfd.revents |= G_IO_OUT; 218 have_select_revents = true; 219 } 220 } 221 } 222 223 qemu_lockcnt_dec(&ctx->list_lock); 224 return have_select_revents; 225 } 226 227 bool aio_pending(AioContext *ctx) 228 { 229 AioHandler *node; 230 bool result = false; 231 232 /* 233 * We have to walk very carefully in case aio_set_fd_handler is 234 * called while we're walking. 235 */ 236 qemu_lockcnt_inc(&ctx->list_lock); 237 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 238 if (node->pfd.revents && node->io_notify) { 239 result = true; 240 break; 241 } 242 243 if ((node->pfd.revents & G_IO_IN) && node->io_read) { 244 result = true; 245 break; 246 } 247 if ((node->pfd.revents & G_IO_OUT) && node->io_write) { 248 result = true; 249 break; 250 } 251 } 252 253 qemu_lockcnt_dec(&ctx->list_lock); 254 return result; 255 } 256 257 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) 258 { 259 AioHandler *node; 260 bool progress = false; 261 AioHandler *tmp; 262 263 /* 264 * We have to walk very carefully in case aio_set_fd_handler is 265 * called while we're walking. 266 */ 267 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { 268 int revents = node->pfd.revents; 269 270 if (!node->deleted && 271 (revents || event_notifier_get_handle(node->e) == event) && 272 node->io_notify) { 273 node->pfd.revents = 0; 274 node->io_notify(node->e); 275 276 /* aio_notify() does not count as progress */ 277 if (node->e != &ctx->notifier) { 278 progress = true; 279 } 280 } 281 282 if (!node->deleted && 283 (node->io_read || node->io_write)) { 284 node->pfd.revents = 0; 285 if ((revents & G_IO_IN) && node->io_read) { 286 node->io_read(node->opaque); 287 progress = true; 288 } 289 if ((revents & G_IO_OUT) && node->io_write) { 290 node->io_write(node->opaque); 291 progress = true; 292 } 293 294 /* if the next select() will return an event, we have progressed */ 295 if (event == event_notifier_get_handle(&ctx->notifier)) { 296 WSANETWORKEVENTS ev; 297 WSAEnumNetworkEvents(node->pfd.fd, event, &ev); 298 if (ev.lNetworkEvents) { 299 progress = true; 300 } 301 } 302 } 303 304 if (node->deleted) { 305 if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { 306 QLIST_REMOVE(node, node); 307 g_free(node); 308 qemu_lockcnt_inc_and_unlock(&ctx->list_lock); 309 } 310 } 311 } 312 313 return progress; 314 } 315 316 void aio_dispatch(AioContext *ctx) 317 { 318 qemu_lockcnt_inc(&ctx->list_lock); 319 aio_bh_poll(ctx); 320 aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); 321 qemu_lockcnt_dec(&ctx->list_lock); 322 timerlistgroup_run_timers(&ctx->tlg); 323 } 324 325 bool aio_poll(AioContext *ctx, bool blocking) 326 { 327 AioHandler *node; 328 HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; 329 bool progress, have_select_revents, first; 330 int count; 331 int timeout; 332 333 progress = false; 334 335 /* aio_notify can avoid the expensive event_notifier_set if 336 * everything (file descriptors, bottom halves, timers) will 337 * be re-evaluated before the next blocking poll(). This is 338 * already true when aio_poll is called with blocking == false; 339 * if blocking == true, it is only true after poll() returns, 340 * so disable the optimization now. 341 */ 342 if (blocking) { 343 atomic_add(&ctx->notify_me, 2); 344 } 345 346 qemu_lockcnt_inc(&ctx->list_lock); 347 have_select_revents = aio_prepare(ctx); 348 349 /* fill fd sets */ 350 count = 0; 351 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 352 if (!node->deleted && node->io_notify 353 && aio_node_check(ctx, node->is_external)) { 354 events[count++] = event_notifier_get_handle(node->e); 355 } 356 } 357 358 first = true; 359 360 /* ctx->notifier is always registered. */ 361 assert(count > 0); 362 363 /* Multiple iterations, all of them non-blocking except the first, 364 * may be necessary to process all pending events. After the first 365 * WaitForMultipleObjects call ctx->notify_me will be decremented. 366 */ 367 do { 368 HANDLE event; 369 int ret; 370 371 timeout = blocking && !have_select_revents 372 ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; 373 ret = WaitForMultipleObjects(count, events, FALSE, timeout); 374 if (blocking) { 375 assert(first); 376 assert(in_aio_context_home_thread(ctx)); 377 atomic_sub(&ctx->notify_me, 2); 378 aio_notify_accept(ctx); 379 } 380 381 if (first) { 382 progress |= aio_bh_poll(ctx); 383 first = false; 384 } 385 386 /* if we have any signaled events, dispatch event */ 387 event = NULL; 388 if ((DWORD) (ret - WAIT_OBJECT_0) < count) { 389 event = events[ret - WAIT_OBJECT_0]; 390 events[ret - WAIT_OBJECT_0] = events[--count]; 391 } else if (!have_select_revents) { 392 break; 393 } 394 395 have_select_revents = false; 396 blocking = false; 397 398 progress |= aio_dispatch_handlers(ctx, event); 399 } while (count > 0); 400 401 qemu_lockcnt_dec(&ctx->list_lock); 402 403 progress |= timerlistgroup_run_timers(&ctx->tlg); 404 return progress; 405 } 406 407 void aio_context_setup(AioContext *ctx) 408 { 409 } 410 411 void aio_context_destroy(AioContext *ctx) 412 { 413 } 414 415 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, 416 int64_t grow, int64_t shrink, Error **errp) 417 { 418 if (max_ns) { 419 error_setg(errp, "AioContext polling is not implemented on Windows"); 420 } 421 } 422