1 /* 2 * QEMU aio implementation 3 * 4 * Copyright IBM Corp., 2008 5 * Copyright Red Hat Inc., 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 18 #include "qemu/osdep.h" 19 #include "qemu-common.h" 20 #include "block/block.h" 21 #include "qemu/queue.h" 22 #include "qemu/sockets.h" 23 #include "qapi/error.h" 24 #include "qemu/rcu_queue.h" 25 26 struct AioHandler { 27 EventNotifier *e; 28 IOHandler *io_read; 29 IOHandler *io_write; 30 EventNotifierHandler *io_notify; 31 GPollFD pfd; 32 int deleted; 33 void *opaque; 34 bool is_external; 35 QLIST_ENTRY(AioHandler) node; 36 }; 37 38 void aio_set_fd_handler(AioContext *ctx, 39 int fd, 40 bool is_external, 41 IOHandler *io_read, 42 IOHandler *io_write, 43 AioPollFn *io_poll, 44 void *opaque) 45 { 46 /* fd is a SOCKET in our case */ 47 AioHandler *node; 48 49 qemu_lockcnt_lock(&ctx->list_lock); 50 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 51 if (node->pfd.fd == fd && !node->deleted) { 52 break; 53 } 54 } 55 56 /* Are we deleting the fd handler? */ 57 if (!io_read && !io_write) { 58 if (node) { 59 /* If aio_poll is in progress, just mark the node as deleted */ 60 if (qemu_lockcnt_count(&ctx->list_lock)) { 61 node->deleted = 1; 62 node->pfd.revents = 0; 63 } else { 64 /* Otherwise, delete it for real. We can't just mark it as 65 * deleted because deleted nodes are only cleaned up after 66 * releasing the list_lock. 67 */ 68 QLIST_REMOVE(node, node); 69 g_free(node); 70 } 71 } 72 } else { 73 HANDLE event; 74 75 if (node == NULL) { 76 /* Alloc and insert if it's not already there */ 77 node = g_new0(AioHandler, 1); 78 node->pfd.fd = fd; 79 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 80 } 81 82 node->pfd.events = 0; 83 if (node->io_read) { 84 node->pfd.events |= G_IO_IN; 85 } 86 if (node->io_write) { 87 node->pfd.events |= G_IO_OUT; 88 } 89 90 node->e = &ctx->notifier; 91 92 /* Update handler with latest information */ 93 node->opaque = opaque; 94 node->io_read = io_read; 95 node->io_write = io_write; 96 node->is_external = is_external; 97 98 event = event_notifier_get_handle(&ctx->notifier); 99 WSAEventSelect(node->pfd.fd, event, 100 FD_READ | FD_ACCEPT | FD_CLOSE | 101 FD_CONNECT | FD_WRITE | FD_OOB); 102 } 103 104 qemu_lockcnt_unlock(&ctx->list_lock); 105 aio_notify(ctx); 106 } 107 108 void aio_set_fd_poll(AioContext *ctx, int fd, 109 IOHandler *io_poll_begin, 110 IOHandler *io_poll_end) 111 { 112 /* Not implemented */ 113 } 114 115 void aio_set_event_notifier(AioContext *ctx, 116 EventNotifier *e, 117 bool is_external, 118 EventNotifierHandler *io_notify, 119 AioPollFn *io_poll) 120 { 121 AioHandler *node; 122 123 qemu_lockcnt_lock(&ctx->list_lock); 124 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 125 if (node->e == e && !node->deleted) { 126 break; 127 } 128 } 129 130 /* Are we deleting the fd handler? */ 131 if (!io_notify) { 132 if (node) { 133 g_source_remove_poll(&ctx->source, &node->pfd); 134 135 /* aio_poll is in progress, just mark the node as deleted */ 136 if (qemu_lockcnt_count(&ctx->list_lock)) { 137 node->deleted = 1; 138 node->pfd.revents = 0; 139 } else { 140 /* Otherwise, delete it for real. We can't just mark it as 141 * deleted because deleted nodes are only cleaned up after 142 * releasing the list_lock. 143 */ 144 QLIST_REMOVE(node, node); 145 g_free(node); 146 } 147 } 148 } else { 149 if (node == NULL) { 150 /* Alloc and insert if it's not already there */ 151 node = g_new0(AioHandler, 1); 152 node->e = e; 153 node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); 154 node->pfd.events = G_IO_IN; 155 node->is_external = is_external; 156 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); 157 158 g_source_add_poll(&ctx->source, &node->pfd); 159 } 160 /* Update handler with latest information */ 161 node->io_notify = io_notify; 162 } 163 164 qemu_lockcnt_unlock(&ctx->list_lock); 165 aio_notify(ctx); 166 } 167 168 void aio_set_event_notifier_poll(AioContext *ctx, 169 EventNotifier *notifier, 170 EventNotifierHandler *io_poll_begin, 171 EventNotifierHandler *io_poll_end) 172 { 173 /* Not implemented */ 174 } 175 176 bool aio_prepare(AioContext *ctx) 177 { 178 static struct timeval tv0; 179 AioHandler *node; 180 bool have_select_revents = false; 181 fd_set rfds, wfds; 182 183 /* 184 * We have to walk very carefully in case aio_set_fd_handler is 185 * called while we're walking. 186 */ 187 qemu_lockcnt_inc(&ctx->list_lock); 188 189 /* fill fd sets */ 190 FD_ZERO(&rfds); 191 FD_ZERO(&wfds); 192 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 193 if (node->io_read) { 194 FD_SET ((SOCKET)node->pfd.fd, &rfds); 195 } 196 if (node->io_write) { 197 FD_SET ((SOCKET)node->pfd.fd, &wfds); 198 } 199 } 200 201 if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { 202 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 203 node->pfd.revents = 0; 204 if (FD_ISSET(node->pfd.fd, &rfds)) { 205 node->pfd.revents |= G_IO_IN; 206 have_select_revents = true; 207 } 208 209 if (FD_ISSET(node->pfd.fd, &wfds)) { 210 node->pfd.revents |= G_IO_OUT; 211 have_select_revents = true; 212 } 213 } 214 } 215 216 qemu_lockcnt_dec(&ctx->list_lock); 217 return have_select_revents; 218 } 219 220 bool aio_pending(AioContext *ctx) 221 { 222 AioHandler *node; 223 bool result = false; 224 225 /* 226 * We have to walk very carefully in case aio_set_fd_handler is 227 * called while we're walking. 228 */ 229 qemu_lockcnt_inc(&ctx->list_lock); 230 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 231 if (node->pfd.revents && node->io_notify) { 232 result = true; 233 break; 234 } 235 236 if ((node->pfd.revents & G_IO_IN) && node->io_read) { 237 result = true; 238 break; 239 } 240 if ((node->pfd.revents & G_IO_OUT) && node->io_write) { 241 result = true; 242 break; 243 } 244 } 245 246 qemu_lockcnt_dec(&ctx->list_lock); 247 return result; 248 } 249 250 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) 251 { 252 AioHandler *node; 253 bool progress = false; 254 AioHandler *tmp; 255 256 /* 257 * We have to walk very carefully in case aio_set_fd_handler is 258 * called while we're walking. 259 */ 260 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { 261 int revents = node->pfd.revents; 262 263 if (!node->deleted && 264 (revents || event_notifier_get_handle(node->e) == event) && 265 node->io_notify) { 266 node->pfd.revents = 0; 267 node->io_notify(node->e); 268 269 /* aio_notify() does not count as progress */ 270 if (node->e != &ctx->notifier) { 271 progress = true; 272 } 273 } 274 275 if (!node->deleted && 276 (node->io_read || node->io_write)) { 277 node->pfd.revents = 0; 278 if ((revents & G_IO_IN) && node->io_read) { 279 node->io_read(node->opaque); 280 progress = true; 281 } 282 if ((revents & G_IO_OUT) && node->io_write) { 283 node->io_write(node->opaque); 284 progress = true; 285 } 286 287 /* if the next select() will return an event, we have progressed */ 288 if (event == event_notifier_get_handle(&ctx->notifier)) { 289 WSANETWORKEVENTS ev; 290 WSAEnumNetworkEvents(node->pfd.fd, event, &ev); 291 if (ev.lNetworkEvents) { 292 progress = true; 293 } 294 } 295 } 296 297 if (node->deleted) { 298 if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { 299 QLIST_REMOVE(node, node); 300 g_free(node); 301 qemu_lockcnt_inc_and_unlock(&ctx->list_lock); 302 } 303 } 304 } 305 306 return progress; 307 } 308 309 void aio_dispatch(AioContext *ctx) 310 { 311 qemu_lockcnt_inc(&ctx->list_lock); 312 aio_bh_poll(ctx); 313 aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); 314 qemu_lockcnt_dec(&ctx->list_lock); 315 timerlistgroup_run_timers(&ctx->tlg); 316 } 317 318 bool aio_poll(AioContext *ctx, bool blocking) 319 { 320 AioHandler *node; 321 HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; 322 bool progress, have_select_revents, first; 323 int count; 324 int timeout; 325 326 progress = false; 327 328 /* aio_notify can avoid the expensive event_notifier_set if 329 * everything (file descriptors, bottom halves, timers) will 330 * be re-evaluated before the next blocking poll(). This is 331 * already true when aio_poll is called with blocking == false; 332 * if blocking == true, it is only true after poll() returns, 333 * so disable the optimization now. 334 */ 335 if (blocking) { 336 atomic_add(&ctx->notify_me, 2); 337 } 338 339 qemu_lockcnt_inc(&ctx->list_lock); 340 have_select_revents = aio_prepare(ctx); 341 342 /* fill fd sets */ 343 count = 0; 344 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 345 if (!node->deleted && node->io_notify 346 && aio_node_check(ctx, node->is_external)) { 347 events[count++] = event_notifier_get_handle(node->e); 348 } 349 } 350 351 first = true; 352 353 /* ctx->notifier is always registered. */ 354 assert(count > 0); 355 356 /* Multiple iterations, all of them non-blocking except the first, 357 * may be necessary to process all pending events. After the first 358 * WaitForMultipleObjects call ctx->notify_me will be decremented. 359 */ 360 do { 361 HANDLE event; 362 int ret; 363 364 timeout = blocking && !have_select_revents 365 ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; 366 ret = WaitForMultipleObjects(count, events, FALSE, timeout); 367 if (blocking) { 368 assert(first); 369 atomic_sub(&ctx->notify_me, 2); 370 } 371 372 if (first) { 373 aio_notify_accept(ctx); 374 progress |= aio_bh_poll(ctx); 375 first = false; 376 } 377 378 /* if we have any signaled events, dispatch event */ 379 event = NULL; 380 if ((DWORD) (ret - WAIT_OBJECT_0) < count) { 381 event = events[ret - WAIT_OBJECT_0]; 382 events[ret - WAIT_OBJECT_0] = events[--count]; 383 } else if (!have_select_revents) { 384 break; 385 } 386 387 have_select_revents = false; 388 blocking = false; 389 390 progress |= aio_dispatch_handlers(ctx, event); 391 } while (count > 0); 392 393 qemu_lockcnt_dec(&ctx->list_lock); 394 395 progress |= timerlistgroup_run_timers(&ctx->tlg); 396 return progress; 397 } 398 399 void aio_context_setup(AioContext *ctx) 400 { 401 } 402 403 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, 404 int64_t grow, int64_t shrink, Error **errp) 405 { 406 error_setg(errp, "AioContext polling is not implemented on Windows"); 407 } 408