/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If more requests than this are outstanding at
 *      once, io_submit() returns EAGAIN, which is reported to the guest as
 *      an I/O error.
 */
#define MAX_EVENTS 1024

/* Maximum number of requests in a batch (default value). */
#define DEFAULT_MAX_BATCH 32

struct qemu_laiocb {
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission.  Protected by AioContext lock. */
    LaioQueue io_q;

    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

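/*
 * Fold the two 32-bit halves of the kernel-reported result (res2 high,
 * res low) into one signed value: a negative result is a -errno, a
 * non-negative result is the number of bytes transferred.
 */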
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request: translate the raw result, pad short reads with
 * zeros and wake the waiting coroutine.
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /*
             * Short reads mean EOF, pad with zeros.  A short write is
             * reported as -ENOSPC.
             */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;

    /*
     * If the coroutine is already entered it must be in ioq_submit() and
     * will notice laiocb->ret has been filled in when it eventually runs
     * later.  Coroutines cannot be entered recursively so avoid doing
     * that!
     */
    if (!qemu_coroutine_entered(laiocb->co)) {
        aio_co_wake(laiocb->co);
    }
}

/**
 * The aio_ring buffer, which is shared between userspace and the kernel.
 *
 * This is copied from linux/fs/aio.c; a common header does not exist,
 * but AIO has existed for ages so we assume the ABI is stable.
 */
struct aio_ring {
    unsigned    id;    /* kernel internal index number */
    unsigned    nr;    /* number of io_events */
    unsigned    head;  /* Written to by userland or by kernel. */
    unsigned    tail;

    unsigned    magic;
    unsigned    compat_features;
    unsigned    incompat_features;
    unsigned    header_length;  /* size of aio_ring */

    struct io_event io_events[];
};
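
/*
 * The kernel appends completions at @tail (paired with an smp_wmb() in
 * linux/fs/aio.c: aio_complete()); userspace consumes entries from @head
 * via io_getevents_peek()/io_getevents_commit() below.
 */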

/**
 * io_getevents_peek:
 * @ctx: AIO context
 * @events: pointer to the events array (output value)
 *
 * Returns the number of completed events and sets a pointer
 * to the events array.  This function does not update the internal
 * ring buffer, only reads head and tail.  When @events has been
 * processed, io_getevents_commit() must be called.
 */
static inline unsigned int io_getevents_peek(io_context_t ctx,
                                             struct io_event **events)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned int head = ring->head, tail = ring->tail;
    unsigned int nr;

    nr = tail >= head ? tail - head : ring->nr - head;
    *events = ring->io_events + head;
    /* To avoid speculative loads of ring->io_events[i] before observing tail.
       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
    smp_rmb();

    return nr;
}

/**
 * io_getevents_commit:
 * @ctx: AIO context
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer.
 */
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;

    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}
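
/*
 * A minimal sketch of how the two helpers above are meant to be used
 * (handle() is a stand-in for per-request completion handling; see
 * qemu_laio_process_completions() for the real, nesting-aware loop):
 *
 *     nr = io_getevents_peek(ctx, &events);
 *     for (i = 0; i < nr; i++) {
 *         handle(io_event_ret(&events[i]));
 *     }
 *     io_getevents_commit(ctx, nr);
 */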

/**
 * io_getevents_advance_and_peek:
 * @ctx: AIO context
 * @events: pointer to the events array (output value)
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer and returns the number of elements
 * left to process.
 */
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
                              struct io_event **events,
                              unsigned int nr)
{
    io_getevents_commit(ctx, nr);
    return io_getevents_peek(ctx, events);
}

/**
 * qemu_laio_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests and invokes their callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * indices are kept in LinuxAioState.  The function schedules the completion
 * BH so it can be called again in a nested event loop.  When there are no
 * events left to complete, the BH is canceled.
 */
static void qemu_laio_process_completions(LinuxAioState *s)
{
    struct io_event *events;

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
                                                         s->event_idx))) {
        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
            struct iocb *iocb = events[s->event_idx].obj;
            struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[s->event_idx]);

            /* Change counters one-by-one because we can be nested. */
            s->io_q.in_flight--;
            s->event_idx++;
            qemu_laio_process_completion(laiocb);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    /* If we are nested we have to notify the level above that we are done
     * by setting event_max to zero; the upper level will then jump out of
     * its own `for` loop.  If we are the last level, all counters have
     * dropped to zero. */
    s->event_max = 0;
    s->event_idx = 0;
}

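/*
 * Process any completed requests, then flush the pending queue unless
 * submission is currently plugged.  Takes the AioContext lock because
 * io_q is protected by it (see LinuxAioState); called from the completion
 * BH, the eventfd handler and the poll-ready handler below.
 */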
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
    aio_context_acquire(s->aio_context);
    qemu_laio_process_completions(s);

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    qemu_laio_process_completions_and_submit(s);
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_process_completions_and_submit(s);
    }
}

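/*
 * Polling-mode handlers: qemu_laio_poll_cb() returns true as soon as the
 * completion ring has entries, letting the AioContext poll loop bypass the
 * eventfd; qemu_laio_poll_ready() then processes them.
 */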
static bool qemu_laio_poll_cb(void *opaque)
{
    EventNotifier *e = opaque;
    LinuxAioState *s = container_of(e, LinuxAioState, e);
    struct io_event *events;

    return io_getevents_peek(s->ctx, &events);
}

static void qemu_laio_poll_ready(EventNotifier *opaque)
{
    EventNotifier *e = opaque;
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    qemu_laio_process_completions_and_submit(s);
}

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

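/*
 * Submit as many pending requests as fit within the MAX_EVENTS in-flight
 * limit.  On EAGAIN (or a partial submission) the remaining requests stay
 * queued and io_q.blocked is set; on any other error the head request is
 * failed and submission of the rest is retried.
 */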
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_EVENTS];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        if (s->io_q.in_flight >= MAX_EVENTS) {
            break;
        }
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            /* Fail the first request, retry the rest */
            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
            s->io_q.in_queue--;
            aiocb->ret = ret;
            qemu_laio_process_completion(aiocb);
            continue;
        }

        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /* We can try to complete something right away if there are
         * still requests in flight. */
        qemu_laio_process_completions(s);
        /*
         * Even if we have completed everything (in_flight == 0), the queue
         * can still have pending requests (in_queue > 0).  We do not attempt
         * to repeat submission to avoid an I/O hang.  The reason is simple:
         * s->e is still set and the completion callback will be called
         * shortly, so all pending requests will be submitted from there.
         */
    }
}

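/*
 * Worked example: with the default aio_max_batch of 0, the base batch size
 * below is DEFAULT_MAX_BATCH (32).  A device passing dev_max_batch == 8
 * lowers that to 8, and if 1020 of the 1024 MAX_EVENTS slots are already
 * in flight, the result is further capped at 4.
 */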
static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch)
{
    uint64_t max_batch = s->aio_context->aio_max_batch ?: DEFAULT_MAX_BATCH;

    /*
     * AIO context can be shared between multiple block devices, so
     * `dev_max_batch` allows reducing the batch size for latency-sensitive
     * devices.
     */
    max_batch = MIN_NON_ZERO(dev_max_batch, max_batch);

    /* Limit the batch to the number of available event slots. */
    max_batch = MIN_NON_ZERO(MAX_EVENTS - s->io_q.in_flight, max_batch);

    return max_batch;
}

void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s,
                    uint64_t dev_max_batch)
{
    assert(s->io_q.plugged);
    s->io_q.plugged--;

    /*
     * Why max batch checking is performed here:
     * Another BDS may have queued requests with a higher dev_max_batch and
     * therefore in_queue could now exceed our dev_max_batch. Re-check the max
     * batch so we can honor our device's dev_max_batch.
     */
    if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch) ||
        (!s->io_q.plugged &&
         !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending))) {
        ioq_submit(s);
    }
}
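
/*
 * Typical usage (outside this file): bracket a burst of laio_co_submit()
 * calls with laio_io_plug()/laio_io_unplug() so that they can be merged
 * into fewer io_submit() calls.
 */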

static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type, uint64_t dev_max_batch)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* The Linux kernel currently does not support other operations. */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.in_queue++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_queue >= laio_max_batch(s, dev_max_batch))) {
        ioq_submit(s);
    }

    return 0;
}

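/*
 * Coroutine entry point: laiocb.ret starts out as -EINPROGRESS.  If the
 * request has not completed by the time laio_do_submit() returns (it may
 * already have been processed inside ioq_submit()), the coroutine yields
 * and is re-entered from qemu_laio_process_completion() via aio_co_wake().
 */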
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type,
                                uint64_t dev_max_batch)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .ret        = -EINPROGRESS,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type, dev_max_batch);
    if (ret < 0) {
        return ret;
    }

    if (laiocb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return laiocb.ret;
}
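
/*
 * Example call from a coroutine (a sketch; fd and qiov are assumed to have
 * been set up by the caller, and dev_max_batch == 0 means "no per-device
 * batch limit"):
 *
 *     ret = laio_co_submit(bs, s, fd, offset, &qiov, QEMU_AIO_READ, 0);
 *     if (ret < 0) {
 *         return ret;   /* -errno */
 *     }
 */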

void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL, NULL, NULL);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb,
                           qemu_laio_poll_cb,
                           qemu_laio_poll_ready);
}
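
/*
 * Note on the wiring above: each iocb is tied to s->e with io_set_eventfd()
 * in laio_do_submit(), so completions either kick the eventfd handler
 * (qemu_laio_completion_cb) or are picked up by the poll handlers when the
 * AioContext is polling.
 */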

LinuxAioState *laio_init(Error **errp)
{
    int rc;
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    rc = event_notifier_init(&s->e, false);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "failed to initialize event notifier");
        goto out_free_state;
    }

    rc = io_setup(MAX_EVENTS, &s->ctx);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "failed to create linux AIO context");
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}