xref: /openbmc/qemu/block/io_uring.c (revision a9bc470ec208bd27a82100abc9dccf1b69f41b45)
/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qemu/defer-call.h"
#include "qapi/error.h"
#include "sysemu/block-backend.h"
#include "trace.h"

/* Only used for assertions.  */
#include "qemu/coroutine_int.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;
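
/*
 * A LuringAIOCB lives on the stack of the coroutine that called
 * luring_co_submit() and is only referenced until that coroutine is
 * re-entered with ->ret filled in by luring_process_completions().
 */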

typedef struct LuringQueue {
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* No locking required, only accessed from AioContext home thread */
    LuringQueue io_q;

    QEMUBH *completion_bh;
} LuringState;
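
/*
 * Per-AioContext io_uring state.  Submitters obtain it with
 * aio_get_linux_io_uring() for the context they are currently running in
 * (see luring_co_submit()), so all fields are accessed from a single thread.
 */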

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Short reads are rare but may occur. The remaining read request needs to be
 * resubmitted.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}
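
/*
 * Worked example (illustration only): a 64 KiB readv that completes with
 * nread == 4096 ends up with total_read == 4096 and remaining == 61440.
 * resubmit_qiov then covers bytes [4096, 65536) of the original qiov,
 * sqeq.off is advanced by 4096 and sqeq.addr/len point at the shortened
 * iovec array before the request is queued again.  A further short read
 * repeats the same steps, accumulating total_read.
 */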

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules BH completion so it can be called again in a nested
 * event loop.  When there are no events left to complete, the BH is canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;

    defer_call_begin();

    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        assert(luringcb->co->ctx == s->aio_context);
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    defer_call_end();
}

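/**
 * ioq_submit:
 * @s: AIO state
 *
 * Flush the pending submit_queue: copy queued requests into free sqes and
 * hand them to the kernel with io_uring_submit(), which returns the number
 * of sqes it accepted; in_flight and in_queue are adjusted by that amount.
 * Requests that could not be submitted stay queued and io_q.blocked is set
 * so that luring_do_submit() keeps queueing instead of submitting until the
 * completion path drains the ring.
 */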
static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * If there are still requests in flight, we can try to complete
         * some of them right away.
         */
        luring_process_completions(s);
    }
    return ret;
}

static void luring_process_completions_and_submit(LuringState *s)
{
    luring_process_completions(s);

    if (s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

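/*
 * fd handler registered in luring_attach_aio_context(): called by the
 * AioContext when the io_uring fd becomes readable, i.e. when new
 * completions have been posted to the cq ring.
 */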
static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

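/*
 * AioContext polling handlers: qemu_luring_poll_cb() runs while the event
 * loop busy-polls and only reads the cq ring indices via io_uring_cq_ready()
 * (no syscall).  When it reports pending cqes, qemu_luring_poll_ready() is
 * invoked to process them.
 */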
static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    return io_uring_cq_ready(&s->ring);
}

static void qemu_luring_poll_ready(void *opaque)
{
    LuringState *s = opaque;

    luring_process_completions_and_submit(s);
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

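/*
 * Deferred-submission callback registered with defer_call() in
 * luring_do_submit().  It runs when the outermost defer_call_end() is
 * reached (or immediately if no deferral is in progress), so several
 * requests queued in the same batch are flushed with one ioq_submit() call
 * instead of one io_uring_submit() syscall per request.
 */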
static void luring_deferred_fn(void *opaque)
{
    LuringState *s = opaque;
    trace_luring_unplug_fn(s, s->io_q.blocked, s->io_q.in_queue,
                           s->io_q.in_flight);
    if (!s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Prepares the sqe for this request and appends it to the pending submit
 * queue.  The queue is flushed immediately once in_flight + in_queue reaches
 * MAX_ENTRIES, otherwise submission is deferred with defer_call().
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_ZONE_APPEND:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.in_queue,
                           s->io_q.in_flight);
    if (!s->io_q.blocked) {
        if (s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES) {
            ret = ioq_submit(s);
            trace_luring_do_submit_done(s, ret);
            return ret;
        }

        defer_call(luring_deferred_fn, s);
    }
    return 0;
}

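/**
 * luring_co_submit:
 *
 * Submit one request from coroutine context and yield until it completes.
 * Returns 0 on success or a negative errno value.
 *
 * Illustrative caller sketch (hypothetical, not part of this file; the real
 * callers are block drivers such as file-posix):
 *
 *     ret = luring_co_submit(bs, fd, offset, &qiov, QEMU_AIO_READ);
 *     if (ret < 0) {
 *         return ret;
 *     }
 *
 * The coroutine yields inside luring_co_submit() while luringcb.ret is still
 * -EINPROGRESS and is re-entered by luring_process_completions() once the
 * cqe arrives.
 */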
int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
                                  QEMUIOVector *qiov, int type)
{
    int ret;
    AioContext *ctx = qemu_get_current_aio_context();
    LuringState *s = aio_get_linux_io_uring(ctx);
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

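/*
 * The attach/detach pair below is expected to be driven by the AioContext
 * setup/teardown code (see util/async.c): attach registers the ring fd
 * handlers and the completion BH for a context, detach unregisters them
 * before the state is reused or freed.
 */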
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd,
                       NULL, NULL, NULL, NULL, s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd,
                       qemu_luring_completion_cb, NULL,
                       qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}