/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 * tunable by the guest. If we get more outstanding requests at a time
 * than this we will get EAGAIN from io_submit which is communicated to
 * the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO 128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int n;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        qemu_coroutine_enter(laiocb->co, NULL);
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}
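/*
 * Illustrative sketch (not compiled): on the non-coroutine path above, the
 * final "ret" (0 on success, negative errno on failure) is handed to the
 * BlockCompletionFunc registered via laio_submit().  A minimal callback of
 * that shape might look like the following; the function name and the use
 * of the opaque pointer are hypothetical.
 */
#if 0
static void example_laio_cb(void *opaque, int ret)
{
    /* opaque is whatever the caller passed to laio_submit() */
    if (ret < 0) {
        fprintf(stderr, "linux-aio request failed: %s\n", strerror(-ret));
        return;
    }
    /* Success: the QEMUIOVector now holds the read data or was written out */
}
#endif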
/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll(). In order to do this,
 * the completion events array and index are kept in LinuxAioState. The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed. When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }

    qemu_bh_cancel(s->completion_bh);
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_completion_bh(s);
    }
}

static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size = sizeof(struct qemu_laiocb),
    .cancel_async = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(!s->io_q.plugged);
    s->io_q.plugged = 1;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    s->io_q.plugged = 0;
    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}
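/*
 * Illustrative sketch (not compiled) of the plug/unplug batching interface
 * defined above: between laio_io_plug() and laio_io_unplug() requests
 * accumulate in io_q.pending (unless MAX_QUEUED_IO forces an early flush)
 * and are then passed to the kernel with a single io_submit() call.  The
 * caller, file descriptor, sector numbers and example_laio_cb (the
 * hypothetical callback sketched earlier) are assumptions for illustration.
 */
#if 0
static void example_submit_batch(BlockDriverState *bs, LinuxAioState *s,
                                 int fd, QEMUIOVector *qiovs[], int n)
{
    int i;

    laio_io_plug(bs, s);
    for (i = 0; i < n; i++) {
        /* Queued only; not yet submitted to the kernel */
        laio_submit(bs, s, fd, i * 8 /* sector_num */, qiovs[i],
                    qiovs[i]->size / BDRV_SECTOR_SIZE,
                    example_laio_cb, NULL, QEMU_AIO_READ);
    }
    /* One io_submit() for the whole batch (unless it was flushed early) */
    laio_io_unplug(bs, s);
}
#endif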
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }

    return 0;
}

int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co = qemu_coroutine_self(),
        .nbytes = qiov->size,
        .ctx = s,
        .is_read = (type == QEMU_AIO_READ),
        .qiov = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    qemu_coroutine_yield();
    return laiocb.ret;
}

BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                __func__, &s->ctx);
    }
    g_free(s);
}
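/*
 * Illustrative sketch (not compiled) of the overall lifecycle exposed by
 * this file: create the state with laio_init(), bind it to an AioContext,
 * submit a request (here from a coroutine via laio_co_submit()), then
 * detach and clean up.  The AioContext, fd, offset, iovec and the NULL
 * BlockDriverState are assumptions for illustration only.
 */
#if 0
static int coroutine_fn example_laio_lifecycle(AioContext *ctx, int fd,
                                               QEMUIOVector *qiov)
{
    LinuxAioState *s = laio_init();
    int ret;

    if (!s) {
        return -ENOMEM;
    }
    laio_attach_aio_context(s, ctx);

    /* Yields until qemu_laio_process_completion() re-enters the coroutine */
    ret = laio_co_submit(NULL /* bs */, s, fd, 0 /* offset */, qiov,
                         QEMU_AIO_READ);

    laio_detach_aio_context(s, ctx);
    laio_cleanup(s);
    return ret;
}
#endif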