/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "sysemu/block-backend.h"

/* Only used for assertions.  */
#include "qemu/coroutine_int.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If more requests than this are outstanding at
 *      a time, io_submit() returns EAGAIN, which is reported to the guest as
 *      an I/O error.
 */
#define MAX_EVENTS 1024

/* Maximum number of requests in a batch (default value). */
#define DEFAULT_MAX_BATCH 32

struct qemu_laiocb {
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* No locking required, only accessed from AioContext home thread */
    LaioQueue io_q;
    QEMUBH *completion_bh;
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request.
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;

    /*
     * If the coroutine has already been entered it must be in ioq_submit()
     * and will notice that laiocb->ret has been filled in when it eventually
     * runs later.  Coroutines cannot be entered recursively, so avoid doing
     * that!
     */
    assert(laiocb->co->ctx == laiocb->ctx->aio_context);
    if (!qemu_coroutine_entered(laiocb->co)) {
        aio_co_wake(laiocb->co);
    }
}

/**
 * aio_ring buffer, which is shared between userspace and the kernel.
 *
 * This is copied from linux/fs/aio.c; a common header does not exist,
 * but AIO has existed for ages, so we assume the ABI is stable.
 */
struct aio_ring {
    unsigned    id;    /* kernel internal index number */
    unsigned    nr;    /* number of io_events */
    unsigned    head;  /* Written to by userland or by kernel. */
    unsigned    tail;

    unsigned    magic;
    unsigned    compat_features;
    unsigned    incompat_features;
    unsigned    header_length;  /* size of aio_ring */

    struct io_event io_events[];
};

/**
 * io_getevents_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 *
 * Returns the number of completed events and sets a pointer to the
 * events array.  This function does not update the internal ring
 * buffer; it only reads head and tail.  When @events has been
 * processed, io_getevents_commit() must be called.
 */
static inline unsigned int io_getevents_peek(io_context_t ctx,
                                             struct io_event **events)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned int head = ring->head, tail = ring->tail;
    unsigned int nr;

    nr = tail >= head ? tail - head : ring->nr - head;
    *events = ring->io_events + head;
    /* Prevent speculative loads of events[i] before tail has been observed.
       Paired with the smp_wmb() in linux/fs/aio.c: aio_complete(). */
    smp_rmb();

    return nr;
}

/**
 * io_getevents_commit:
 * @ctx: AIO context
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer.
 */
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;

    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}

/**
 * io_getevents_advance_and_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer and returns the number of events
 * left to process.
 */
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
                              struct io_event **events,
                              unsigned int nr)
{
    io_getevents_commit(ctx, nr);
    return io_getevents_peek(ctx, events);
}

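/*
 * Worked example (illustrative): with ring->nr = 128, head = 120 and
 * tail = 5, io_getevents_peek() returns the 8 contiguous events at indices
 * 120..127.  After io_getevents_advance_and_peek(ctx, &events, 8) the head
 * wraps to 0 and the remaining 5 events at indices 0..4 are returned.  The
 * completion loop below relies on this: it keeps calling
 * io_getevents_advance_and_peek() until it returns 0.
 */
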
/**
 * qemu_laio_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests and invokes their callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops,
 * for example when a request callback invokes aio_poll().  In order to do
 * this, the indices are kept in LinuxAioState.  The function schedules the
 * completion BH so that it can be called again from a nested event loop.
 * When no events are left to complete, the BH is canceled.
 */
static void qemu_laio_process_completions(LinuxAioState *s)
{
    struct io_event *events;

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
                                                         s->event_idx))) {
        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
            struct iocb *iocb = events[s->event_idx].obj;
            struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[s->event_idx]);

            /* Change counters one-by-one because we can be nested. */
            s->io_q.in_flight--;
            s->event_idx++;
            qemu_laio_process_completion(laiocb);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    /* If we are nested, we have to notify the level above that we are done
     * by setting event_max to zero; the upper level will then break out of
     * its own `for` loop.  If we are the outermost level, all counters have
     * already dropped to zero. */
    s->event_max = 0;
    s->event_idx = 0;
}

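/*
 * Nesting scenario (illustrative): suppose ten events are pending and the
 * callback of the third one runs a nested aio_poll().  The nested call
 * re-enters qemu_laio_process_completions() via the scheduled BH, commits
 * the events the outer call has already consumed (s->event_idx), processes
 * the remaining ones, and resets event_max/event_idx to zero.  Back in the
 * outer call the `for` condition fails immediately, and the outer `while`
 * re-peeks the ring, picking up only events that completed afterwards, so
 * no event is processed twice.
 */
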
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
    qemu_laio_process_completions(s);

    if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    qemu_laio_process_completions_and_submit(s);
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_process_completions_and_submit(s);
    }
}

static bool qemu_laio_poll_cb(void *opaque)
{
    EventNotifier *e = opaque;
    LinuxAioState *s = container_of(e, LinuxAioState, e);
    struct io_event *events;

    return io_getevents_peek(s->ctx, &events);
}

static void qemu_laio_poll_ready(EventNotifier *opaque)
{
    EventNotifier *e = opaque;
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    qemu_laio_process_completions_and_submit(s);
}

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_EVENTS];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        if (s->io_q.in_flight >= MAX_EVENTS) {
            break;
        }
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            /* Fail the first request, retry the rest */
            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
            s->io_q.in_queue--;
            aiocb->ret = ret;
            qemu_laio_process_completion(aiocb);
            continue;
        }

        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /* We can try to complete something right away while requests are
         * still in flight. */
        qemu_laio_process_completions(s);
        /*
         * Even if we have completed everything (in_flight == 0), the queue
         * can still hold pending requests (in_queue > 0).  We do not retry
         * submission here, to avoid an I/O hang.  The reason is simple: s->e
         * is still set, so the completion callback will run shortly and any
         * pending requests will be submitted from there.
         */
    }
}

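/*
 * Worked example (illustrative): suppose 1000 requests are in flight and 100
 * are pending.  The inner loop above stops filling iocbs[] once
 * in_flight + len reaches MAX_EVENTS, so len is capped at 24.  If io_submit()
 * then accepts only 10 of them, in_flight becomes 1010, in_queue drops by 10,
 * and the do/while exits because ret != len.  Since in_queue is still
 * non-zero, io_q.blocked is set and the rest wait for the completion path to
 * call ioq_submit() again.
 */
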
static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch)
{
    uint64_t max_batch = s->aio_context->aio_max_batch ?: DEFAULT_MAX_BATCH;

    /*
     * An AIO context can be shared between multiple block devices, so
     * `dev_max_batch` allows reducing the batch size for latency-sensitive
     * devices.
     */
    max_batch = MIN_NON_ZERO(dev_max_batch, max_batch);

    /* Limit the batch to the number of available event slots. */
    max_batch = MIN_NON_ZERO(MAX_EVENTS - s->io_q.in_flight, max_batch);

    return max_batch;
}

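/*
 * Worked example (illustrative): with aio_max_batch left at 0, the default
 * DEFAULT_MAX_BATCH (32) applies.  A device passing dev_max_batch = 16
 * lowers that to 16.  If 1016 requests are already in flight, only
 * MAX_EVENTS - in_flight = 8 event slots remain, so the result is 8.
 */
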
static void laio_unplug_fn(void *opaque)
{
    LinuxAioState *s = opaque;

    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type, uint64_t dev_max_batch)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_ZONE_APPEND:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* The Linux kernel currently does not support other operations. */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.in_queue++;
    if (!s->io_q.blocked) {
        if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch)) {
            ioq_submit(s);
        } else {
            blk_io_plug_call(laio_unplug_fn, s);
        }
    }

    return 0;
}

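/*
 * Batching sketch (illustrative; assumes the blk_io_plug()/blk_io_unplug()
 * helpers from block/plug.c): a device that dequeues several requests at
 * once can bracket its submissions so that laio_do_submit() defers
 * io_submit() via blk_io_plug_call() above, and the whole set is flushed as
 * one batch when the section is unplugged:
 *
 *     blk_io_plug();
 *     // issue several blk_aio_preadv()/blk_aio_pwritev() requests ...
 *     blk_io_unplug();   // laio_unplug_fn() runs and calls ioq_submit()
 *
 * Requests are still flushed early if in_queue reaches laio_max_batch().
 */
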
int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
                                int type, uint64_t dev_max_batch)
{
    int ret;
    AioContext *ctx = qemu_get_current_aio_context();
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = aio_get_linux_aio(ctx),
        .ret        = -EINPROGRESS,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type, dev_max_batch);
    if (ret < 0) {
        return ret;
    }

    if (laiocb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return laiocb.ret;
}

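/*
 * Usage sketch (illustrative only; fd, offset and qiov are placeholder
 * names): a block driver already running in coroutine context with Linux
 * AIO enabled could issue a vectored read like this:
 *
 *     // Returns 0 on success, otherwise a negative errno (e.g. -EIO,
 *     // -ENOSPC, or the error returned by io_submit()).
 *     int ret = laio_co_submit(fd, offset, &qiov, QEMU_AIO_READ, 0);
 *
 * The coroutine yields inside laio_co_submit() until the completion path
 * above fills in laiocb.ret and wakes it with aio_co_wake().
 */
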
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, NULL, NULL, NULL);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e,
                           qemu_laio_completion_cb,
                           qemu_laio_poll_cb,
                           qemu_laio_poll_ready);
}

LinuxAioState *laio_init(Error **errp)
{
    int rc;
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    rc = event_notifier_init(&s->e, false);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "failed to initialize event notifier");
        goto out_free_state;
    }

    rc = io_setup(MAX_EVENTS, &s->ctx);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "failed to create linux AIO context");
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

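/*
 * Lifecycle sketch (illustrative; error handling elided and ctx standing in
 * for the owning AioContext): the AioContext setup code in QEMU creates and
 * wires up the state roughly like this, and tears it down in reverse order:
 *
 *     Error *local_err = NULL;
 *     LinuxAioState *s = laio_init(&local_err);
 *     if (s) {
 *         laio_attach_aio_context(s, ctx);   // install eventfd handler + BH
 *         ...                                // submit I/O via laio_co_submit()
 *         laio_detach_aio_context(s, ctx);   // remove handler, delete BH
 *         laio_cleanup(s);                   // io_destroy() + free
 *     }
 */
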
void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}