xref: /openbmc/qemu/block/linux-aio.c (revision 000c4dff)
/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this, we will get EAGAIN from io_submit, which is communicated
 *      to the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockDriverAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QLIST_ENTRY(qemu_laiocb) node;
};

typedef struct {
    struct iocb *iocbs[MAX_QUEUED_IO];
    int plugged;
    unsigned int size;
    unsigned int idx;
} LaioQueue;

struct qemu_laio_state {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;
};

static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}
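
/*
 * Worked example (illustrative; assumes the usual kernel convention of
 * returning the whole result in res with res2 == 0): a request that
 * transferred 4096 bytes arrives as res == 4096, res2 == 0, and
 * io_event_ret() returns 4096.  A failed request arrives as a negative
 * errno in res, e.g. res == -EIO; because res is sign-extended, OR-ing
 * in the zero res2 leaves the value intact and the function returns -EIO.
 */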

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }

        laiocb->common.cb(laiocb->common.opaque, ret);
    }

    qemu_aio_release(laiocb);
}
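
/*
 * Example of the mapping above (illustrative values, not real code paths):
 *
 *     laiocb->nbytes == 4096, laiocb->ret == 4096  ->  cb(opaque, 0)
 *     laiocb->nbytes == 4096, laiocb->ret == 512,
 *         is_read == true                          ->  qiov bytes 512..4095
 *                                                      zeroed, cb(opaque, 512)
 *     laiocb->nbytes == 4096, laiocb->ret == 512,
 *         is_read == false                         ->  cb(opaque, -EINVAL)
 *     laiocb->ret == -EIO                          ->  cb(opaque, -EIO)
 */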

static void qemu_laio_completion_cb(EventNotifier *e)
{
    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

    while (event_notifier_test_and_clear(&s->e)) {
        struct io_event events[MAX_EVENTS];
        struct timespec ts = { 0 };
        int nevents, i;

        do {
            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
        } while (nevents == -EINTR);

        for (i = 0; i < nevents; i++) {
            struct iocb *iocb = events[i].obj;
            struct qemu_laiocb *laiocb =
                    container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[i]);
            qemu_laio_process_completion(s, laiocb);
        }
    }
}
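
/*
 * Sketch of what event_notifier_test_and_clear() boils down to here,
 * assuming the EventNotifier is backed by an eventfd (the normal case on
 * Linux); illustrative only:
 *
 *     uint64_t count;
 *     ssize_t n = read(event_notifier_get_fd(&s->e), &count, sizeof(count));
 *     // n == sizeof(count) with count > 0 means the kernel signalled
 *     // completions, so another io_getevents() pass is needed.
 *
 * Looping until the counter reads back empty closes the window in which
 * the kernel signals new completions while we are still draining events.
 */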

static void laio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }

    /*
     * Note that as of Linux 2.6.31 neither the block device code nor any
     * filesystem implements cancellation of AIO requests.
     * Thus the polling loop below is the normal code path.
     */
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    if (ret == 0) {
        laiocb->ret = -ECANCELED;
        return;
    }

    /*
     * We have to wait for the iocb to finish.
     *
     * The only way to get the iocb status update is by polling the io context.
     * We might be able to do this slightly more optimally by removing the
     * O_NONBLOCK flag.
     */
    while (laiocb->ret == -EINPROGRESS) {
        qemu_laio_completion_cb(&laiocb->ctx->e);
    }
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel             = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    io_q->size = MAX_QUEUED_IO;
    io_q->idx = 0;
    io_q->plugged = 0;
}

static int ioq_submit(struct qemu_laio_state *s)
{
    int ret, i = 0;
    int len = s->io_q.idx;

    do {
        ret = io_submit(s->ctx, len, s->io_q.iocbs);
    } while (i++ < 3 && ret == -EAGAIN);

    /* empty the I/O queue */
    s->io_q.idx = 0;

    if (ret < 0) {
        i = 0;
    } else {
        i = ret;
    }

    for (; i < len; i++) {
        struct qemu_laiocb *laiocb =
            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

        laiocb->ret = (ret < 0) ? ret : -EIO;
        qemu_laio_process_completion(s, laiocb);
    }
    return ret;
}
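
/*
 * Partial-submission example (illustrative): with len == 4 queued iocbs,
 * io_submit() may return 2, meaning only iocbs[0..1] entered the kernel;
 * the loop above then completes iocbs[2..3] with -EIO.  If io_submit()
 * itself fails (e.g. -EAGAIN persists after the three retries), every
 * queued request is completed with that negative errno instead.
 */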

static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
    unsigned int idx = s->io_q.idx;

    s->io_q.iocbs[idx++] = iocb;
    s->io_q.idx = idx;

    /* submit immediately if queue is full */
    if (idx == s->io_q.size) {
        ioq_submit(s);
    }
}

void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
    struct qemu_laio_state *s = aio_ctx;

    s->io_q.plugged++;
}

int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
{
    struct qemu_laio_state *s = aio_ctx;
    int ret = 0;

    assert(s->io_q.plugged > 0 || !unplug);

    if (unplug && --s->io_q.plugged > 0) {
        return 0;
    }

    if (s->io_q.idx > 0) {
        ret = ioq_submit(s);
    }

    return ret;
}
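
/*
 * Typical plug/unplug batching pattern (sketch; bs, s, fd, qiov1, qiov2,
 * cb and opaque are assumed to exist in the caller):
 *
 *     laio_io_plug(bs, s);
 *     laio_submit(bs, s, fd, 0, &qiov1, 8, cb, opaque, QEMU_AIO_READ);
 *     laio_submit(bs, s, fd, 8, &qiov2, 8, cb, opaque, QEMU_AIO_READ);
 *     laio_io_unplug(bs, s, true);  // both iocbs flushed in one io_submit()
 *
 * Plug calls may nest; only the unplug that brings the counter back to
 * zero (or a full queue in ioq_enqueue()) actually submits.
 */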

BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    if (!s->io_q.plugged) {
        if (io_submit(s->ctx, 1, &iocbs) < 0) {
            goto out_free_aiocb;
        }
    } else {
        ioq_enqueue(s, iocbs);
    }
    return &laiocb->common;

out_free_aiocb:
    qemu_aio_release(laiocb);
    return NULL;
}
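
/*
 * Direct (unplugged) usage sketch, assuming the caller already holds an
 * open file descriptor and a prepared QEMUIOVector:
 *
 *     BlockDriverAIOCB *acb = laio_submit(bs, s, fd, sector_num, &qiov,
 *                                         nb_sectors, cb, opaque,
 *                                         QEMU_AIO_WRITE);
 *     if (!acb) {
 *         // submission failed synchronously; fall back to another path
 *     }
 *
 * Note the unit conversion above: sector_num and nb_sectors are in
 * 512-byte sectors, so the request covers bytes
 * [sector_num * 512, (sector_num + nb_sectors) * 512).
 */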

void laio_detach_aio_context(void *s_, AioContext *old_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(old_context, &s->e, NULL);
}

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}
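
/*
 * Sketch of moving a qemu_laio_state between AioContexts (this mirrors
 * what a caller such as the raw-posix driver would do around a context
 * switch; illustrative only):
 *
 *     laio_detach_aio_context(s, old_context);
 *     // completions accumulate in the kernel io context and the eventfd
 *     // counter until the notifier is handled again ...
 *     laio_attach_aio_context(s, new_context);
 */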

void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}
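
/*
 * Lifecycle sketch (error handling elided; "ctx" stands for some
 * AioContext the caller owns):
 *
 *     void *s = laio_init();
 *     laio_attach_aio_context(s, ctx);
 *     // ... laio_submit() / laio_io_plug() / laio_io_unplug() ...
 *     laio_detach_aio_context(s, ctx);
 *     laio_cleanup(s);
 */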

void laio_cleanup(void *s_)
{
    struct qemu_laio_state *s = s_;

    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: failed to destroy AIO context %p\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}