1 /* 2 * QEMU aio implementation 3 * 4 * Copyright IBM Corp., 2008 5 * Copyright Red Hat Inc., 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 18 #include "qemu/osdep.h" 19 #include "qemu-common.h" 20 #include "block/block.h" 21 #include "qemu/queue.h" 22 #include "qemu/sockets.h" 23 #include "qapi/error.h" 24 #include "qemu/rcu_queue.h" 25 26 struct AioHandler { 27 EventNotifier *e; 28 IOHandler *io_read; 29 IOHandler *io_write; 30 EventNotifierHandler *io_notify; 31 GPollFD pfd; 32 int deleted; 33 void *opaque; 34 bool is_external; 35 QLIST_ENTRY(AioHandler) node; 36 }; 37 38 void aio_set_fd_handler(AioContext *ctx, 39 int fd, 40 bool is_external, 41 IOHandler *io_read, 42 IOHandler *io_write, 43 AioPollFn *io_poll, 44 void *opaque) 45 { 46 /* fd is a SOCKET in our case */ 47 AioHandler *node; 48 49 qemu_lockcnt_lock(&ctx->list_lock); 50 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 51 if (node->pfd.fd == fd && !node->deleted) { 52 break; 53 } 54 } 55 56 /* Are we deleting the fd handler? */ 57 if (!io_read && !io_write) { 58 if (node) { 59 /* If aio_poll is in progress, just mark the node as deleted */ 60 if (qemu_lockcnt_count(&ctx->list_lock)) { 61 node->deleted = 1; 62 node->pfd.revents = 0; 63 } else { 64 /* Otherwise, delete it for real. We can't just mark it as 65 * deleted because deleted nodes are only cleaned up after 66 * releasing the list_lock. 67 */ 68 QLIST_REMOVE(node, node); 69 g_free(node); 70 } 71 } 72 } else { 73 HANDLE event; 74 75 if (node == NULL) { 76 /* Alloc and insert if it's not already there */ 77 node = g_new0(AioHandler, 1); 78 node->pfd.fd = fd; 79 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 80 } 81 82 node->pfd.events = 0; 83 if (node->io_read) { 84 node->pfd.events |= G_IO_IN; 85 } 86 if (node->io_write) { 87 node->pfd.events |= G_IO_OUT; 88 } 89 90 node->e = &ctx->notifier; 91 92 /* Update handler with latest information */ 93 node->opaque = opaque; 94 node->io_read = io_read; 95 node->io_write = io_write; 96 node->is_external = is_external; 97 98 event = event_notifier_get_handle(&ctx->notifier); 99 WSAEventSelect(node->pfd.fd, event, 100 FD_READ | FD_ACCEPT | FD_CLOSE | 101 FD_CONNECT | FD_WRITE | FD_OOB); 102 } 103 104 qemu_lockcnt_unlock(&ctx->list_lock); 105 aio_notify(ctx); 106 } 107 108 void aio_set_fd_poll(AioContext *ctx, int fd, 109 IOHandler *io_poll_begin, 110 IOHandler *io_poll_end) 111 { 112 /* Not implemented */ 113 } 114 115 void aio_set_event_notifier(AioContext *ctx, 116 EventNotifier *e, 117 bool is_external, 118 EventNotifierHandler *io_notify, 119 AioPollFn *io_poll) 120 { 121 AioHandler *node; 122 123 qemu_lockcnt_lock(&ctx->list_lock); 124 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 125 if (node->e == e && !node->deleted) { 126 break; 127 } 128 } 129 130 /* Are we deleting the fd handler? */ 131 if (!io_notify) { 132 if (node) { 133 g_source_remove_poll(&ctx->source, &node->pfd); 134 135 /* aio_poll is in progress, just mark the node as deleted */ 136 if (qemu_lockcnt_count(&ctx->list_lock)) { 137 node->deleted = 1; 138 node->pfd.revents = 0; 139 } else { 140 /* Otherwise, delete it for real. We can't just mark it as 141 * deleted because deleted nodes are only cleaned up after 142 * releasing the list_lock. 143 */ 144 QLIST_REMOVE(node, node); 145 g_free(node); 146 } 147 } 148 } else { 149 if (node == NULL) { 150 /* Alloc and insert if it's not already there */ 151 node = g_new0(AioHandler, 1); 152 node->e = e; 153 node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); 154 node->pfd.events = G_IO_IN; 155 node->is_external = is_external; 156 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 157 158 g_source_add_poll(&ctx->source, &node->pfd); 159 } 160 /* Update handler with latest information */ 161 node->io_notify = io_notify; 162 } 163 164 qemu_lockcnt_unlock(&ctx->list_lock); 165 aio_notify(ctx); 166 } 167 168 void aio_set_event_notifier_poll(AioContext *ctx, 169 EventNotifier *notifier, 170 EventNotifierHandler *io_poll_begin, 171 EventNotifierHandler *io_poll_end) 172 { 173 /* Not implemented */ 174 } 175 176 bool aio_prepare(AioContext *ctx) 177 { 178 static struct timeval tv0; 179 AioHandler *node; 180 bool have_select_revents = false; 181 fd_set rfds, wfds; 182 183 /* 184 * We have to walk very carefully in case aio_set_fd_handler is 185 * called while we're walking. 186 */ 187 qemu_lockcnt_inc(&ctx->list_lock); 188 189 /* fill fd sets */ 190 FD_ZERO(&rfds); 191 FD_ZERO(&wfds); 192 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 193 if (node->io_read) { 194 FD_SET ((SOCKET)node->pfd.fd, &rfds); 195 } 196 if (node->io_write) { 197 FD_SET ((SOCKET)node->pfd.fd, &wfds); 198 } 199 } 200 201 if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { 202 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 203 node->pfd.revents = 0; 204 if (FD_ISSET(node->pfd.fd, &rfds)) { 205 node->pfd.revents |= G_IO_IN; 206 have_select_revents = true; 207 } 208 209 if (FD_ISSET(node->pfd.fd, &wfds)) { 210 node->pfd.revents |= G_IO_OUT; 211 have_select_revents = true; 212 } 213 } 214 } 215 216 qemu_lockcnt_dec(&ctx->list_lock); 217 return have_select_revents; 218 } 219 220 bool aio_pending(AioContext *ctx) 221 { 222 AioHandler *node; 223 bool result = false; 224 225 /* 226 * We have to walk very carefully in case aio_set_fd_handler is 227 * called while we're walking. 228 */ 229 qemu_lockcnt_inc(&ctx->list_lock); 230 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 231 if (node->pfd.revents && node->io_notify) { 232 result = true; 233 break; 234 } 235 236 if ((node->pfd.revents & G_IO_IN) && node->io_read) { 237 result = true; 238 break; 239 } 240 if ((node->pfd.revents & G_IO_OUT) && node->io_write) { 241 result = true; 242 break; 243 } 244 } 245 246 qemu_lockcnt_dec(&ctx->list_lock); 247 return result; 248 } 249 250 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) 251 { 252 AioHandler *node; 253 bool progress = false; 254 AioHandler *tmp; 255 256 qemu_lockcnt_inc(&ctx->list_lock); 257 258 /* 259 * We have to walk very carefully in case aio_set_fd_handler is 260 * called while we're walking. 261 */ 262 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { 263 int revents = node->pfd.revents; 264 265 if (!node->deleted && 266 (revents || event_notifier_get_handle(node->e) == event) && 267 node->io_notify) { 268 node->pfd.revents = 0; 269 node->io_notify(node->e); 270 271 /* aio_notify() does not count as progress */ 272 if (node->e != &ctx->notifier) { 273 progress = true; 274 } 275 } 276 277 if (!node->deleted && 278 (node->io_read || node->io_write)) { 279 node->pfd.revents = 0; 280 if ((revents & G_IO_IN) && node->io_read) { 281 node->io_read(node->opaque); 282 progress = true; 283 } 284 if ((revents & G_IO_OUT) && node->io_write) { 285 node->io_write(node->opaque); 286 progress = true; 287 } 288 289 /* if the next select() will return an event, we have progressed */ 290 if (event == event_notifier_get_handle(&ctx->notifier)) { 291 WSANETWORKEVENTS ev; 292 WSAEnumNetworkEvents(node->pfd.fd, event, &ev); 293 if (ev.lNetworkEvents) { 294 progress = true; 295 } 296 } 297 } 298 299 if (node->deleted) { 300 if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { 301 QLIST_REMOVE(node, node); 302 g_free(node); 303 qemu_lockcnt_inc_and_unlock(&ctx->list_lock); 304 } 305 } 306 } 307 308 qemu_lockcnt_dec(&ctx->list_lock); 309 return progress; 310 } 311 312 bool aio_dispatch(AioContext *ctx, bool dispatch_fds) 313 { 314 bool progress; 315 316 progress = aio_bh_poll(ctx); 317 if (dispatch_fds) { 318 progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); 319 } 320 progress |= timerlistgroup_run_timers(&ctx->tlg); 321 return progress; 322 } 323 324 bool aio_poll(AioContext *ctx, bool blocking) 325 { 326 AioHandler *node; 327 HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; 328 bool progress, have_select_revents, first; 329 int count; 330 int timeout; 331 332 aio_context_acquire(ctx); 333 progress = false; 334 335 /* aio_notify can avoid the expensive event_notifier_set if 336 * everything (file descriptors, bottom halves, timers) will 337 * be re-evaluated before the next blocking poll(). This is 338 * already true when aio_poll is called with blocking == false; 339 * if blocking == true, it is only true after poll() returns, 340 * so disable the optimization now. 341 */ 342 if (blocking) { 343 atomic_add(&ctx->notify_me, 2); 344 } 345 346 qemu_lockcnt_inc(&ctx->list_lock); 347 have_select_revents = aio_prepare(ctx); 348 349 /* fill fd sets */ 350 count = 0; 351 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 352 if (!node->deleted && node->io_notify 353 && aio_node_check(ctx, node->is_external)) { 354 events[count++] = event_notifier_get_handle(node->e); 355 } 356 } 357 358 qemu_lockcnt_dec(&ctx->list_lock); 359 first = true; 360 361 /* ctx->notifier is always registered. */ 362 assert(count > 0); 363 364 /* Multiple iterations, all of them non-blocking except the first, 365 * may be necessary to process all pending events. After the first 366 * WaitForMultipleObjects call ctx->notify_me will be decremented. 367 */ 368 do { 369 HANDLE event; 370 int ret; 371 372 timeout = blocking && !have_select_revents 373 ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; 374 if (timeout) { 375 aio_context_release(ctx); 376 } 377 ret = WaitForMultipleObjects(count, events, FALSE, timeout); 378 if (blocking) { 379 assert(first); 380 atomic_sub(&ctx->notify_me, 2); 381 } 382 if (timeout) { 383 aio_context_acquire(ctx); 384 } 385 386 if (first) { 387 aio_notify_accept(ctx); 388 progress |= aio_bh_poll(ctx); 389 first = false; 390 } 391 392 /* if we have any signaled events, dispatch event */ 393 event = NULL; 394 if ((DWORD) (ret - WAIT_OBJECT_0) < count) { 395 event = events[ret - WAIT_OBJECT_0]; 396 events[ret - WAIT_OBJECT_0] = events[--count]; 397 } else if (!have_select_revents) { 398 break; 399 } 400 401 have_select_revents = false; 402 blocking = false; 403 404 progress |= aio_dispatch_handlers(ctx, event); 405 } while (count > 0); 406 407 progress |= timerlistgroup_run_timers(&ctx->tlg); 408 409 aio_context_release(ctx); 410 return progress; 411 } 412 413 void aio_context_setup(AioContext *ctx) 414 { 415 } 416 417 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, 418 int64_t grow, int64_t shrink, Error **errp) 419 { 420 error_setg(errp, "AioContext polling is not implemented on Windows"); 421 } 422