1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Shared application/kernel submission and completion ring pairs, for
4 * supporting fast/efficient IO.
5 *
6 * A note on the read/write ordering memory barriers that are matched between
7 * the application and kernel side.
8 *
9 * After the application reads the CQ ring tail, it must use an
10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11 * before writing the tail (using smp_load_acquire to read the tail will
12 * do). It also needs a smp_mb() before updating CQ head (ordering the
13 * entry load(s) with the head store), pairing with an implicit barrier
14 * through a control-dependency in io_get_cqe (smp_store_release to
15 * store head will do). Failure to do so could lead to reading invalid
16 * CQ entries.
17 *
18 * Likewise, the application must use an appropriate smp_wmb() before
19 * writing the SQ tail (ordering SQ entry stores with the tail store),
20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21 * to store the tail will do). And it needs a barrier ordering the SQ
22 * head load before writing new SQ entries (smp_load_acquire to read
23 * head will do).
24 *
25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27 * updating the SQ tail; a full memory barrier smp_mb() is needed
28 * between.
29 *
30 * Also see the examples in the liburing library:
31 *
32 * git://git.kernel.dk/liburing
33 *
34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35 * from data shared between the kernel and application. This is done both
36 * for ordering purposes and to ensure that once a value is loaded from
37 * data that the application could potentially modify, it remains stable.
38 *
39 * Copyright (C) 2018-2019 Jens Axboe
40 * Copyright (c) 2018-2019 Christoph Hellwig
41 */
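
/*
 * Illustrative userspace sketch of the pairing described above. This is
 * not kernel code and not the liburing implementation; the ring field
 * names (khead, ktail, kring_mask, kring_entries, kflags, cqes, sqes)
 * are assumed to be the usual mmap()ed SQ/CQ ring pointers:
 *
 *	// Consume one CQE: acquire-load the CQ tail, release-store the head.
 *	head = *cq->khead;
 *	tail = smp_load_acquire(cq->ktail);	// pairs with the kernel tail store
 *	if (head != tail) {
 *		cqe = &cq->cqes[head & *cq->kring_mask];
 *		// ... consume cqe ...
 *		smp_store_release(cq->khead, head + 1);	// CQE loads before head store
 *	}
 *
 *	// Submit one SQE: fill the entry, then release-store the SQ tail.
 *	sq_head = smp_load_acquire(sq->khead);	// order head load vs. new SQE stores
 *	sq_tail = *sq->ktail;			// only the submitter writes the tail
 *	if (sq_tail - sq_head < *sq->kring_entries) {
 *		// ... fill sq->sqes[sq_tail & *sq->kring_mask] ...
 *		smp_store_release(sq->ktail, sq_tail + 1);	// pairs with io_get_sqring()
 *		if (using IORING_SETUP_SQPOLL) {
 *			smp_mb();		// order the tail store vs. the flags load
 *			if (READ_ONCE(*sq->kflags) & IORING_SQ_NEED_WAKEUP)
 *				io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 *		}
 *	}
 */
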
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <net/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49 #include <linux/bits.h>
50
51 #include <linux/sched/signal.h>
52 #include <linux/fs.h>
53 #include <linux/file.h>
54 #include <linux/fdtable.h>
55 #include <linux/mm.h>
56 #include <linux/mman.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/bvec.h>
60 #include <linux/net.h>
61 #include <net/sock.h>
62 #include <net/af_unix.h>
63 #include <linux/anon_inodes.h>
64 #include <linux/sched/mm.h>
65 #include <linux/uaccess.h>
66 #include <linux/nospec.h>
67 #include <linux/highmem.h>
68 #include <linux/fsnotify.h>
69 #include <linux/fadvise.h>
70 #include <linux/task_work.h>
71 #include <linux/io_uring.h>
72 #include <linux/audit.h>
73 #include <linux/security.h>
74 #include <asm/shmparam.h>
75
76 #define CREATE_TRACE_POINTS
77 #include <trace/events/io_uring.h>
78
79 #include <uapi/linux/io_uring.h>
80
81 #include "io-wq.h"
82
83 #include "io_uring.h"
84 #include "opdef.h"
85 #include "refs.h"
86 #include "tctx.h"
87 #include "sqpoll.h"
88 #include "fdinfo.h"
89 #include "kbuf.h"
90 #include "rsrc.h"
91 #include "cancel.h"
92 #include "net.h"
93 #include "notif.h"
94
95 #include "timeout.h"
96 #include "poll.h"
97 #include "rw.h"
98 #include "alloc_cache.h"
99
100 #define IORING_MAX_ENTRIES 32768
101 #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
102
103 #define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
104 IORING_REGISTER_LAST + IORING_OP_LAST)
105
106 #define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
107 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
108
109 #define SQE_VALID_FLAGS (SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
110 IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
111
112 #define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
113 REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
114 REQ_F_ASYNC_DATA)
115
116 #define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
117 IO_REQ_CLEAN_FLAGS)
118
119 #define IO_TCTX_REFS_CACHE_NR (1U << 10)
120
121 #define IO_COMPL_BATCH 32
122 #define IO_REQ_ALLOC_BATCH 8
123
124 enum {
125 IO_CHECK_CQ_OVERFLOW_BIT,
126 IO_CHECK_CQ_DROPPED_BIT,
127 };
128
129 enum {
130 IO_EVENTFD_OP_SIGNAL_BIT,
131 IO_EVENTFD_OP_FREE_BIT,
132 };
133
134 struct io_defer_entry {
135 struct list_head list;
136 struct io_kiocb *req;
137 u32 seq;
138 };
139
140 /* requests with any of those set should undergo io_disarm_next() */
141 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
142 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
143
144 static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
145 struct task_struct *task,
146 bool cancel_all);
147
148 static void io_queue_sqe(struct io_kiocb *req);
149
150 struct kmem_cache *req_cachep;
151 static struct workqueue_struct *iou_wq __ro_after_init;
152
153 static int __read_mostly sysctl_io_uring_disabled;
154 static int __read_mostly sysctl_io_uring_group = -1;
155
156 #ifdef CONFIG_SYSCTL
157 static struct ctl_table kernel_io_uring_disabled_table[] = {
158 {
159 .procname = "io_uring_disabled",
160 .data = &sysctl_io_uring_disabled,
161 .maxlen = sizeof(sysctl_io_uring_disabled),
162 .mode = 0644,
163 .proc_handler = proc_dointvec_minmax,
164 .extra1 = SYSCTL_ZERO,
165 .extra2 = SYSCTL_TWO,
166 },
167 {
168 .procname = "io_uring_group",
169 .data = &sysctl_io_uring_group,
170 .maxlen = sizeof(gid_t),
171 .mode = 0644,
172 .proc_handler = proc_dointvec,
173 },
174 {},
175 };
176 #endif
177
178 static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
179 {
180 if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
181 ctx->submit_state.cqes_count)
182 __io_submit_flush_completions(ctx);
183 }
184
185 static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
186 {
187 return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
188 }
189
190 static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
191 {
192 return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
193 }
194
195 static bool io_match_linked(struct io_kiocb *head)
196 {
197 struct io_kiocb *req;
198
199 io_for_each_link(req, head) {
200 if (req->flags & REQ_F_INFLIGHT)
201 return true;
202 }
203 return false;
204 }
205
206 /*
207 * As io_match_task() but protected against racing with linked timeouts.
208 * User must not hold timeout_lock.
209 */
210 bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
211 bool cancel_all)
212 {
213 bool matched;
214
215 if (task && head->task != task)
216 return false;
217 if (cancel_all)
218 return true;
219
220 if (head->flags & REQ_F_LINK_TIMEOUT) {
221 struct io_ring_ctx *ctx = head->ctx;
222
223 /* protect against races with linked timeouts */
224 spin_lock_irq(&ctx->timeout_lock);
225 matched = io_match_linked(head);
226 spin_unlock_irq(&ctx->timeout_lock);
227 } else {
228 matched = io_match_linked(head);
229 }
230 return matched;
231 }
232
233 static inline void req_fail_link_node(struct io_kiocb *req, int res)
234 {
235 req_set_fail(req);
236 io_req_set_res(req, res, 0);
237 }
238
239 static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
240 {
241 wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
242 }
243
244 static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
245 {
246 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
247
248 complete(&ctx->ref_comp);
249 }
250
251 static __cold void io_fallback_req_func(struct work_struct *work)
252 {
253 struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
254 fallback_work.work);
255 struct llist_node *node = llist_del_all(&ctx->fallback_llist);
256 struct io_kiocb *req, *tmp;
257 struct io_tw_state ts = { .locked = true, };
258
259 percpu_ref_get(&ctx->refs);
260 mutex_lock(&ctx->uring_lock);
261 llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
262 req->io_task_work.func(req, &ts);
263 if (WARN_ON_ONCE(!ts.locked))
264 return;
265 io_submit_flush_completions(ctx);
266 mutex_unlock(&ctx->uring_lock);
267 percpu_ref_put(&ctx->refs);
268 }
269
270 static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
271 {
272 unsigned hash_buckets = 1U << bits;
273 size_t hash_size = hash_buckets * sizeof(table->hbs[0]);
274
275 table->hbs = kmalloc(hash_size, GFP_KERNEL);
276 if (!table->hbs)
277 return -ENOMEM;
278
279 table->hash_bits = bits;
280 init_hash_table(table, hash_buckets);
281 return 0;
282 }
283
284 static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
285 {
286 struct io_ring_ctx *ctx;
287 int hash_bits;
288
289 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
290 if (!ctx)
291 return NULL;
292
293 xa_init(&ctx->io_bl_xa);
294
295 /*
296 * Use 5 bits less than the max cq entries; that should give us around
297 * 32 entries per hash list if totally full and uniformly spread, but
298 * don't keep too many buckets, to avoid overconsuming memory.
299 */
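/*
 * Worked example (illustrative numbers only): with p->cq_entries == 4096,
 * ilog2(4096) - 5 == 7, which clamp() leaves untouched, so we get 128
 * buckets, i.e. roughly 32 entries per bucket when the CQ is completely
 * full and uniformly spread.
 */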
300 hash_bits = ilog2(p->cq_entries) - 5;
301 hash_bits = clamp(hash_bits, 1, 8);
302 if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
303 goto err;
304 if (io_alloc_hash_table(&ctx->cancel_table_locked, hash_bits))
305 goto err;
306 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
307 0, GFP_KERNEL))
308 goto err;
309
310 ctx->flags = p->flags;
311 init_waitqueue_head(&ctx->sqo_sq_wait);
312 INIT_LIST_HEAD(&ctx->sqd_list);
313 INIT_LIST_HEAD(&ctx->cq_overflow_list);
314 INIT_LIST_HEAD(&ctx->io_buffers_cache);
315 INIT_HLIST_HEAD(&ctx->io_buf_list);
316 io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
317 sizeof(struct io_rsrc_node));
318 io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
319 sizeof(struct async_poll));
320 io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
321 sizeof(struct io_async_msghdr));
322 init_completion(&ctx->ref_comp);
323 xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
324 mutex_init(&ctx->uring_lock);
325 init_waitqueue_head(&ctx->cq_wait);
326 init_waitqueue_head(&ctx->poll_wq);
327 init_waitqueue_head(&ctx->rsrc_quiesce_wq);
328 spin_lock_init(&ctx->completion_lock);
329 spin_lock_init(&ctx->timeout_lock);
330 INIT_WQ_LIST(&ctx->iopoll_list);
331 INIT_LIST_HEAD(&ctx->io_buffers_pages);
332 INIT_LIST_HEAD(&ctx->io_buffers_comp);
333 INIT_LIST_HEAD(&ctx->defer_list);
334 INIT_LIST_HEAD(&ctx->timeout_list);
335 INIT_LIST_HEAD(&ctx->ltimeout_list);
336 INIT_LIST_HEAD(&ctx->rsrc_ref_list);
337 init_llist_head(&ctx->work_llist);
338 INIT_LIST_HEAD(&ctx->tctx_list);
339 ctx->submit_state.free_list.next = NULL;
340 INIT_WQ_LIST(&ctx->locked_free_list);
341 INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
342 INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
343 return ctx;
344 err:
345 kfree(ctx->cancel_table.hbs);
346 kfree(ctx->cancel_table_locked.hbs);
347 xa_destroy(&ctx->io_bl_xa);
348 kfree(ctx);
349 return NULL;
350 }
351
352 static void io_account_cq_overflow(struct io_ring_ctx *ctx)
353 {
354 struct io_rings *r = ctx->rings;
355
356 WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
357 ctx->cq_extra--;
358 }
359
360 static bool req_need_defer(struct io_kiocb *req, u32 seq)
361 {
362 if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
363 struct io_ring_ctx *ctx = req->ctx;
364
365 return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
366 }
367
368 return false;
369 }
370
371 static void io_clean_op(struct io_kiocb *req)
372 {
373 if (req->flags & REQ_F_BUFFER_SELECTED) {
374 spin_lock(&req->ctx->completion_lock);
375 io_put_kbuf_comp(req);
376 spin_unlock(&req->ctx->completion_lock);
377 }
378
379 if (req->flags & REQ_F_NEED_CLEANUP) {
380 const struct io_cold_def *def = &io_cold_defs[req->opcode];
381
382 if (def->cleanup)
383 def->cleanup(req);
384 }
385 if ((req->flags & REQ_F_POLLED) && req->apoll) {
386 kfree(req->apoll->double_poll);
387 kfree(req->apoll);
388 req->apoll = NULL;
389 }
390 if (req->flags & REQ_F_INFLIGHT) {
391 struct io_uring_task *tctx = req->task->io_uring;
392
393 atomic_dec(&tctx->inflight_tracked);
394 }
395 if (req->flags & REQ_F_CREDS)
396 put_cred(req->creds);
397 if (req->flags & REQ_F_ASYNC_DATA) {
398 kfree(req->async_data);
399 req->async_data = NULL;
400 }
401 req->flags &= ~IO_REQ_CLEAN_FLAGS;
402 }
403
404 static inline void io_req_track_inflight(struct io_kiocb *req)
405 {
406 if (!(req->flags & REQ_F_INFLIGHT)) {
407 req->flags |= REQ_F_INFLIGHT;
408 atomic_inc(&req->task->io_uring->inflight_tracked);
409 }
410 }
411
412 static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
413 {
414 if (WARN_ON_ONCE(!req->link))
415 return NULL;
416
417 req->flags &= ~REQ_F_ARM_LTIMEOUT;
418 req->flags |= REQ_F_LINK_TIMEOUT;
419
420 /* linked timeouts should have two refs once prep'ed */
421 io_req_set_refcount(req);
422 __io_req_set_refcount(req->link, 2);
423 return req->link;
424 }
425
426 static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
427 {
428 if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
429 return NULL;
430 return __io_prep_linked_timeout(req);
431 }
432
433 static noinline void __io_arm_ltimeout(struct io_kiocb *req)
434 {
435 io_queue_linked_timeout(__io_prep_linked_timeout(req));
436 }
437
438 static inline void io_arm_ltimeout(struct io_kiocb *req)
439 {
440 if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
441 __io_arm_ltimeout(req);
442 }
443
444 static void io_prep_async_work(struct io_kiocb *req)
445 {
446 const struct io_issue_def *def = &io_issue_defs[req->opcode];
447 struct io_ring_ctx *ctx = req->ctx;
448
449 if (!(req->flags & REQ_F_CREDS)) {
450 req->flags |= REQ_F_CREDS;
451 req->creds = get_current_cred();
452 }
453
454 req->work.list.next = NULL;
455 req->work.flags = 0;
456 req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
457 if (req->flags & REQ_F_FORCE_ASYNC)
458 req->work.flags |= IO_WQ_WORK_CONCURRENT;
459
460 if (req->file && !(req->flags & REQ_F_FIXED_FILE))
461 req->flags |= io_file_get_flags(req->file);
462
463 if (req->file && (req->flags & REQ_F_ISREG)) {
464 bool should_hash = def->hash_reg_file;
465
466 /* don't serialize this request if the fs doesn't need it */
467 if (should_hash && (req->file->f_flags & O_DIRECT) &&
468 (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
469 should_hash = false;
470 if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
471 io_wq_hash_work(&req->work, file_inode(req->file));
472 } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
473 if (def->unbound_nonreg_file)
474 req->work.flags |= IO_WQ_WORK_UNBOUND;
475 }
476 }
477
478 static void io_prep_async_link(struct io_kiocb *req)
479 {
480 struct io_kiocb *cur;
481
482 if (req->flags & REQ_F_LINK_TIMEOUT) {
483 struct io_ring_ctx *ctx = req->ctx;
484
485 spin_lock_irq(&ctx->timeout_lock);
486 io_for_each_link(cur, req)
487 io_prep_async_work(cur);
488 spin_unlock_irq(&ctx->timeout_lock);
489 } else {
490 io_for_each_link(cur, req)
491 io_prep_async_work(cur);
492 }
493 }
494
495 static void io_queue_iowq(struct io_kiocb *req)
496 {
497 struct io_kiocb *link = io_prep_linked_timeout(req);
498 struct io_uring_task *tctx = req->task->io_uring;
499
500 BUG_ON(!tctx);
501
502 if ((current->flags & PF_KTHREAD) || !tctx->io_wq) {
503 io_req_task_queue_fail(req, -ECANCELED);
504 return;
505 }
506
507 /* init ->work of the whole link before punting */
508 io_prep_async_link(req);
509
510 /*
511 * Not expected to happen, but if we do have a bug where this _can_
512 * happen, catch it here and ensure the request is marked as
513 * canceled. That will make io-wq go through the usual work cancel
514 * procedure rather than attempt to run this request (or create a new
515 * worker for it).
516 */
517 if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
518 req->work.flags |= IO_WQ_WORK_CANCEL;
519
520 trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
521 io_wq_enqueue(tctx->io_wq, &req->work);
522 if (link)
523 io_queue_linked_timeout(link);
524 }
525
526 static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
527 {
528 while (!list_empty(&ctx->defer_list)) {
529 struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
530 struct io_defer_entry, list);
531
532 if (req_need_defer(de->req, de->seq))
533 break;
534 list_del_init(&de->list);
535 io_req_task_queue(de->req);
536 kfree(de);
537 }
538 }
539
540
541 static void io_eventfd_ops(struct rcu_head *rcu)
542 {
543 struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
544 int ops = atomic_xchg(&ev_fd->ops, 0);
545
546 if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
547 eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
548
549 /* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
550 * ordering in a race, but if references are 0 we know we have to free
551 * it regardless.
552 */
553 if (atomic_dec_and_test(&ev_fd->refs)) {
554 eventfd_ctx_put(ev_fd->cq_ev_fd);
555 kfree(ev_fd);
556 }
557 }
558
559 static void io_eventfd_signal(struct io_ring_ctx *ctx)
560 {
561 struct io_ev_fd *ev_fd = NULL;
562
563 rcu_read_lock();
564 /*
565 * rcu_dereference ctx->io_ev_fd once and use it both for checking
566 * and eventfd_signal
567 */
568 ev_fd = rcu_dereference(ctx->io_ev_fd);
569
570 /*
571 * Check again if ev_fd exists in case an io_eventfd_unregister call
572 * completed between the NULL check of ctx->io_ev_fd at the start of
573 * the function and rcu_read_lock.
574 */
575 if (unlikely(!ev_fd))
576 goto out;
577 if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
578 goto out;
579 if (ev_fd->eventfd_async && !io_wq_current_is_worker())
580 goto out;
581
582 if (likely(eventfd_signal_allowed())) {
583 eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
584 } else {
585 atomic_inc(&ev_fd->refs);
586 if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
587 call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
588 else
589 atomic_dec(&ev_fd->refs);
590 }
591
592 out:
593 rcu_read_unlock();
594 }
595
596 static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
597 {
598 bool skip;
599
600 spin_lock(&ctx->completion_lock);
601
602 /*
603 * Eventfd should only get triggered when at least one event has been
604 * posted. Some applications rely on the eventfd notification count
605 * only changing IFF a new CQE has been added to the CQ ring. There's
606 * no dependency on a 1:1 relationship between how many times this
607 * function is called (and hence the eventfd count) and the number of CQEs
608 * posted to the CQ ring.
609 */
610 skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
611 ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
612 spin_unlock(&ctx->completion_lock);
613 if (skip)
614 return;
615
616 io_eventfd_signal(ctx);
617 }
618
619 void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
620 {
621 if (ctx->poll_activated)
622 io_poll_wq_wake(ctx);
623 if (ctx->off_timeout_used)
624 io_flush_timeouts(ctx);
625 if (ctx->drain_active) {
626 spin_lock(&ctx->completion_lock);
627 io_queue_deferred(ctx);
628 spin_unlock(&ctx->completion_lock);
629 }
630 if (ctx->has_evfd)
631 io_eventfd_flush_signal(ctx);
632 }
633
634 static inline void __io_cq_lock(struct io_ring_ctx *ctx)
635 {
636 if (!ctx->lockless_cq)
637 spin_lock(&ctx->completion_lock);
638 }
639
640 static inline void io_cq_lock(struct io_ring_ctx *ctx)
641 __acquires(ctx->completion_lock)
642 {
643 spin_lock(&ctx->completion_lock);
644 }
645
646 static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
647 {
648 io_commit_cqring(ctx);
649 if (!ctx->task_complete) {
650 if (!ctx->lockless_cq)
651 spin_unlock(&ctx->completion_lock);
652 /* IOPOLL rings only need to wake up if it's also SQPOLL */
653 if (!ctx->syscall_iopoll)
654 io_cqring_wake(ctx);
655 }
656 io_commit_cqring_flush(ctx);
657 }
658
659 static void io_cq_unlock_post(struct io_ring_ctx *ctx)
660 __releases(ctx->completion_lock)
661 {
662 io_commit_cqring(ctx);
663 spin_unlock(&ctx->completion_lock);
664 io_cqring_wake(ctx);
665 io_commit_cqring_flush(ctx);
666 }
667
668 /* Discard all backlogged overflow entries without posting them */
669 static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
670 {
671 struct io_overflow_cqe *ocqe;
672 LIST_HEAD(list);
673
674 lockdep_assert_held(&ctx->uring_lock);
675
676 spin_lock(&ctx->completion_lock);
677 list_splice_init(&ctx->cq_overflow_list, &list);
678 clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
679 spin_unlock(&ctx->completion_lock);
680
681 while (!list_empty(&list)) {
682 ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
683 list_del(&ocqe->list);
684 kfree(ocqe);
685 }
686 }
687
688 static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
689 {
690 size_t cqe_size = sizeof(struct io_uring_cqe);
691
692 lockdep_assert_held(&ctx->uring_lock);
693
694 if (__io_cqring_events(ctx) == ctx->cq_entries)
695 return;
696
697 if (ctx->flags & IORING_SETUP_CQE32)
698 cqe_size <<= 1;
699
700 io_cq_lock(ctx);
701 while (!list_empty(&ctx->cq_overflow_list)) {
702 struct io_uring_cqe *cqe;
703 struct io_overflow_cqe *ocqe;
704
705 if (!io_get_cqe_overflow(ctx, &cqe, true))
706 break;
707 ocqe = list_first_entry(&ctx->cq_overflow_list,
708 struct io_overflow_cqe, list);
709 memcpy(cqe, &ocqe->cqe, cqe_size);
710 list_del(&ocqe->list);
711 kfree(ocqe);
712
713 /*
714 * For silly syzbot cases that deliberately overflow by huge
715 * amounts, check if we need to resched and drop and
716 * reacquire the locks if so. Nothing real would ever hit this.
717 * Ideally we'd have a non-posting unlock for this, but hard
718 * to care for a non-real case.
719 */
720 if (need_resched()) {
721 io_cq_unlock_post(ctx);
722 mutex_unlock(&ctx->uring_lock);
723 cond_resched();
724 mutex_lock(&ctx->uring_lock);
725 io_cq_lock(ctx);
726 }
727 }
728
729 if (list_empty(&ctx->cq_overflow_list)) {
730 clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
731 atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
732 }
733 io_cq_unlock_post(ctx);
734 }
735
736 static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
737 {
738 mutex_lock(&ctx->uring_lock);
739 __io_cqring_overflow_flush(ctx);
740 mutex_unlock(&ctx->uring_lock);
741 }
742
743 static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
744 {
745 if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
746 io_cqring_do_overflow_flush(ctx);
747 }
748
749 /* can be called by any task */
750 static void io_put_task_remote(struct task_struct *task)
751 {
752 struct io_uring_task *tctx = task->io_uring;
753
754 percpu_counter_sub(&tctx->inflight, 1);
755 if (unlikely(atomic_read(&tctx->in_cancel)))
756 wake_up(&tctx->wait);
757 put_task_struct(task);
758 }
759
760 /* used by a task to put its own references */
761 static void io_put_task_local(struct task_struct *task)
762 {
763 task->io_uring->cached_refs++;
764 }
765
766 /* must be called somewhat shortly after putting a request */
767 static inline void io_put_task(struct task_struct *task)
768 {
769 if (likely(task == current))
770 io_put_task_local(task);
771 else
772 io_put_task_remote(task);
773 }
774
775 void io_task_refs_refill(struct io_uring_task *tctx)
776 {
777 unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
778
779 percpu_counter_add(&tctx->inflight, refill);
780 refcount_add(refill, &current->usage);
781 tctx->cached_refs += refill;
782 }
783
784 static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
785 {
786 struct io_uring_task *tctx = task->io_uring;
787 unsigned int refs = tctx->cached_refs;
788
789 if (refs) {
790 tctx->cached_refs = 0;
791 percpu_counter_sub(&tctx->inflight, refs);
792 put_task_struct_many(task, refs);
793 }
794 }
795
796 static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
797 s32 res, u32 cflags, u64 extra1, u64 extra2)
798 {
799 struct io_overflow_cqe *ocqe;
800 size_t ocq_size = sizeof(struct io_overflow_cqe);
801 bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
802
803 lockdep_assert_held(&ctx->completion_lock);
804
805 if (is_cqe32)
806 ocq_size += sizeof(struct io_uring_cqe);
807
808 ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
809 trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
810 if (!ocqe) {
811 /*
812 * If we're in ring overflow flush mode, or in task cancel mode,
813 * or cannot allocate an overflow entry, then we need to drop it
814 * on the floor.
815 */
816 io_account_cq_overflow(ctx);
817 set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
818 return false;
819 }
820 if (list_empty(&ctx->cq_overflow_list)) {
821 set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
822 atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
823
824 }
825 ocqe->cqe.user_data = user_data;
826 ocqe->cqe.res = res;
827 ocqe->cqe.flags = cflags;
828 if (is_cqe32) {
829 ocqe->cqe.big_cqe[0] = extra1;
830 ocqe->cqe.big_cqe[1] = extra2;
831 }
832 list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
833 return true;
834 }
835
836 void io_req_cqe_overflow(struct io_kiocb *req)
837 {
838 io_cqring_event_overflow(req->ctx, req->cqe.user_data,
839 req->cqe.res, req->cqe.flags,
840 req->big_cqe.extra1, req->big_cqe.extra2);
841 memset(&req->big_cqe, 0, sizeof(req->big_cqe));
842 }
843
844 /*
845 * writes to the cq entry need to come after reading head; the
846 * control dependency is enough as we're using WRITE_ONCE to
847 * fill the cq entry
848 */
849 bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow)
850 {
851 struct io_rings *rings = ctx->rings;
852 unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
853 unsigned int free, queued, len;
854
855 /*
856 * Posting into the CQ when there are pending overflowed CQEs may break
857 * ordering guarantees, which will affect links, F_MORE users and more.
858 * Force overflow the completion.
859 */
860 if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
861 return false;
862
863 /* userspace may cheat by modifying the tail, so be safe and take the min */
864 queued = min(__io_cqring_events(ctx), ctx->cq_entries);
865 free = ctx->cq_entries - queued;
866 /* we need a contiguous range, limit based on the current array offset */
867 len = min(free, ctx->cq_entries - off);
868 if (!len)
869 return false;
870
871 if (ctx->flags & IORING_SETUP_CQE32) {
872 off <<= 1;
873 len <<= 1;
874 }
875
876 ctx->cqe_cached = &rings->cqes[off];
877 ctx->cqe_sentinel = ctx->cqe_cached + len;
878 return true;
879 }
880
881 static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
882 u32 cflags)
883 {
884 struct io_uring_cqe *cqe;
885
886 ctx->cq_extra++;
887
888 /*
889 * If we can't get a cq entry, userspace overflowed the
890 * submission (by quite a lot). Increment the overflow count in
891 * the ring.
892 */
893 if (likely(io_get_cqe(ctx, &cqe))) {
894 trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);
895
896 WRITE_ONCE(cqe->user_data, user_data);
897 WRITE_ONCE(cqe->res, res);
898 WRITE_ONCE(cqe->flags, cflags);
899
900 if (ctx->flags & IORING_SETUP_CQE32) {
901 WRITE_ONCE(cqe->big_cqe[0], 0);
902 WRITE_ONCE(cqe->big_cqe[1], 0);
903 }
904 return true;
905 }
906 return false;
907 }
908
909 static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
910 __must_hold(&ctx->uring_lock)
911 {
912 struct io_submit_state *state = &ctx->submit_state;
913 unsigned int i;
914
915 lockdep_assert_held(&ctx->uring_lock);
916 for (i = 0; i < state->cqes_count; i++) {
917 struct io_uring_cqe *cqe = &ctx->completion_cqes[i];
918
919 if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
920 if (ctx->lockless_cq) {
921 spin_lock(&ctx->completion_lock);
922 io_cqring_event_overflow(ctx, cqe->user_data,
923 cqe->res, cqe->flags, 0, 0);
924 spin_unlock(&ctx->completion_lock);
925 } else {
926 io_cqring_event_overflow(ctx, cqe->user_data,
927 cqe->res, cqe->flags, 0, 0);
928 }
929 }
930 }
931 state->cqes_count = 0;
932 }
933
934 static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
935 bool allow_overflow)
936 {
937 bool filled;
938
939 io_cq_lock(ctx);
940 filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
941 if (!filled && allow_overflow)
942 filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
943
944 io_cq_unlock_post(ctx);
945 return filled;
946 }
947
948 bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
949 {
950 return __io_post_aux_cqe(ctx, user_data, res, cflags, true);
951 }
952
953 /*
954 * A helper for multishot requests posting additional CQEs.
955 * Should only be used from task_work whose issue_flags include IO_URING_F_MULTISHOT.
956 */
957 bool io_fill_cqe_req_aux(struct io_kiocb *req, bool defer, s32 res, u32 cflags)
958 {
959 struct io_ring_ctx *ctx = req->ctx;
960 u64 user_data = req->cqe.user_data;
961 struct io_uring_cqe *cqe;
962
963 if (!defer)
964 return __io_post_aux_cqe(ctx, user_data, res, cflags, false);
965
966 lockdep_assert_held(&ctx->uring_lock);
967
968 if (ctx->submit_state.cqes_count == ARRAY_SIZE(ctx->completion_cqes)) {
969 __io_cq_lock(ctx);
970 __io_flush_post_cqes(ctx);
971 /* no need to flush - flush is deferred */
972 __io_cq_unlock_post(ctx);
973 }
974
975 /* For deferred completions this is not as strict as it is otherwise,
976 * however its main job is to prevent unbounded posted completions,
977 * and in that it works just as well.
978 */
979 if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
980 return false;
981
982 cqe = &ctx->completion_cqes[ctx->submit_state.cqes_count++];
983 cqe->user_data = user_data;
984 cqe->res = res;
985 cqe->flags = cflags;
986 return true;
987 }
988
989 static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
990 {
991 struct io_ring_ctx *ctx = req->ctx;
992 struct io_rsrc_node *rsrc_node = NULL;
993
994 io_cq_lock(ctx);
995 if (!(req->flags & REQ_F_CQE_SKIP)) {
996 if (!io_fill_cqe_req(ctx, req))
997 io_req_cqe_overflow(req);
998 }
999
1000 /*
1001 * If we're the last reference to this request, add to our locked
1002 * free_list cache.
1003 */
1004 if (req_ref_put_and_test(req)) {
1005 if (req->flags & IO_REQ_LINK_FLAGS) {
1006 if (req->flags & IO_DISARM_MASK)
1007 io_disarm_next(req);
1008 if (req->link) {
1009 io_req_task_queue(req->link);
1010 req->link = NULL;
1011 }
1012 }
1013 io_put_kbuf_comp(req);
1014 if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1015 io_clean_op(req);
1016 io_put_file(req);
1017
1018 rsrc_node = req->rsrc_node;
1019 /*
1020 * Selected buffer deallocation in io_clean_op() assumes that
1021 * we don't hold ->completion_lock. Clean them here to avoid
1022 * deadlocks.
1023 */
1024 io_put_task_remote(req->task);
1025 wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
1026 ctx->locked_free_nr++;
1027 }
1028 io_cq_unlock_post(ctx);
1029
1030 if (rsrc_node) {
1031 io_ring_submit_lock(ctx, issue_flags);
1032 io_put_rsrc_node(ctx, rsrc_node);
1033 io_ring_submit_unlock(ctx, issue_flags);
1034 }
1035 }
1036
1037 void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
1038 {
1039 if (req->ctx->task_complete && req->ctx->submitter_task != current) {
1040 req->io_task_work.func = io_req_task_complete;
1041 io_req_task_work_add(req);
1042 } else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
1043 !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
1044 __io_req_complete_post(req, issue_flags);
1045 } else {
1046 struct io_ring_ctx *ctx = req->ctx;
1047
1048 mutex_lock(&ctx->uring_lock);
1049 __io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
1050 mutex_unlock(&ctx->uring_lock);
1051 }
1052 }
1053
1054 void io_req_defer_failed(struct io_kiocb *req, s32 res)
1055 __must_hold(&ctx->uring_lock)
1056 {
1057 const struct io_cold_def *def = &io_cold_defs[req->opcode];
1058
1059 lockdep_assert_held(&req->ctx->uring_lock);
1060
1061 req_set_fail(req);
1062 io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
1063 if (def->fail)
1064 def->fail(req);
1065 io_req_complete_defer(req);
1066 }
1067
1068 /*
1069 * Don't initialise the fields below on every allocation, but do that in
1070 * advance and keep them valid across allocations.
1071 */
1072 static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
1073 {
1074 req->ctx = ctx;
1075 req->link = NULL;
1076 req->async_data = NULL;
1077 /* not necessary, but safer to zero */
1078 memset(&req->cqe, 0, sizeof(req->cqe));
1079 memset(&req->big_cqe, 0, sizeof(req->big_cqe));
1080 }
1081
1082 static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
1083 struct io_submit_state *state)
1084 {
1085 spin_lock(&ctx->completion_lock);
1086 wq_list_splice(&ctx->locked_free_list, &state->free_list);
1087 ctx->locked_free_nr = 0;
1088 spin_unlock(&ctx->completion_lock);
1089 }
1090
1091 /*
1092 * A request might get retired back into the request caches even before opcode
1093 * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
1094 * Because of that, io_alloc_req() should be called only under ->uring_lock
1095 * and with extra caution to not get a request that is still worked on.
1096 */
1097 __cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
1098 __must_hold(&ctx->uring_lock)
1099 {
1100 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1101 void *reqs[IO_REQ_ALLOC_BATCH];
1102 int ret, i;
1103
1104 /*
1105 * If we have more than a batch's worth of requests in our IRQ side
1106 * locked cache, grab the lock and move them over to our submission
1107 * side cache.
1108 */
1109 if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
1110 io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
1111 if (!io_req_cache_empty(ctx))
1112 return true;
1113 }
1114
1115 ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
1116
1117 /*
1118 * Bulk alloc is all-or-nothing. If we fail to get a batch,
1119 * retry single alloc to be on the safe side.
1120 */
1121 if (unlikely(ret <= 0)) {
1122 reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1123 if (!reqs[0])
1124 return false;
1125 ret = 1;
1126 }
1127
1128 percpu_ref_get_many(&ctx->refs, ret);
1129 for (i = 0; i < ret; i++) {
1130 struct io_kiocb *req = reqs[i];
1131
1132 io_preinit_req(req, ctx);
1133 io_req_add_to_cache(req, ctx);
1134 }
1135 return true;
1136 }
1137
1138 __cold void io_free_req(struct io_kiocb *req)
1139 {
1140 /* refs were already put, restore them for io_req_task_complete() */
1141 req->flags &= ~REQ_F_REFCOUNT;
1142 /* we only want to free it, don't post CQEs */
1143 req->flags |= REQ_F_CQE_SKIP;
1144 req->io_task_work.func = io_req_task_complete;
1145 io_req_task_work_add(req);
1146 }
1147
1148 static void __io_req_find_next_prep(struct io_kiocb *req)
1149 {
1150 struct io_ring_ctx *ctx = req->ctx;
1151
1152 spin_lock(&ctx->completion_lock);
1153 io_disarm_next(req);
1154 spin_unlock(&ctx->completion_lock);
1155 }
1156
1157 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1158 {
1159 struct io_kiocb *nxt;
1160
1161 /*
1162 * If LINK is set, we have dependent requests in this chain. If we
1163 * didn't fail this request, queue the first one up, moving any other
1164 * dependencies to the next request. In case of failure, fail the rest
1165 * of the chain.
1166 */
1167 if (unlikely(req->flags & IO_DISARM_MASK))
1168 __io_req_find_next_prep(req);
1169 nxt = req->link;
1170 req->link = NULL;
1171 return nxt;
1172 }
1173
1174 static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
1175 {
1176 if (!ctx)
1177 return;
1178 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1179 atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1180 if (ts->locked) {
1181 io_submit_flush_completions(ctx);
1182 mutex_unlock(&ctx->uring_lock);
1183 ts->locked = false;
1184 }
1185 percpu_ref_put(&ctx->refs);
1186 }
1187
1188 static unsigned int handle_tw_list(struct llist_node *node,
1189 struct io_ring_ctx **ctx,
1190 struct io_tw_state *ts)
1191 {
1192 unsigned int count = 0;
1193
1194 do {
1195 struct llist_node *next = node->next;
1196 struct io_kiocb *req = container_of(node, struct io_kiocb,
1197 io_task_work.node);
1198
1199 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1200
1201 if (req->ctx != *ctx) {
1202 ctx_flush_and_put(*ctx, ts);
1203 *ctx = req->ctx;
1204 /* if not contended, grab and improve batching */
1205 ts->locked = mutex_trylock(&(*ctx)->uring_lock);
1206 percpu_ref_get(&(*ctx)->refs);
1207 }
1208 INDIRECT_CALL_2(req->io_task_work.func,
1209 io_poll_task_func, io_req_rw_complete,
1210 req, ts);
1211 node = next;
1212 count++;
1213 if (unlikely(need_resched())) {
1214 ctx_flush_and_put(*ctx, ts);
1215 *ctx = NULL;
1216 cond_resched();
1217 }
1218 } while (node);
1219
1220 return count;
1221 }
1222
1223 /**
1224 * io_llist_xchg - swap all entries in a lock-less list
1225 * @head: the head of lock-less list to delete all entries
1226 * @new: new entry as the head of the list
1227 *
1228 * If list is empty, return NULL, otherwise, return the pointer to the first entry.
1229 * The order of entries returned is from the newest to the oldest added one.
1230 */
1231 static inline struct llist_node *io_llist_xchg(struct llist_head *head,
1232 struct llist_node *new)
1233 {
1234 return xchg(&head->first, new);
1235 }
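
/*
 * Typical usage sketch (this mirrors what __io_run_local_work() below
 * actually does): grab the whole pending list in one shot, then reverse
 * it so the items run in the order they were queued:
 *
 *	node = llist_reverse_order(io_llist_xchg(&ctx->work_llist, NULL));
 *	while (node) {
 *		... run the item ...
 *		node = node->next;
 *	}
 */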
1236
1237 static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
1238 {
1239 struct llist_node *node = llist_del_all(&tctx->task_list);
1240 struct io_ring_ctx *last_ctx = NULL;
1241 struct io_kiocb *req;
1242
1243 while (node) {
1244 req = container_of(node, struct io_kiocb, io_task_work.node);
1245 node = node->next;
1246 if (sync && last_ctx != req->ctx) {
1247 if (last_ctx) {
1248 flush_delayed_work(&last_ctx->fallback_work);
1249 percpu_ref_put(&last_ctx->refs);
1250 }
1251 last_ctx = req->ctx;
1252 percpu_ref_get(&last_ctx->refs);
1253 }
1254 if (llist_add(&req->io_task_work.node,
1255 &req->ctx->fallback_llist))
1256 schedule_delayed_work(&req->ctx->fallback_work, 1);
1257 }
1258
1259 if (last_ctx) {
1260 flush_delayed_work(&last_ctx->fallback_work);
1261 percpu_ref_put(&last_ctx->refs);
1262 }
1263 }
1264
1265 void tctx_task_work(struct callback_head *cb)
1266 {
1267 struct io_tw_state ts = {};
1268 struct io_ring_ctx *ctx = NULL;
1269 struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
1270 task_work);
1271 struct llist_node *node;
1272 unsigned int count = 0;
1273
1274 if (unlikely(current->flags & PF_EXITING)) {
1275 io_fallback_tw(tctx, true);
1276 return;
1277 }
1278
1279 node = llist_del_all(&tctx->task_list);
1280 if (node)
1281 count = handle_tw_list(node, &ctx, &ts);
1282
1283 ctx_flush_and_put(ctx, &ts);
1284
1285 /* relaxed read is enough as only the task itself sets ->in_cancel */
1286 if (unlikely(atomic_read(&tctx->in_cancel)))
1287 io_uring_drop_tctx_refs(current);
1288
1289 trace_io_uring_task_work_run(tctx, count, 1);
1290 }
1291
1292 static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
1293 {
1294 struct io_ring_ctx *ctx = req->ctx;
1295 unsigned nr_wait, nr_tw, nr_tw_prev;
1296 struct llist_node *first;
1297
1298 if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
1299 flags &= ~IOU_F_TWQ_LAZY_WAKE;
1300
1301 first = READ_ONCE(ctx->work_llist.first);
1302 do {
1303 nr_tw_prev = 0;
1304 if (first) {
1305 struct io_kiocb *first_req = container_of(first,
1306 struct io_kiocb,
1307 io_task_work.node);
1308 /*
1309 * Might be executed at any moment, rely on
1310 * SLAB_TYPESAFE_BY_RCU to keep it alive.
1311 */
1312 nr_tw_prev = READ_ONCE(first_req->nr_tw);
1313 }
1314 nr_tw = nr_tw_prev + 1;
1315 /* Large enough to fail the nr_wait comparison below */
1316 if (!(flags & IOU_F_TWQ_LAZY_WAKE))
1317 nr_tw = INT_MAX;
1318
1319 req->nr_tw = nr_tw;
1320 req->io_task_work.node.next = first;
1321 } while (!try_cmpxchg(&ctx->work_llist.first, &first,
1322 &req->io_task_work.node));
1323
1324 if (!first) {
1325 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1326 atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1327 if (ctx->has_evfd)
1328 io_eventfd_signal(ctx);
1329 }
1330
1331 nr_wait = atomic_read(&ctx->cq_wait_nr);
1332 /* no one is waiting */
1333 if (!nr_wait)
1334 return;
1335 /* either not enough or the previous add has already woken it up */
1336 if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
1337 return;
1338 /* pairs with set_current_state() in io_cqring_wait() */
1339 smp_mb__after_atomic();
1340 wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
1341 }
1342
1343 static void io_req_normal_work_add(struct io_kiocb *req)
1344 {
1345 struct io_uring_task *tctx = req->task->io_uring;
1346 struct io_ring_ctx *ctx = req->ctx;
1347
1348 /* task_work already pending, we're done */
1349 if (!llist_add(&req->io_task_work.node, &tctx->task_list))
1350 return;
1351
1352 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1353 atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1354
1355 if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1356 return;
1357
1358 io_fallback_tw(tctx, false);
1359 }
1360
1361 void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
1362 {
1363 if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
1364 rcu_read_lock();
1365 io_req_local_work_add(req, flags);
1366 rcu_read_unlock();
1367 } else {
1368 io_req_normal_work_add(req);
1369 }
1370 }
1371
1372 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
1373 {
1374 struct llist_node *node;
1375
1376 node = llist_del_all(&ctx->work_llist);
1377 while (node) {
1378 struct io_kiocb *req = container_of(node, struct io_kiocb,
1379 io_task_work.node);
1380
1381 node = node->next;
1382 io_req_normal_work_add(req);
1383 }
1384 }
1385
1386 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
1387 int min_events)
1388 {
1389 if (llist_empty(&ctx->work_llist))
1390 return false;
1391 if (events < min_events)
1392 return true;
1393 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1394 atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1395 return false;
1396 }
1397
1398 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
1399 int min_events)
1400 {
1401 struct llist_node *node;
1402 unsigned int loops = 0;
1403 int ret = 0;
1404
1405 if (WARN_ON_ONCE(ctx->submitter_task != current))
1406 return -EEXIST;
1407 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1408 atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1409 again:
1410 /*
1411 * llists are in reverse order, flip it back the right way before
1412 * running the pending items.
1413 */
1414 node = llist_reverse_order(io_llist_xchg(&ctx->work_llist, NULL));
1415 while (node) {
1416 struct llist_node *next = node->next;
1417 struct io_kiocb *req = container_of(node, struct io_kiocb,
1418 io_task_work.node);
1419 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1420 INDIRECT_CALL_2(req->io_task_work.func,
1421 io_poll_task_func, io_req_rw_complete,
1422 req, ts);
1423 ret++;
1424 node = next;
1425 }
1426 loops++;
1427
1428 if (io_run_local_work_continue(ctx, ret, min_events))
1429 goto again;
1430 if (ts->locked) {
1431 io_submit_flush_completions(ctx);
1432 if (io_run_local_work_continue(ctx, ret, min_events))
1433 goto again;
1434 }
1435
1436 trace_io_uring_local_work_run(ctx, ret, loops);
1437 return ret;
1438 }
1439
1440 static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
1441 int min_events)
1442 {
1443 struct io_tw_state ts = { .locked = true, };
1444 int ret;
1445
1446 if (llist_empty(&ctx->work_llist))
1447 return 0;
1448
1449 ret = __io_run_local_work(ctx, &ts, min_events);
1450 /* shouldn't happen! */
1451 if (WARN_ON_ONCE(!ts.locked))
1452 mutex_lock(&ctx->uring_lock);
1453 return ret;
1454 }
1455
1456 static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
1457 {
1458 struct io_tw_state ts = {};
1459 int ret;
1460
1461 ts.locked = mutex_trylock(&ctx->uring_lock);
1462 ret = __io_run_local_work(ctx, &ts, min_events);
1463 if (ts.locked)
1464 mutex_unlock(&ctx->uring_lock);
1465
1466 return ret;
1467 }
1468
1469 static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
1470 {
1471 io_tw_lock(req->ctx, ts);
1472 io_req_defer_failed(req, req->cqe.res);
1473 }
1474
1475 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
1476 {
1477 io_tw_lock(req->ctx, ts);
1478 /* req->task == current here, checking PF_EXITING is safe */
1479 if (unlikely(req->task->flags & PF_EXITING))
1480 io_req_defer_failed(req, -EFAULT);
1481 else if (req->flags & REQ_F_FORCE_ASYNC)
1482 io_queue_iowq(req);
1483 else
1484 io_queue_sqe(req);
1485 }
1486
1487 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1488 {
1489 io_req_set_res(req, ret, 0);
1490 req->io_task_work.func = io_req_task_cancel;
1491 io_req_task_work_add(req);
1492 }
1493
1494 void io_req_task_queue(struct io_kiocb *req)
1495 {
1496 req->io_task_work.func = io_req_task_submit;
1497 io_req_task_work_add(req);
1498 }
1499
1500 void io_queue_next(struct io_kiocb *req)
1501 {
1502 struct io_kiocb *nxt = io_req_find_next(req);
1503
1504 if (nxt)
1505 io_req_task_queue(nxt);
1506 }
1507
1508 static void io_free_batch_list(struct io_ring_ctx *ctx,
1509 struct io_wq_work_node *node)
1510 __must_hold(&ctx->uring_lock)
1511 {
1512 do {
1513 struct io_kiocb *req = container_of(node, struct io_kiocb,
1514 comp_list);
1515
1516 if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1517 if (req->flags & REQ_F_REFCOUNT) {
1518 node = req->comp_list.next;
1519 if (!req_ref_put_and_test(req))
1520 continue;
1521 }
1522 if ((req->flags & REQ_F_POLLED) && req->apoll) {
1523 struct async_poll *apoll = req->apoll;
1524
1525 if (apoll->double_poll)
1526 kfree(apoll->double_poll);
1527 if (!io_alloc_cache_put(&ctx->apoll_cache, &apoll->cache))
1528 kfree(apoll);
1529 req->flags &= ~REQ_F_POLLED;
1530 }
1531 if (req->flags & IO_REQ_LINK_FLAGS)
1532 io_queue_next(req);
1533 if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1534 io_clean_op(req);
1535 }
1536 io_put_file(req);
1537
1538 io_req_put_rsrc_locked(req, ctx);
1539
1540 io_put_task(req->task);
1541 node = req->comp_list.next;
1542 io_req_add_to_cache(req, ctx);
1543 } while (node);
1544 }
1545
1546 void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1547 __must_hold(&ctx->uring_lock)
1548 {
1549 struct io_submit_state *state = &ctx->submit_state;
1550 struct io_wq_work_node *node;
1551
1552 __io_cq_lock(ctx);
1553 /* must come first to preserve CQE ordering in failure cases */
1554 if (state->cqes_count)
1555 __io_flush_post_cqes(ctx);
1556 __wq_list_for_each(node, &state->compl_reqs) {
1557 struct io_kiocb *req = container_of(node, struct io_kiocb,
1558 comp_list);
1559
1560 if (!(req->flags & REQ_F_CQE_SKIP) &&
1561 unlikely(!io_fill_cqe_req(ctx, req))) {
1562 if (ctx->lockless_cq) {
1563 spin_lock(&ctx->completion_lock);
1564 io_req_cqe_overflow(req);
1565 spin_unlock(&ctx->completion_lock);
1566 } else {
1567 io_req_cqe_overflow(req);
1568 }
1569 }
1570 }
1571 __io_cq_unlock_post(ctx);
1572
1573 if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
1574 io_free_batch_list(ctx, state->compl_reqs.first);
1575 INIT_WQ_LIST(&state->compl_reqs);
1576 }
1577 }
1578
1579 static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1580 {
1581 /* See comment at the top of this file */
1582 smp_rmb();
1583 return __io_cqring_events(ctx);
1584 }
1585
1586 /*
1587 * We can't just wait for polled events to come to us, we have to actively
1588 * find and complete them.
1589 */
1590 static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1591 {
1592 if (!(ctx->flags & IORING_SETUP_IOPOLL))
1593 return;
1594
1595 mutex_lock(&ctx->uring_lock);
1596 while (!wq_list_empty(&ctx->iopoll_list)) {
1597 /* let it sleep and repeat later if can't complete a request */
1598 if (io_do_iopoll(ctx, true) == 0)
1599 break;
1600 /*
1601 * Ensure we allow local-to-the-cpu processing to take place;
1602 * in this case we need to ensure that we reap all events.
1603 * Also let task_work, etc. progress by releasing the mutex.
1604 */
1605 if (need_resched()) {
1606 mutex_unlock(&ctx->uring_lock);
1607 cond_resched();
1608 mutex_lock(&ctx->uring_lock);
1609 }
1610 }
1611 mutex_unlock(&ctx->uring_lock);
1612 }
1613
1614 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1615 {
1616 unsigned int nr_events = 0;
1617 unsigned long check_cq;
1618
1619 lockdep_assert_held(&ctx->uring_lock);
1620
1621 if (!io_allowed_run_tw(ctx))
1622 return -EEXIST;
1623
1624 check_cq = READ_ONCE(ctx->check_cq);
1625 if (unlikely(check_cq)) {
1626 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1627 __io_cqring_overflow_flush(ctx);
1628 /*
1629 * Similarly do not spin if we have not informed the user of any
1630 * dropped CQE.
1631 */
1632 if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
1633 return -EBADR;
1634 }
1635 /*
1636 * Don't enter poll loop if we already have events pending.
1637 * If we do, we can potentially be spinning for commands that
1638 * already triggered a CQE (eg in error).
1639 */
1640 if (io_cqring_events(ctx))
1641 return 0;
1642
1643 do {
1644 int ret = 0;
1645
1646 /*
1647 * If a submit got punted to a workqueue, we can have the
1648 * application entering polling for a command before it gets
1649 * issued. That app will hold the uring_lock for the duration
1650 * of the poll right here, so we need to take a breather every
1651 * now and then to ensure that the issue has a chance to add
1652 * the poll to the issued list. Otherwise we can spin here
1653 * forever, while the workqueue is stuck trying to acquire the
1654 * very same mutex.
1655 */
1656 if (wq_list_empty(&ctx->iopoll_list) ||
1657 io_task_work_pending(ctx)) {
1658 u32 tail = ctx->cached_cq_tail;
1659
1660 (void) io_run_local_work_locked(ctx, min);
1661
1662 if (task_work_pending(current) ||
1663 wq_list_empty(&ctx->iopoll_list)) {
1664 mutex_unlock(&ctx->uring_lock);
1665 io_run_task_work();
1666 mutex_lock(&ctx->uring_lock);
1667 }
1668 /* some requests don't go through iopoll_list */
1669 if (tail != ctx->cached_cq_tail ||
1670 wq_list_empty(&ctx->iopoll_list))
1671 break;
1672 }
1673 ret = io_do_iopoll(ctx, !min);
1674 if (unlikely(ret < 0))
1675 return ret;
1676
1677 if (task_sigpending(current))
1678 return -EINTR;
1679 if (need_resched())
1680 break;
1681
1682 nr_events += ret;
1683 } while (nr_events < min);
1684
1685 return 0;
1686 }
1687
1688 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
1689 {
1690 if (ts->locked)
1691 io_req_complete_defer(req);
1692 else
1693 io_req_complete_post(req, IO_URING_F_UNLOCKED);
1694 }
1695
1696 /*
1697 * After the iocb has been issued, it's safe to be found on the poll list.
1698 * Adding the kiocb to the list AFTER submission ensures that we don't
1699 * find it from an io_do_iopoll() thread before the issuer is done
1700 * accessing the kiocb cookie.
1701 */
1702 static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1703 {
1704 struct io_ring_ctx *ctx = req->ctx;
1705 const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1706
1707 /* workqueue context doesn't hold uring_lock, grab it now */
1708 if (unlikely(needs_lock))
1709 mutex_lock(&ctx->uring_lock);
1710
1711 /*
1712 * Track whether we have multiple files in our lists. This will impact
1713 * how we do polling eventually, not spinning if we're on potentially
1714 * different devices.
1715 */
1716 if (wq_list_empty(&ctx->iopoll_list)) {
1717 ctx->poll_multi_queue = false;
1718 } else if (!ctx->poll_multi_queue) {
1719 struct io_kiocb *list_req;
1720
1721 list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
1722 comp_list);
1723 if (list_req->file != req->file)
1724 ctx->poll_multi_queue = true;
1725 }
1726
1727 /*
1728 * For fast devices, IO may have already completed. If it has, add
1729 * it to the front so we find it first.
1730 */
1731 if (READ_ONCE(req->iopoll_completed))
1732 wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1733 else
1734 wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1735
1736 if (unlikely(needs_lock)) {
1737 /*
1738 * If IORING_SETUP_SQPOLL is enabled, sqes are either handled
1739 * in sq thread task context or in io worker task context. If
1740 * current task context is sq thread, we don't need to check
1741 * whether should wake up sq thread.
1742 */
1743 if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1744 wq_has_sleeper(&ctx->sq_data->wait))
1745 wake_up(&ctx->sq_data->wait);
1746
1747 mutex_unlock(&ctx->uring_lock);
1748 }
1749 }
1750
1751 unsigned int io_file_get_flags(struct file *file)
1752 {
1753 unsigned int res = 0;
1754
1755 if (S_ISREG(file_inode(file)->i_mode))
1756 res |= REQ_F_ISREG;
1757 if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
1758 res |= REQ_F_SUPPORT_NOWAIT;
1759 return res;
1760 }
1761
1762 bool io_alloc_async_data(struct io_kiocb *req)
1763 {
1764 WARN_ON_ONCE(!io_cold_defs[req->opcode].async_size);
1765 req->async_data = kmalloc(io_cold_defs[req->opcode].async_size, GFP_KERNEL);
1766 if (req->async_data) {
1767 req->flags |= REQ_F_ASYNC_DATA;
1768 return false;
1769 }
1770 return true;
1771 }
1772
1773 int io_req_prep_async(struct io_kiocb *req)
1774 {
1775 const struct io_cold_def *cdef = &io_cold_defs[req->opcode];
1776 const struct io_issue_def *def = &io_issue_defs[req->opcode];
1777
1778 /* assign early for deferred execution for non-fixed file */
1779 if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE) && !req->file)
1780 req->file = io_file_get_normal(req, req->cqe.fd);
1781 if (!cdef->prep_async)
1782 return 0;
1783 if (WARN_ON_ONCE(req_has_async_data(req)))
1784 return -EFAULT;
1785 if (!def->manual_alloc) {
1786 if (io_alloc_async_data(req))
1787 return -EAGAIN;
1788 }
1789 return cdef->prep_async(req);
1790 }
1791
1792 static u32 io_get_sequence(struct io_kiocb *req)
1793 {
1794 u32 seq = req->ctx->cached_sq_head;
1795 struct io_kiocb *cur;
1796
1797 /* need original cached_sq_head, but it was increased for each req */
1798 io_for_each_link(cur, req)
1799 seq--;
1800 return seq;
1801 }
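
/*
 * Worked example (illustrative only, not code from this file): if
 * cached_sq_head is 12 after a 3-request link chain was pulled off the SQ
 * ring, io_for_each_link() visits those 3 requests and the loop above
 * yields seq == 9, i.e. the head value at the time the first request of
 * the chain was fetched.
 */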
1802
1803 static __cold void io_drain_req(struct io_kiocb *req)
1804 __must_hold(&ctx->uring_lock)
1805 {
1806 struct io_ring_ctx *ctx = req->ctx;
1807 struct io_defer_entry *de;
1808 int ret;
1809 u32 seq = io_get_sequence(req);
1810
1811 /* Still need defer if there is pending req in defer list. */
1812 spin_lock(&ctx->completion_lock);
1813 if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
1814 spin_unlock(&ctx->completion_lock);
1815 queue:
1816 ctx->drain_active = false;
1817 io_req_task_queue(req);
1818 return;
1819 }
1820 spin_unlock(&ctx->completion_lock);
1821
1822 io_prep_async_link(req);
1823 de = kmalloc(sizeof(*de), GFP_KERNEL);
1824 if (!de) {
1825 ret = -ENOMEM;
1826 io_req_defer_failed(req, ret);
1827 return;
1828 }
1829
1830 spin_lock(&ctx->completion_lock);
1831 if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
1832 spin_unlock(&ctx->completion_lock);
1833 kfree(de);
1834 goto queue;
1835 }
1836
1837 trace_io_uring_defer(req);
1838 de->req = req;
1839 de->seq = seq;
1840 list_add_tail(&de->list, &ctx->defer_list);
1841 spin_unlock(&ctx->completion_lock);
1842 }
1843
1844 static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
1845 unsigned int issue_flags)
1846 {
1847 if (req->file || !def->needs_file)
1848 return true;
1849
1850 if (req->flags & REQ_F_FIXED_FILE)
1851 req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1852 else
1853 req->file = io_file_get_normal(req, req->cqe.fd);
1854
1855 return !!req->file;
1856 }
1857
1858 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
1859 {
1860 const struct io_issue_def *def = &io_issue_defs[req->opcode];
1861 const struct cred *creds = NULL;
1862 int ret;
1863
1864 if (unlikely(!io_assign_file(req, def, issue_flags)))
1865 return -EBADF;
1866
1867 if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
1868 creds = override_creds(req->creds);
1869
1870 if (!def->audit_skip)
1871 audit_uring_entry(req->opcode);
1872
1873 ret = def->issue(req, issue_flags);
1874
1875 if (!def->audit_skip)
1876 audit_uring_exit(!ret, ret);
1877
1878 if (creds)
1879 revert_creds(creds);
1880
1881 if (ret == IOU_OK) {
1882 if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1883 io_req_complete_defer(req);
1884 else
1885 io_req_complete_post(req, issue_flags);
1886
1887 return 0;
1888 }
1889
1890 if (ret != IOU_ISSUE_SKIP_COMPLETE)
1891 return ret;
1892
1893 /* If the op doesn't have a file, we're not polling for it */
1894 if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
1895 io_iopoll_req_issued(req, issue_flags);
1896
1897 return 0;
1898 }
1899
1900 int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
1901 {
1902 io_tw_lock(req->ctx, ts);
1903 return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
1904 IO_URING_F_COMPLETE_DEFER);
1905 }
1906
1907 struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
1908 {
1909 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1910 struct io_kiocb *nxt = NULL;
1911
1912 if (req_ref_put_and_test(req)) {
1913 if (req->flags & IO_REQ_LINK_FLAGS)
1914 nxt = io_req_find_next(req);
1915 io_free_req(req);
1916 }
1917 return nxt ? &nxt->work : NULL;
1918 }
1919
1920 void io_wq_submit_work(struct io_wq_work *work)
1921 {
1922 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1923 const struct io_issue_def *def = &io_issue_defs[req->opcode];
1924 unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
1925 bool needs_poll = false;
1926 int ret = 0, err = -ECANCELED;
1927
1928 /* one will be dropped by ->io_wq_free_work() after returning to io-wq */
1929 if (!(req->flags & REQ_F_REFCOUNT))
1930 __io_req_set_refcount(req, 2);
1931 else
1932 req_ref_get(req);
1933
1934 io_arm_ltimeout(req);
1935
1936 /* either cancelled or io-wq is dying, so don't touch tctx->iowq */
1937 if (work->flags & IO_WQ_WORK_CANCEL) {
1938 fail:
1939 io_req_task_queue_fail(req, err);
1940 return;
1941 }
1942 if (!io_assign_file(req, def, issue_flags)) {
1943 err = -EBADF;
1944 work->flags |= IO_WQ_WORK_CANCEL;
1945 goto fail;
1946 }
1947
1948 if (req->flags & REQ_F_FORCE_ASYNC) {
1949 bool opcode_poll = def->pollin || def->pollout;
1950
1951 if (opcode_poll && file_can_poll(req->file)) {
1952 needs_poll = true;
1953 issue_flags |= IO_URING_F_NONBLOCK;
1954 }
1955 }
1956
1957 do {
1958 ret = io_issue_sqe(req, issue_flags);
1959 if (ret != -EAGAIN)
1960 break;
1961
1962 /*
1963 * If REQ_F_NOWAIT is set, then don't wait or retry with
1964 * poll. -EAGAIN is final for that case.
1965 */
1966 if (req->flags & REQ_F_NOWAIT)
1967 break;
1968
1969 /*
1970 * We can get EAGAIN for iopolled IO even though we're
1971 * forcing a sync submission from here, since we can't
1972 * wait for request slots on the block side.
1973 */
1974 if (!needs_poll) {
1975 if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
1976 break;
1977 if (io_wq_worker_stopped())
1978 break;
1979 cond_resched();
1980 continue;
1981 }
1982
1983 if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
1984 return;
1985 /* aborted or ready, in either case retry blocking */
1986 needs_poll = false;
1987 issue_flags &= ~IO_URING_F_NONBLOCK;
1988 } while (1);
1989
1990 /* avoid locking problems by failing it from a clean context */
1991 if (ret < 0)
1992 io_req_task_queue_fail(req, ret);
1993 }
1994
1995 inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
1996 unsigned int issue_flags)
1997 {
1998 struct io_ring_ctx *ctx = req->ctx;
1999 struct io_fixed_file *slot;
2000 struct file *file = NULL;
2001
2002 io_ring_submit_lock(ctx, issue_flags);
2003
2004 if (unlikely((unsigned int)fd >= ctx->nr_user_files))
2005 goto out;
2006 fd = array_index_nospec(fd, ctx->nr_user_files);
2007 slot = io_fixed_file_slot(&ctx->file_table, fd);
2008 file = io_slot_file(slot);
2009 req->flags |= io_slot_flags(slot);
2010 io_req_set_rsrc_node(req, ctx, 0);
2011 out:
2012 io_ring_submit_unlock(ctx, issue_flags);
2013 return file;
2014 }
2015
2016 struct file *io_file_get_normal(struct io_kiocb *req, int fd)
2017 {
2018 struct file *file = fget(fd);
2019
2020 trace_io_uring_file_get(req, fd);
2021
2022 /* we don't allow fixed io_uring files */
2023 if (file && io_is_uring_fops(file))
2024 io_req_track_inflight(req);
2025 return file;
2026 }
2027
2028 static void io_queue_async(struct io_kiocb *req, int ret)
2029 __must_hold(&req->ctx->uring_lock)
2030 {
2031 struct io_kiocb *linked_timeout;
2032
2033 if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
2034 io_req_defer_failed(req, ret);
2035 return;
2036 }
2037
2038 linked_timeout = io_prep_linked_timeout(req);
2039
2040 switch (io_arm_poll_handler(req, 0)) {
2041 case IO_APOLL_READY:
2042 io_kbuf_recycle(req, 0);
2043 io_req_task_queue(req);
2044 break;
2045 case IO_APOLL_ABORTED:
2046 io_kbuf_recycle(req, 0);
2047 io_queue_iowq(req);
2048 break;
2049 case IO_APOLL_OK:
2050 break;
2051 }
2052
2053 if (linked_timeout)
2054 io_queue_linked_timeout(linked_timeout);
2055 }
2056
2057 static inline void io_queue_sqe(struct io_kiocb *req)
2058 __must_hold(&req->ctx->uring_lock)
2059 {
2060 int ret;
2061
2062 ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
2063
2064 /*
2065 * We async punt it if the file wasn't marked NOWAIT, or if the file
2066 * doesn't support non-blocking read/write attempts
2067 */
2068 if (likely(!ret))
2069 io_arm_ltimeout(req);
2070 else
2071 io_queue_async(req, ret);
2072 }
2073
2074 static void io_queue_sqe_fallback(struct io_kiocb *req)
2075 __must_hold(&req->ctx->uring_lock)
2076 {
2077 if (unlikely(req->flags & REQ_F_FAIL)) {
2078 /*
2079		 * We don't submit; fail them all. For that, replace hardlinks
2080		 * with normal links. An extra REQ_F_LINK is tolerated.
2081 */
2082 req->flags &= ~REQ_F_HARDLINK;
2083 req->flags |= REQ_F_LINK;
2084 io_req_defer_failed(req, req->cqe.res);
2085 } else {
2086 int ret = io_req_prep_async(req);
2087
2088 if (unlikely(ret)) {
2089 io_req_defer_failed(req, ret);
2090 return;
2091 }
2092
2093 if (unlikely(req->ctx->drain_active))
2094 io_drain_req(req);
2095 else
2096 io_queue_iowq(req);
2097 }
2098 }
2099
2100 /*
2101 * Check SQE restrictions (opcode and flags).
2102 *
2103 * Returns 'true' if SQE is allowed, 'false' otherwise.
2104 */
2105 static inline bool io_check_restriction(struct io_ring_ctx *ctx,
2106 struct io_kiocb *req,
2107 unsigned int sqe_flags)
2108 {
2109 if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
2110 return false;
2111
2112 if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
2113 ctx->restrictions.sqe_flags_required)
2114 return false;
2115
2116 if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
2117 ctx->restrictions.sqe_flags_required))
2118 return false;
2119
2120 return true;
2121 }
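
/*
 * Illustrative userspace counterpart (a sketch, not part of this file):
 * the restriction bitmaps checked above are populated via
 * IORING_REGISTER_RESTRICTIONS while the ring is still disabled
 * (IORING_SETUP_R_DISABLED). Constants and fields come from the uapi
 * header; the surrounding setup is assumed boilerplate.
 *
 *	struct io_uring_restriction res[2] = {};
 *
 *	res[0].opcode = IORING_RESTRICTION_SQE_OP;
 *	res[0].sqe_op = IORING_OP_NOP;			// allow only NOP SQEs
 *	res[1].opcode = IORING_RESTRICTION_SQE_FLAGS_ALLOWED;
 *	res[1].sqe_flags = IOSQE_FIXED_FILE;		// and only this SQE flag
 *
 *	// ring_fd was created with IORING_SETUP_R_DISABLED
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_RESTRICTIONS, res, 2);
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_ENABLE_RINGS, NULL, 0);
 */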
2122
2123 static void io_init_req_drain(struct io_kiocb *req)
2124 {
2125 struct io_ring_ctx *ctx = req->ctx;
2126 struct io_kiocb *head = ctx->submit_state.link.head;
2127
2128 ctx->drain_active = true;
2129 if (head) {
2130 /*
2131 * If we need to drain a request in the middle of a link, drain
2132 * the head request and the next request/link after the current
2133 * link. Considering sequential execution of links,
2134 * REQ_F_IO_DRAIN will be maintained for every request of our
2135 * link.
2136 */
2137 head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2138 ctx->drain_next = true;
2139 }
2140 }
2141
2142 static __cold int io_init_fail_req(struct io_kiocb *req, int err)
2143 {
2144 /* ensure per-opcode data is cleared if we fail before prep */
2145 memset(&req->cmd.data, 0, sizeof(req->cmd.data));
2146 return err;
2147 }
2148
2149 static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
2150 const struct io_uring_sqe *sqe)
2151 __must_hold(&ctx->uring_lock)
2152 {
2153 const struct io_issue_def *def;
2154 unsigned int sqe_flags;
2155 int personality;
2156 u8 opcode;
2157
2158 /* req is partially pre-initialised, see io_preinit_req() */
2159 req->opcode = opcode = READ_ONCE(sqe->opcode);
2160	/* same numerical values as the corresponding REQ_F_*, safe to copy */
2161 req->flags = sqe_flags = READ_ONCE(sqe->flags);
2162 req->cqe.user_data = READ_ONCE(sqe->user_data);
2163 req->file = NULL;
2164 req->rsrc_node = NULL;
2165 req->task = current;
2166
2167 if (unlikely(opcode >= IORING_OP_LAST)) {
2168 req->opcode = 0;
2169 return io_init_fail_req(req, -EINVAL);
2170 }
2171 def = &io_issue_defs[opcode];
2172 if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
2173 /* enforce forwards compatibility on users */
2174 if (sqe_flags & ~SQE_VALID_FLAGS)
2175 return io_init_fail_req(req, -EINVAL);
2176 if (sqe_flags & IOSQE_BUFFER_SELECT) {
2177 if (!def->buffer_select)
2178 return io_init_fail_req(req, -EOPNOTSUPP);
2179 req->buf_index = READ_ONCE(sqe->buf_group);
2180 }
2181 if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
2182 ctx->drain_disabled = true;
2183 if (sqe_flags & IOSQE_IO_DRAIN) {
2184 if (ctx->drain_disabled)
2185 return io_init_fail_req(req, -EOPNOTSUPP);
2186 io_init_req_drain(req);
2187 }
2188 }
2189 if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
2190 if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
2191 return io_init_fail_req(req, -EACCES);
2192 /* knock it to the slow queue path, will be drained there */
2193 if (ctx->drain_active)
2194 req->flags |= REQ_F_FORCE_ASYNC;
2195 /* if there is no link, we're at "next" request and need to drain */
2196 if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
2197 ctx->drain_next = false;
2198 ctx->drain_active = true;
2199 req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2200 }
2201 }
2202
2203 if (!def->ioprio && sqe->ioprio)
2204 return io_init_fail_req(req, -EINVAL);
2205 if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
2206 return io_init_fail_req(req, -EINVAL);
2207
2208 if (def->needs_file) {
2209 struct io_submit_state *state = &ctx->submit_state;
2210
2211 req->cqe.fd = READ_ONCE(sqe->fd);
2212
2213 /*
2214 * Plug now if we have more than 2 IO left after this, and the
2215 * target is potentially a read/write to block based storage.
2216 */
2217 if (state->need_plug && def->plug) {
2218 state->plug_started = true;
2219 state->need_plug = false;
2220 blk_start_plug_nr_ios(&state->plug, state->submit_nr);
2221 }
2222 }
2223
2224 personality = READ_ONCE(sqe->personality);
2225 if (personality) {
2226 int ret;
2227
2228 req->creds = xa_load(&ctx->personalities, personality);
2229 if (!req->creds)
2230 return io_init_fail_req(req, -EINVAL);
2231 get_cred(req->creds);
2232 ret = security_uring_override_creds(req->creds);
2233 if (ret) {
2234 put_cred(req->creds);
2235 return io_init_fail_req(req, ret);
2236 }
2237 req->flags |= REQ_F_CREDS;
2238 }
2239
2240 return def->prep(req, sqe);
2241 }
2242
2243 static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
2244 struct io_kiocb *req, int ret)
2245 {
2246 struct io_ring_ctx *ctx = req->ctx;
2247 struct io_submit_link *link = &ctx->submit_state.link;
2248 struct io_kiocb *head = link->head;
2249
2250 trace_io_uring_req_failed(sqe, req, ret);
2251
2252 /*
2253 * Avoid breaking links in the middle as it renders links with SQPOLL
2254 * unusable. Instead of failing eagerly, continue assembling the link if
2255 * applicable and mark the head with REQ_F_FAIL. The link flushing code
2256 * should find the flag and handle the rest.
2257 */
2258 req_fail_link_node(req, ret);
2259 if (head && !(head->flags & REQ_F_FAIL))
2260 req_fail_link_node(head, -ECANCELED);
2261
2262 if (!(req->flags & IO_REQ_LINK_FLAGS)) {
2263 if (head) {
2264 link->last->link = req;
2265 link->head = NULL;
2266 req = head;
2267 }
2268 io_queue_sqe_fallback(req);
2269 return ret;
2270 }
2271
2272 if (head)
2273 link->last->link = req;
2274 else
2275 link->head = req;
2276 link->last = req;
2277 return 0;
2278 }
2279
2280 static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2281 const struct io_uring_sqe *sqe)
2282 __must_hold(&ctx->uring_lock)
2283 {
2284 struct io_submit_link *link = &ctx->submit_state.link;
2285 int ret;
2286
2287 ret = io_init_req(ctx, req, sqe);
2288 if (unlikely(ret))
2289 return io_submit_fail_init(sqe, req, ret);
2290
2291 trace_io_uring_submit_req(req);
2292
2293 /*
2294 * If we already have a head request, queue this one for async
2295 * submittal once the head completes. If we don't have a head but
2296 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2297 * submitted sync once the chain is complete. If none of those
2298 * conditions are true (normal request), then just queue it.
2299 */
2300 if (unlikely(link->head)) {
2301 ret = io_req_prep_async(req);
2302 if (unlikely(ret))
2303 return io_submit_fail_init(sqe, req, ret);
2304
2305 trace_io_uring_link(req, link->head);
2306 link->last->link = req;
2307 link->last = req;
2308
2309 if (req->flags & IO_REQ_LINK_FLAGS)
2310 return 0;
2311 /* last request of the link, flush it */
2312 req = link->head;
2313 link->head = NULL;
2314 if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
2315 goto fallback;
2316
2317 } else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
2318 REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
2319 if (req->flags & IO_REQ_LINK_FLAGS) {
2320 link->head = req;
2321 link->last = req;
2322 } else {
2323 fallback:
2324 io_queue_sqe_fallback(req);
2325 }
2326 return 0;
2327 }
2328
2329 io_queue_sqe(req);
2330 return 0;
2331 }
2332
2333 /*
2334 * Batched submission is done, ensure local IO is flushed out.
2335 */
2336 static void io_submit_state_end(struct io_ring_ctx *ctx)
2337 {
2338 struct io_submit_state *state = &ctx->submit_state;
2339
2340 if (unlikely(state->link.head))
2341 io_queue_sqe_fallback(state->link.head);
2342 /* flush only after queuing links as they can generate completions */
2343 io_submit_flush_completions(ctx);
2344 if (state->plug_started)
2345 blk_finish_plug(&state->plug);
2346 }
2347
2348 /*
2349 * Start submission side cache.
2350 */
2351 static void io_submit_state_start(struct io_submit_state *state,
2352 unsigned int max_ios)
2353 {
2354 state->plug_started = false;
2355 state->need_plug = max_ios > 2;
2356 state->submit_nr = max_ios;
2357 /* set only head, no need to init link_last in advance */
2358 state->link.head = NULL;
2359 }
2360
2361 static void io_commit_sqring(struct io_ring_ctx *ctx)
2362 {
2363 struct io_rings *rings = ctx->rings;
2364
2365 /*
2366 * Ensure any loads from the SQEs are done at this point,
2367 * since once we write the new head, the application could
2368 * write new data to them.
2369 */
2370 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2371 }
2372
2373 /*
2374 * Fetch an sqe, if one is available. Note this returns a pointer to memory
2375 * that is mapped by userspace. This means that care needs to be taken to
2376 * ensure that reads are stable, as we cannot rely on userspace always
2377 * being a good citizen. If members of the sqe are validated and then later
2378 * used, it's important that those reads are done through READ_ONCE() to
2379 * prevent a re-load down the line.
2380 */
2381 static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
2382 {
2383 unsigned mask = ctx->sq_entries - 1;
2384 unsigned head = ctx->cached_sq_head++ & mask;
2385
2386 if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) {
2387 head = READ_ONCE(ctx->sq_array[head]);
2388 if (unlikely(head >= ctx->sq_entries)) {
2389 /* drop invalid entries */
2390 spin_lock(&ctx->completion_lock);
2391 ctx->cq_extra--;
2392 spin_unlock(&ctx->completion_lock);
2393 WRITE_ONCE(ctx->rings->sq_dropped,
2394 READ_ONCE(ctx->rings->sq_dropped) + 1);
2395 return false;
2396 }
2397 }
2398
2399 /*
2400 * The cached sq head (or cq tail) serves two purposes:
2401 *
2402	 * 1) allows us to batch the cost of updating the user visible
2403	 *    head.
2404 * 2) allows the kernel side to track the head on its own, even
2405 * though the application is the one updating it.
2406 */
2407
2408 /* double index for 128-byte SQEs, twice as long */
2409 if (ctx->flags & IORING_SETUP_SQE128)
2410 head <<= 1;
2411 *sqe = &ctx->sq_sqes[head];
2412 return true;
2413 }
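
/*
 * A minimal sketch of the READ_ONCE() pattern that the comment before
 * io_get_sqe() asks for, using a hypothetical opcode handler (names such
 * as example_prep, example_do_prep and EXAMPLE_MAX_LEN are invented for
 * illustration): read the shared field once into a local, validate the
 * local, and keep using the local so userspace cannot change the value
 * between check and use.
 *
 *	static int example_prep(struct io_kiocb *req,
 *				const struct io_uring_sqe *sqe)
 *	{
 *		u32 len = READ_ONCE(sqe->len);	// single load from shared memory
 *
 *		if (len > EXAMPLE_MAX_LEN)	// validate the local copy...
 *			return -EINVAL;
 *		return example_do_prep(req, len); // ...and only ever use the local
 *	}
 */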
2414
2415 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
2416 __must_hold(&ctx->uring_lock)
2417 {
2418 unsigned int entries = io_sqring_entries(ctx);
2419 unsigned int left;
2420 int ret;
2421
2422 if (unlikely(!entries))
2423 return 0;
2424 /* make sure SQ entry isn't read before tail */
2425 ret = left = min(nr, entries);
2426 io_get_task_refs(left);
2427 io_submit_state_start(&ctx->submit_state, left);
2428
2429 do {
2430 const struct io_uring_sqe *sqe;
2431 struct io_kiocb *req;
2432
2433 if (unlikely(!io_alloc_req(ctx, &req)))
2434 break;
2435 if (unlikely(!io_get_sqe(ctx, &sqe))) {
2436 io_req_add_to_cache(req, ctx);
2437 break;
2438 }
2439
2440 /*
2441 * Continue submitting even for sqe failure if the
2442		 * ring was set up with IORING_SETUP_SUBMIT_ALL
2443 */
2444 if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
2445 !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
2446 left--;
2447 break;
2448 }
2449 } while (--left);
2450
2451 if (unlikely(left)) {
2452 ret -= left;
2453 /* try again if it submitted nothing and can't allocate a req */
2454 if (!ret && io_req_cache_empty(ctx))
2455 ret = -EAGAIN;
2456 current->io_uring->cached_refs += left;
2457 }
2458
2459 io_submit_state_end(ctx);
2460 /* Commit SQ ring head once we've consumed and submitted all SQEs */
2461 io_commit_sqring(ctx);
2462 return ret;
2463 }
2464
2465 struct io_wait_queue {
2466 struct wait_queue_entry wq;
2467 struct io_ring_ctx *ctx;
2468 unsigned cq_tail;
2469 unsigned nr_timeouts;
2470 ktime_t timeout;
2471 };
2472
2473 static inline bool io_has_work(struct io_ring_ctx *ctx)
2474 {
2475 return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
2476 !llist_empty(&ctx->work_llist);
2477 }
2478
2479 static inline bool io_should_wake(struct io_wait_queue *iowq)
2480 {
2481 struct io_ring_ctx *ctx = iowq->ctx;
2482 int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
2483
2484 /*
2485 * Wake up if we have enough events, or if a timeout occurred since we
2486 * started waiting. For timeouts, we always want to return to userspace,
2487 * regardless of event count.
2488 */
2489 return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
2490 }
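
/*
 * Worked example of the signed-distance test above (illustrative, assuming
 * free-running 32-bit ring indices that may wrap): with cq.tail == 0x00000002
 * after wrapping and iowq->cq_tail == 0xfffffffe, the subtraction yields
 * dist == 4, so dist >= 0 and the wakeup still fires even though tail is
 * numerically smaller than the target value.
 */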
2491
2492 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
2493 int wake_flags, void *key)
2494 {
2495 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
2496
2497 /*
2498 * Cannot safely flush overflowed CQEs from here, ensure we wake up
2499 * the task, and the next invocation will do it.
2500 */
2501 if (io_should_wake(iowq) || io_has_work(iowq->ctx))
2502 return autoremove_wake_function(curr, mode, wake_flags, key);
2503 return -1;
2504 }
2505
2506 int io_run_task_work_sig(struct io_ring_ctx *ctx)
2507 {
2508 if (!llist_empty(&ctx->work_llist)) {
2509 __set_current_state(TASK_RUNNING);
2510 if (io_run_local_work(ctx, INT_MAX) > 0)
2511 return 0;
2512 }
2513 if (io_run_task_work() > 0)
2514 return 0;
2515 if (task_sigpending(current))
2516 return -EINTR;
2517 return 0;
2518 }
2519
2520 static bool current_pending_io(void)
2521 {
2522 struct io_uring_task *tctx = current->io_uring;
2523
2524 if (!tctx)
2525 return false;
2526 return percpu_counter_read_positive(&tctx->inflight);
2527 }
2528
2529 /* when returns >0, the caller should retry */
2530 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
2531 struct io_wait_queue *iowq)
2532 {
2533 int ret;
2534
2535 if (unlikely(READ_ONCE(ctx->check_cq)))
2536 return 1;
2537 if (unlikely(!llist_empty(&ctx->work_llist)))
2538 return 1;
2539 if (unlikely(task_work_pending(current)))
2540 return 1;
2541 if (unlikely(task_sigpending(current)))
2542 return -EINTR;
2543 if (unlikely(io_should_wake(iowq)))
2544 return 0;
2545
2546 /*
2547 * Mark us as being in io_wait if we have pending requests, so cpufreq
2548 * can take into account that the task is waiting for IO - turns out
2549 * to be important for low QD IO.
2550 */
2551 if (current_pending_io())
2552 current->in_iowait = 1;
2553 ret = 0;
2554 if (iowq->timeout == KTIME_MAX)
2555 schedule();
2556 else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS))
2557 ret = -ETIME;
2558 current->in_iowait = 0;
2559 return ret;
2560 }
2561
2562 /*
2563 * Wait until events become available, if we don't already have some. The
2564 * application must reap them itself, as they reside on the shared cq ring.
2565 */
2566 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2567 const sigset_t __user *sig, size_t sigsz,
2568 struct __kernel_timespec __user *uts)
2569 {
2570 struct io_wait_queue iowq;
2571 struct io_rings *rings = ctx->rings;
2572 int ret;
2573
2574 if (!io_allowed_run_tw(ctx))
2575 return -EEXIST;
2576 if (!llist_empty(&ctx->work_llist))
2577 io_run_local_work(ctx, min_events);
2578 io_run_task_work();
2579 io_cqring_overflow_flush(ctx);
2580 /* if user messes with these they will just get an early return */
2581 if (__io_cqring_events_user(ctx) >= min_events)
2582 return 0;
2583
2584 init_waitqueue_func_entry(&iowq.wq, io_wake_function);
2585 iowq.wq.private = current;
2586 INIT_LIST_HEAD(&iowq.wq.entry);
2587 iowq.ctx = ctx;
2588 iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
2589 iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
2590 iowq.timeout = KTIME_MAX;
2591
2592 if (uts) {
2593 struct timespec64 ts;
2594
2595 if (get_timespec64(&ts, uts))
2596 return -EFAULT;
2597 iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
2598 }
2599
2600 if (sig) {
2601 #ifdef CONFIG_COMPAT
2602 if (in_compat_syscall())
2603 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2604 sigsz);
2605 else
2606 #endif
2607 ret = set_user_sigmask(sig, sigsz);
2608
2609 if (ret)
2610 return ret;
2611 }
2612
2613 trace_io_uring_cqring_wait(ctx, min_events);
2614 do {
2615 int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
2616 unsigned long check_cq;
2617
2618 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
2619 atomic_set(&ctx->cq_wait_nr, nr_wait);
2620 set_current_state(TASK_INTERRUPTIBLE);
2621 } else {
2622 prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
2623 TASK_INTERRUPTIBLE);
2624 }
2625
2626 ret = io_cqring_wait_schedule(ctx, &iowq);
2627 __set_current_state(TASK_RUNNING);
2628 atomic_set(&ctx->cq_wait_nr, 0);
2629
2630 /*
2631 * Run task_work after scheduling and before io_should_wake().
2632 * If we got woken because of task_work being processed, run it
2633 * now rather than let the caller do another wait loop.
2634 */
2635 if (!llist_empty(&ctx->work_llist))
2636 io_run_local_work(ctx, nr_wait);
2637 io_run_task_work();
2638
2639 /*
2640 * Non-local task_work will be run on exit to userspace, but
2641 * if we're using DEFER_TASKRUN, then we could have waited
2642 * with a timeout for a number of requests. If the timeout
2643 * hits, we could have some requests ready to process. Ensure
2644 * this break is _after_ we have run task_work, to avoid
2645 * deferring running potentially pending requests until the
2646 * next time we wait for events.
2647 */
2648 if (ret < 0)
2649 break;
2650
2651 check_cq = READ_ONCE(ctx->check_cq);
2652 if (unlikely(check_cq)) {
2653 /* let the caller flush overflows, retry */
2654 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
2655 io_cqring_do_overflow_flush(ctx);
2656 if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
2657 ret = -EBADR;
2658 break;
2659 }
2660 }
2661
2662 if (io_should_wake(&iowq)) {
2663 ret = 0;
2664 break;
2665 }
2666 cond_resched();
2667 } while (1);
2668
2669 if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
2670 finish_wait(&ctx->cq_wait, &iowq.wq);
2671 restore_saved_sigmask_unless(ret == -EINTR);
2672
2673 return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
2674 }
2675
2676 void io_mem_free(void *ptr)
2677 {
2678 if (!ptr)
2679 return;
2680
2681 folio_put(virt_to_folio(ptr));
2682 }
2683
2684 static void io_pages_free(struct page ***pages, int npages)
2685 {
2686 struct page **page_array;
2687 int i;
2688
2689 if (!pages)
2690 return;
2691
2692 page_array = *pages;
2693 if (!page_array)
2694 return;
2695
2696 for (i = 0; i < npages; i++)
2697 unpin_user_page(page_array[i]);
2698 kvfree(page_array);
2699 *pages = NULL;
2700 }
2701
2702 static void *__io_uaddr_map(struct page ***pages, unsigned short *npages,
2703 unsigned long uaddr, size_t size)
2704 {
2705 struct page **page_array;
2706 unsigned int nr_pages;
2707 void *page_addr;
2708 int ret, i, pinned;
2709
2710 *npages = 0;
2711
2712 if (uaddr & (PAGE_SIZE - 1) || !size)
2713 return ERR_PTR(-EINVAL);
2714
2715 nr_pages = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
2716 if (nr_pages > USHRT_MAX)
2717 return ERR_PTR(-EINVAL);
2718 page_array = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL);
2719 if (!page_array)
2720 return ERR_PTR(-ENOMEM);
2721
2722
2723 pinned = pin_user_pages_fast(uaddr, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
2724 page_array);
2725 if (pinned != nr_pages) {
2726 ret = (pinned < 0) ? pinned : -EFAULT;
2727 goto free_pages;
2728 }
2729
2730 page_addr = page_address(page_array[0]);
2731 for (i = 0; i < nr_pages; i++) {
2732 ret = -EINVAL;
2733
2734 /*
2735 * Can't support mapping user allocated ring memory on 32-bit
2736 * archs where it could potentially reside in highmem. Just
2737 * fail those with -EINVAL, just like we did on kernels that
2738 * didn't support this feature.
2739 */
2740 if (PageHighMem(page_array[i]))
2741 goto free_pages;
2742
2743 /*
2744		 * No support for discontig pages for now; this should either be
2745		 * a single normal page or a huge page. Later on we can add
2746		 * support for remapping discontig pages; for now we will
2747		 * just fail them with EINVAL.
2748 */
2749 if (page_address(page_array[i]) != page_addr)
2750 goto free_pages;
2751 page_addr += PAGE_SIZE;
2752 }
2753
2754 *pages = page_array;
2755 *npages = nr_pages;
2756 return page_to_virt(page_array[0]);
2757
2758 free_pages:
2759 io_pages_free(&page_array, pinned > 0 ? pinned : 0);
2760 return ERR_PTR(ret);
2761 }
2762
2763 static void *io_rings_map(struct io_ring_ctx *ctx, unsigned long uaddr,
2764 size_t size)
2765 {
2766 return __io_uaddr_map(&ctx->ring_pages, &ctx->n_ring_pages, uaddr,
2767 size);
2768 }
2769
2770 static void *io_sqes_map(struct io_ring_ctx *ctx, unsigned long uaddr,
2771 size_t size)
2772 {
2773 return __io_uaddr_map(&ctx->sqe_pages, &ctx->n_sqe_pages, uaddr,
2774 size);
2775 }
2776
2777 static void io_rings_free(struct io_ring_ctx *ctx)
2778 {
2779 if (!(ctx->flags & IORING_SETUP_NO_MMAP)) {
2780 io_mem_free(ctx->rings);
2781 io_mem_free(ctx->sq_sqes);
2782 } else {
2783 io_pages_free(&ctx->ring_pages, ctx->n_ring_pages);
2784 ctx->n_ring_pages = 0;
2785 io_pages_free(&ctx->sqe_pages, ctx->n_sqe_pages);
2786 ctx->n_sqe_pages = 0;
2787 }
2788
2789 ctx->rings = NULL;
2790 ctx->sq_sqes = NULL;
2791 }
2792
2793 void *io_mem_alloc(size_t size)
2794 {
2795 gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
2796 void *ret;
2797
2798 ret = (void *) __get_free_pages(gfp, get_order(size));
2799 if (ret)
2800 return ret;
2801 return ERR_PTR(-ENOMEM);
2802 }
2803
2804 static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
2805 unsigned int cq_entries, size_t *sq_offset)
2806 {
2807 struct io_rings *rings;
2808 size_t off, sq_array_size;
2809
2810 off = struct_size(rings, cqes, cq_entries);
2811 if (off == SIZE_MAX)
2812 return SIZE_MAX;
2813 if (ctx->flags & IORING_SETUP_CQE32) {
2814 if (check_shl_overflow(off, 1, &off))
2815 return SIZE_MAX;
2816 }
2817
2818 #ifdef CONFIG_SMP
2819 off = ALIGN(off, SMP_CACHE_BYTES);
2820 if (off == 0)
2821 return SIZE_MAX;
2822 #endif
2823
2824 if (ctx->flags & IORING_SETUP_NO_SQARRAY) {
2825 if (sq_offset)
2826 *sq_offset = SIZE_MAX;
2827 return off;
2828 }
2829
2830 if (sq_offset)
2831 *sq_offset = off;
2832
2833 sq_array_size = array_size(sizeof(u32), sq_entries);
2834 if (sq_array_size == SIZE_MAX)
2835 return SIZE_MAX;
2836
2837 if (check_add_overflow(off, sq_array_size, &off))
2838 return SIZE_MAX;
2839
2840 return off;
2841 }
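
/*
 * Illustrative layout of the allocation whose size is computed above
 * (sketch for the default case: no IORING_SETUP_CQE32 and no
 * IORING_SETUP_NO_SQARRAY):
 *
 *	struct io_rings		headers plus the flexible cqes[] array
 *	cqes[cq_entries]	one struct io_uring_cqe per entry
 *	<pad to SMP_CACHE_BYTES on SMP>
 *	sq_array[cq ... ]	sq_entries u32 SQE indices; *sq_offset points here
 */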
2842
2843 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
2844 unsigned int eventfd_async)
2845 {
2846 struct io_ev_fd *ev_fd;
2847 __s32 __user *fds = arg;
2848 int fd;
2849
2850 ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2851 lockdep_is_held(&ctx->uring_lock));
2852 if (ev_fd)
2853 return -EBUSY;
2854
2855 if (copy_from_user(&fd, fds, sizeof(*fds)))
2856 return -EFAULT;
2857
2858 ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
2859 if (!ev_fd)
2860 return -ENOMEM;
2861
2862 ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
2863 if (IS_ERR(ev_fd->cq_ev_fd)) {
2864 int ret = PTR_ERR(ev_fd->cq_ev_fd);
2865 kfree(ev_fd);
2866 return ret;
2867 }
2868
2869 spin_lock(&ctx->completion_lock);
2870 ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
2871 spin_unlock(&ctx->completion_lock);
2872
2873 ev_fd->eventfd_async = eventfd_async;
2874 ctx->has_evfd = true;
2875 rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
2876 atomic_set(&ev_fd->refs, 1);
2877 atomic_set(&ev_fd->ops, 0);
2878 return 0;
2879 }
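
/*
 * Illustrative userspace counterpart (a sketch; everything outside the uapi
 * header is assumed boilerplate): the fd copied in above comes from
 * eventfd(2) and is registered with io_uring_register(2); completion
 * activity can then be observed by reading the eventfd.
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *	uint64_t cnt;
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_EVENTFD, &efd, 1);
 *	...
 *	read(efd, &cnt, sizeof(cnt));	// blocks until completions are posted
 */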
2880
2881 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2882 {
2883 struct io_ev_fd *ev_fd;
2884
2885 ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2886 lockdep_is_held(&ctx->uring_lock));
2887 if (ev_fd) {
2888 ctx->has_evfd = false;
2889 rcu_assign_pointer(ctx->io_ev_fd, NULL);
2890 if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_FREE_BIT), &ev_fd->ops))
2891 call_rcu(&ev_fd->rcu, io_eventfd_ops);
2892 return 0;
2893 }
2894
2895 return -ENXIO;
2896 }
2897
2898 static void io_req_caches_free(struct io_ring_ctx *ctx)
2899 {
2900 struct io_kiocb *req;
2901 int nr = 0;
2902
2903 mutex_lock(&ctx->uring_lock);
2904 io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
2905
2906 while (!io_req_cache_empty(ctx)) {
2907 req = io_extract_req(ctx);
2908 kmem_cache_free(req_cachep, req);
2909 nr++;
2910 }
2911 if (nr)
2912 percpu_ref_put_many(&ctx->refs, nr);
2913 mutex_unlock(&ctx->uring_lock);
2914 }
2915
2916 static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
2917 {
2918 kfree(container_of(entry, struct io_rsrc_node, cache));
2919 }
2920
2921 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
2922 {
2923 io_sq_thread_finish(ctx);
2924 /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
2925 if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
2926 return;
2927
2928 mutex_lock(&ctx->uring_lock);
2929 if (ctx->buf_data)
2930 __io_sqe_buffers_unregister(ctx);
2931 if (ctx->file_data)
2932 __io_sqe_files_unregister(ctx);
2933 io_cqring_overflow_kill(ctx);
2934 io_eventfd_unregister(ctx);
2935 io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
2936 io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
2937 io_destroy_buffers(ctx);
2938 mutex_unlock(&ctx->uring_lock);
2939 if (ctx->sq_creds)
2940 put_cred(ctx->sq_creds);
2941 if (ctx->submitter_task)
2942 put_task_struct(ctx->submitter_task);
2943
2944 /* there are no registered resources left, nobody uses it */
2945 if (ctx->rsrc_node)
2946 io_rsrc_node_destroy(ctx, ctx->rsrc_node);
2947
2948 WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
2949 WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
2950
2951 io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
2952 if (ctx->mm_account) {
2953 mmdrop(ctx->mm_account);
2954 ctx->mm_account = NULL;
2955 }
2956 io_rings_free(ctx);
2957 io_kbuf_mmap_list_free(ctx);
2958
2959 percpu_ref_exit(&ctx->refs);
2960 free_uid(ctx->user);
2961 io_req_caches_free(ctx);
2962 if (ctx->hash_map)
2963 io_wq_put_hash(ctx->hash_map);
2964 kfree(ctx->cancel_table.hbs);
2965 kfree(ctx->cancel_table_locked.hbs);
2966 xa_destroy(&ctx->io_bl_xa);
2967 kfree(ctx);
2968 }
2969
2970 static __cold void io_activate_pollwq_cb(struct callback_head *cb)
2971 {
2972 struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
2973 poll_wq_task_work);
2974
2975 mutex_lock(&ctx->uring_lock);
2976 ctx->poll_activated = true;
2977 mutex_unlock(&ctx->uring_lock);
2978
2979 /*
2980 * Wake ups for some events between start of polling and activation
2981 * might've been lost due to loose synchronisation.
2982 */
2983 wake_up_all(&ctx->poll_wq);
2984 percpu_ref_put(&ctx->refs);
2985 }
2986
2987 static __cold void io_activate_pollwq(struct io_ring_ctx *ctx)
2988 {
2989 spin_lock(&ctx->completion_lock);
2990 /* already activated or in progress */
2991 if (ctx->poll_activated || ctx->poll_wq_task_work.func)
2992 goto out;
2993 if (WARN_ON_ONCE(!ctx->task_complete))
2994 goto out;
2995 if (!ctx->submitter_task)
2996 goto out;
2997 /*
2998	 * With ->submitter_task, only the submitter task completes requests; we
2999	 * only need to sync with it, which is done by injecting a task_work item.
3000 */
3001 init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
3002 percpu_ref_get(&ctx->refs);
3003 if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
3004 percpu_ref_put(&ctx->refs);
3005 out:
3006 spin_unlock(&ctx->completion_lock);
3007 }
3008
3009 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
3010 {
3011 struct io_ring_ctx *ctx = file->private_data;
3012 __poll_t mask = 0;
3013
3014 if (unlikely(!ctx->poll_activated))
3015 io_activate_pollwq(ctx);
3016
3017 poll_wait(file, &ctx->poll_wq, wait);
3018 /*
3019 * synchronizes with barrier from wq_has_sleeper call in
3020 * io_commit_cqring
3021 */
3022 smp_rmb();
3023 if (!io_sqring_full(ctx))
3024 mask |= EPOLLOUT | EPOLLWRNORM;
3025
3026 /*
3027 * Don't flush cqring overflow list here, just do a simple check.
3028	 * Otherwise there could possibly be an ABBA deadlock:
3029 * CPU0 CPU1
3030 * ---- ----
3031 * lock(&ctx->uring_lock);
3032 * lock(&ep->mtx);
3033 * lock(&ctx->uring_lock);
3034 * lock(&ep->mtx);
3035 *
3036	 * Users may get EPOLLIN while seeing nothing in the cqring; this
3037	 * pushes them to do the flush.
3038 */
3039
3040 if (__io_cqring_events_user(ctx) || io_has_work(ctx))
3041 mask |= EPOLLIN | EPOLLRDNORM;
3042
3043 return mask;
3044 }
3045
3046 static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
3047 {
3048 const struct cred *creds;
3049
3050 creds = xa_erase(&ctx->personalities, id);
3051 if (creds) {
3052 put_cred(creds);
3053 return 0;
3054 }
3055
3056 return -EINVAL;
3057 }
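
/*
 * Illustrative userspace counterpart (sketch): the id erased above is the
 * value returned earlier by IORING_REGISTER_PERSONALITY. For both register
 * ops the id travels in the nr_args slot and arg must be NULL.
 *
 *	int id = syscall(__NR_io_uring_register, ring_fd,
 *			 IORING_REGISTER_PERSONALITY, NULL, 0);
 *	...
 *	// requests wanting these creds set sqe->personality = id
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_UNREGISTER_PERSONALITY, NULL, id);
 */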
3058
3059 struct io_tctx_exit {
3060 struct callback_head task_work;
3061 struct completion completion;
3062 struct io_ring_ctx *ctx;
3063 };
3064
3065 static __cold void io_tctx_exit_cb(struct callback_head *cb)
3066 {
3067 struct io_uring_task *tctx = current->io_uring;
3068 struct io_tctx_exit *work;
3069
3070 work = container_of(cb, struct io_tctx_exit, task_work);
3071 /*
3072 * When @in_cancel, we're in cancellation and it's racy to remove the
3073 * node. It'll be removed by the end of cancellation, just ignore it.
3074 * tctx can be NULL if the queueing of this task_work raced with
3075	 * work cancellation off the exec path.
3076 */
3077 if (tctx && !atomic_read(&tctx->in_cancel))
3078 io_uring_del_tctx_node((unsigned long)work->ctx);
3079 complete(&work->completion);
3080 }
3081
3082 static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
3083 {
3084 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3085
3086 return req->ctx == data;
3087 }
3088
3089 static __cold void io_ring_exit_work(struct work_struct *work)
3090 {
3091 struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
3092 unsigned long timeout = jiffies + HZ * 60 * 5;
3093 unsigned long interval = HZ / 20;
3094 struct io_tctx_exit exit;
3095 struct io_tctx_node *node;
3096 int ret;
3097
3098 /*
3099 * If we're doing polled IO and end up having requests being
3100 * submitted async (out-of-line), then completions can come in while
3101 * we're waiting for refs to drop. We need to reap these manually,
3102 * as nobody else will be looking for them.
3103 */
3104 do {
3105 if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
3106 mutex_lock(&ctx->uring_lock);
3107 io_cqring_overflow_kill(ctx);
3108 mutex_unlock(&ctx->uring_lock);
3109 }
3110
3111 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3112 io_move_task_work_from_local(ctx);
3113
3114 while (io_uring_try_cancel_requests(ctx, NULL, true))
3115 cond_resched();
3116
3117 if (ctx->sq_data) {
3118 struct io_sq_data *sqd = ctx->sq_data;
3119 struct task_struct *tsk;
3120
3121 io_sq_thread_park(sqd);
3122 tsk = sqd->thread;
3123 if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
3124 io_wq_cancel_cb(tsk->io_uring->io_wq,
3125 io_cancel_ctx_cb, ctx, true);
3126 io_sq_thread_unpark(sqd);
3127 }
3128
3129 io_req_caches_free(ctx);
3130
3131 if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
3132 /* there is little hope left, don't run it too often */
3133 interval = HZ * 60;
3134 }
3135 /*
3136		 * This is really an uninterruptible wait, as it has to run to
3137		 * completion. But it's also run from a kworker, which doesn't
3138 * take signals, so it's fine to make it interruptible. This
3139 * avoids scenarios where we knowingly can wait much longer
3140 * on completions, for example if someone does a SIGSTOP on
3141 * a task that needs to finish task_work to make this loop
3142 * complete. That's a synthetic situation that should not
3143 * cause a stuck task backtrace, and hence a potential panic
3144 * on stuck tasks if that is enabled.
3145 */
3146 } while (!wait_for_completion_interruptible_timeout(&ctx->ref_comp, interval));
3147
3148 init_completion(&exit.completion);
3149 init_task_work(&exit.task_work, io_tctx_exit_cb);
3150 exit.ctx = ctx;
3151
3152 mutex_lock(&ctx->uring_lock);
3153 while (!list_empty(&ctx->tctx_list)) {
3154 WARN_ON_ONCE(time_after(jiffies, timeout));
3155
3156 node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
3157 ctx_node);
3158 /* don't spin on a single task if cancellation failed */
3159 list_rotate_left(&ctx->tctx_list);
3160 ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
3161 if (WARN_ON_ONCE(ret))
3162 continue;
3163
3164 mutex_unlock(&ctx->uring_lock);
3165 /*
3166 * See comment above for
3167 * wait_for_completion_interruptible_timeout() on why this
3168 * wait is marked as interruptible.
3169 */
3170 wait_for_completion_interruptible(&exit.completion);
3171 mutex_lock(&ctx->uring_lock);
3172 }
3173 mutex_unlock(&ctx->uring_lock);
3174 spin_lock(&ctx->completion_lock);
3175 spin_unlock(&ctx->completion_lock);
3176
3177 /* pairs with RCU read section in io_req_local_work_add() */
3178 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3179 synchronize_rcu();
3180
3181 io_ring_ctx_free(ctx);
3182 }
3183
3184 static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3185 {
3186 unsigned long index;
3187 struct creds *creds;
3188
3189 mutex_lock(&ctx->uring_lock);
3190 percpu_ref_kill(&ctx->refs);
3191 xa_for_each(&ctx->personalities, index, creds)
3192 io_unregister_personality(ctx, index);
3193 if (ctx->rings)
3194 io_poll_remove_all(ctx, NULL, true);
3195 mutex_unlock(&ctx->uring_lock);
3196
3197 /*
3198 * If we failed setting up the ctx, we might not have any rings
3199 * and therefore did not submit any requests
3200 */
3201 if (ctx->rings)
3202 io_kill_timeouts(ctx, NULL, true);
3203
3204 flush_delayed_work(&ctx->fallback_work);
3205
3206 INIT_WORK(&ctx->exit_work, io_ring_exit_work);
3207 /*
3208 * Use system_unbound_wq to avoid spawning tons of event kworkers
3209 * if we're exiting a ton of rings at the same time. It just adds
3210	 * noise and overhead, there's no discernible change in runtime
3211 * over using system_wq.
3212 */
3213 queue_work(iou_wq, &ctx->exit_work);
3214 }
3215
3216 static int io_uring_release(struct inode *inode, struct file *file)
3217 {
3218 struct io_ring_ctx *ctx = file->private_data;
3219
3220 file->private_data = NULL;
3221 io_ring_ctx_wait_and_kill(ctx);
3222 return 0;
3223 }
3224
3225 struct io_task_cancel {
3226 struct task_struct *task;
3227 bool all;
3228 };
3229
3230 static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
3231 {
3232 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3233 struct io_task_cancel *cancel = data;
3234
3235 return io_match_task_safe(req, cancel->task, cancel->all);
3236 }
3237
3238 static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
3239 struct task_struct *task,
3240 bool cancel_all)
3241 {
3242 struct io_defer_entry *de;
3243 LIST_HEAD(list);
3244
3245 spin_lock(&ctx->completion_lock);
3246 list_for_each_entry_reverse(de, &ctx->defer_list, list) {
3247 if (io_match_task_safe(de->req, task, cancel_all)) {
3248 list_cut_position(&list, &ctx->defer_list, &de->list);
3249 break;
3250 }
3251 }
3252 spin_unlock(&ctx->completion_lock);
3253 if (list_empty(&list))
3254 return false;
3255
3256 while (!list_empty(&list)) {
3257 de = list_first_entry(&list, struct io_defer_entry, list);
3258 list_del_init(&de->list);
3259 io_req_task_queue_fail(de->req, -ECANCELED);
3260 kfree(de);
3261 }
3262 return true;
3263 }
3264
3265 static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
3266 {
3267 struct io_tctx_node *node;
3268 enum io_wq_cancel cret;
3269 bool ret = false;
3270
3271 mutex_lock(&ctx->uring_lock);
3272 list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
3273 struct io_uring_task *tctx = node->task->io_uring;
3274
3275 /*
3276 * io_wq will stay alive while we hold uring_lock, because it's
3277		 * killed after the ctx nodes, which requires taking the lock.
3278 */
3279 if (!tctx || !tctx->io_wq)
3280 continue;
3281 cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
3282 ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
3283 }
3284 mutex_unlock(&ctx->uring_lock);
3285
3286 return ret;
3287 }
3288
3289 static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
3290 struct task_struct *task,
3291 bool cancel_all)
3292 {
3293 struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
3294 struct io_uring_task *tctx = task ? task->io_uring : NULL;
3295 enum io_wq_cancel cret;
3296 bool ret = false;
3297
3298 /* set it so io_req_local_work_add() would wake us up */
3299 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
3300 atomic_set(&ctx->cq_wait_nr, 1);
3301 smp_mb();
3302 }
3303
3304 /* failed during ring init, it couldn't have issued any requests */
3305 if (!ctx->rings)
3306 return false;
3307
3308 if (!task) {
3309 ret |= io_uring_try_cancel_iowq(ctx);
3310 } else if (tctx && tctx->io_wq) {
3311 /*
3312 * Cancels requests of all rings, not only @ctx, but
3313 * it's fine as the task is in exit/exec.
3314 */
3315 cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
3316 &cancel, true);
3317 ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
3318 }
3319
3320 /* SQPOLL thread does its own polling */
3321 if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
3322 (ctx->sq_data && ctx->sq_data->thread == current)) {
3323 while (!wq_list_empty(&ctx->iopoll_list)) {
3324 io_iopoll_try_reap_events(ctx);
3325 ret = true;
3326 cond_resched();
3327 }
3328 }
3329
3330 if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
3331 io_allowed_defer_tw_run(ctx))
3332 ret |= io_run_local_work(ctx, INT_MAX) > 0;
3333 ret |= io_cancel_defer_files(ctx, task, cancel_all);
3334 mutex_lock(&ctx->uring_lock);
3335 ret |= io_poll_remove_all(ctx, task, cancel_all);
3336 mutex_unlock(&ctx->uring_lock);
3337 ret |= io_kill_timeouts(ctx, task, cancel_all);
3338 if (task)
3339 ret |= io_run_task_work() > 0;
3340 return ret;
3341 }
3342
3343 static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
3344 {
3345 if (tracked)
3346 return atomic_read(&tctx->inflight_tracked);
3347 return percpu_counter_sum(&tctx->inflight);
3348 }
3349
3350 /*
3351 * Find any io_uring ctx that this task has registered or done IO on, and cancel
3352 * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
3353 */
3354 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
3355 {
3356 struct io_uring_task *tctx = current->io_uring;
3357 struct io_ring_ctx *ctx;
3358 struct io_tctx_node *node;
3359 unsigned long index;
3360 s64 inflight;
3361 DEFINE_WAIT(wait);
3362
3363 WARN_ON_ONCE(sqd && sqd->thread != current);
3364
3365 if (!current->io_uring)
3366 return;
3367 if (tctx->io_wq)
3368 io_wq_exit_start(tctx->io_wq);
3369
3370 atomic_inc(&tctx->in_cancel);
3371 do {
3372 bool loop = false;
3373
3374 io_uring_drop_tctx_refs(current);
3375 if (!tctx_inflight(tctx, !cancel_all))
3376 break;
3377
3378 /* read completions before cancelations */
3379 inflight = tctx_inflight(tctx, false);
3380 if (!inflight)
3381 break;
3382
3383 if (!sqd) {
3384 xa_for_each(&tctx->xa, index, node) {
3385 /* sqpoll task will cancel all its requests */
3386 if (node->ctx->sq_data)
3387 continue;
3388 loop |= io_uring_try_cancel_requests(node->ctx,
3389 current, cancel_all);
3390 }
3391 } else {
3392 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
3393 loop |= io_uring_try_cancel_requests(ctx,
3394 current,
3395 cancel_all);
3396 }
3397
3398 if (loop) {
3399 cond_resched();
3400 continue;
3401 }
3402
3403 prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
3404 io_run_task_work();
3405 io_uring_drop_tctx_refs(current);
3406 xa_for_each(&tctx->xa, index, node) {
3407 if (!llist_empty(&node->ctx->work_llist)) {
3408 WARN_ON_ONCE(node->ctx->submitter_task &&
3409 node->ctx->submitter_task != current);
3410 goto end_wait;
3411 }
3412 }
3413 /*
3414 * If we've seen completions, retry without waiting. This
3415 * avoids a race where a completion comes in before we did
3416 * prepare_to_wait().
3417 */
3418 if (inflight == tctx_inflight(tctx, !cancel_all))
3419 schedule();
3420 end_wait:
3421 finish_wait(&tctx->wait, &wait);
3422 } while (1);
3423
3424 io_uring_clean_tctx(tctx);
3425 if (cancel_all) {
3426 /*
3427 * We shouldn't run task_works after cancel, so just leave
3428 * ->in_cancel set for normal exit.
3429 */
3430 atomic_dec(&tctx->in_cancel);
3431 /* for exec all current's requests should be gone, kill tctx */
3432 __io_uring_free(current);
3433 }
3434 }
3435
3436 void __io_uring_cancel(bool cancel_all)
3437 {
3438 io_uring_unreg_ringfd();
3439 io_uring_cancel_generic(cancel_all, NULL);
3440 }
3441
3442 static void *io_uring_validate_mmap_request(struct file *file,
3443 loff_t pgoff, size_t sz)
3444 {
3445 struct io_ring_ctx *ctx = file->private_data;
3446 loff_t offset = pgoff << PAGE_SHIFT;
3447 struct page *page;
3448 void *ptr;
3449
3450 switch (offset & IORING_OFF_MMAP_MASK) {
3451 case IORING_OFF_SQ_RING:
3452 case IORING_OFF_CQ_RING:
3453		/* Don't allow mmap if the ring was set up without it */
3454 if (ctx->flags & IORING_SETUP_NO_MMAP)
3455 return ERR_PTR(-EINVAL);
3456 ptr = ctx->rings;
3457 break;
3458 case IORING_OFF_SQES:
3459		/* Don't allow mmap if the ring was set up without it */
3460 if (ctx->flags & IORING_SETUP_NO_MMAP)
3461 return ERR_PTR(-EINVAL);
3462 ptr = ctx->sq_sqes;
3463 break;
3464 case IORING_OFF_PBUF_RING: {
3465 struct io_buffer_list *bl;
3466 unsigned int bgid;
3467
3468 bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
3469 bl = io_pbuf_get_bl(ctx, bgid);
3470 if (IS_ERR(bl))
3471 return bl;
3472 ptr = bl->buf_ring;
3473 io_put_bl(ctx, bl);
3474 break;
3475 }
3476 default:
3477 return ERR_PTR(-EINVAL);
3478 }
3479
3480 page = virt_to_head_page(ptr);
3481 if (sz > page_size(page))
3482 return ERR_PTR(-EINVAL);
3483
3484 return ptr;
3485 }
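
/*
 * Illustrative userspace counterpart (a sketch, assuming the classic
 * kernel-allocated rings, i.e. no IORING_SETUP_NO_MMAP and a ring that
 * still has the SQ array): the offsets validated above are the magic mmap
 * offsets from the uapi header, and the lengths come from the
 * io_uring_params that io_uring_setup(2) fills in.
 *
 *	struct io_uring_params p = {};
 *	int fd = syscall(__NR_io_uring_setup, 256, &p);
 *	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	size_t sqes_sz = p.sq_entries * sizeof(struct io_uring_sqe);
 *
 *	void *sq_ring = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
 *			     MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
 *	void *sqes = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
 *			  MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
 */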
3486
3487 #ifdef CONFIG_MMU
3488
3489 static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3490 {
3491 size_t sz = vma->vm_end - vma->vm_start;
3492 unsigned long pfn;
3493 void *ptr;
3494
3495 ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
3496 if (IS_ERR(ptr))
3497 return PTR_ERR(ptr);
3498
3499 pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
3500 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
3501 }
3502
3503 static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
3504 unsigned long addr, unsigned long len,
3505 unsigned long pgoff, unsigned long flags)
3506 {
3507 void *ptr;
3508
3509 /*
3510	 * Do not allow mapping to a user-provided address, to avoid breaking
3511	 * the aliasing rules. Userspace is not able to guess the offset address
3512	 * of the kernel kmalloc()ed memory area.
3513 */
3514 if (addr)
3515 return -EINVAL;
3516
3517 ptr = io_uring_validate_mmap_request(filp, pgoff, len);
3518 if (IS_ERR(ptr))
3519 return -ENOMEM;
3520
3521 /*
3522 * Some architectures have strong cache aliasing requirements.
3523 * For such architectures we need a coherent mapping which aliases
3524 * kernel memory *and* userspace memory. To achieve that:
3525 * - use a NULL file pointer to reference physical memory, and
3526 * - use the kernel virtual address of the shared io_uring context
3527 * (instead of the userspace-provided address, which has to be 0UL
3528 * anyway).
3529 * - use the same pgoff which the get_unmapped_area() uses to
3530 * calculate the page colouring.
3531 * For architectures without such aliasing requirements, the
3532 * architecture will return any suitable mapping because addr is 0.
3533 */
3534 filp = NULL;
3535 flags |= MAP_SHARED;
3536 pgoff = 0; /* has been translated to ptr above */
3537 #ifdef SHM_COLOUR
3538 addr = (uintptr_t) ptr;
3539 pgoff = addr >> PAGE_SHIFT;
3540 #else
3541 addr = 0UL;
3542 #endif
3543 return current->mm->get_unmapped_area(filp, addr, len, pgoff, flags);
3544 }
3545
3546 #else /* !CONFIG_MMU */
3547
3548 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3549 {
3550 return is_nommu_shared_mapping(vma->vm_flags) ? 0 : -EINVAL;
3551 }
3552
3553 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
3554 {
3555 return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
3556 }
3557
3558 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
3559 unsigned long addr, unsigned long len,
3560 unsigned long pgoff, unsigned long flags)
3561 {
3562 void *ptr;
3563
3564 ptr = io_uring_validate_mmap_request(file, pgoff, len);
3565 if (IS_ERR(ptr))
3566 return PTR_ERR(ptr);
3567
3568 return (unsigned long) ptr;
3569 }
3570
3571 #endif /* !CONFIG_MMU */
3572
3573 static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
3574 {
3575 if (flags & IORING_ENTER_EXT_ARG) {
3576 struct io_uring_getevents_arg arg;
3577
3578 if (argsz != sizeof(arg))
3579 return -EINVAL;
3580 if (copy_from_user(&arg, argp, sizeof(arg)))
3581 return -EFAULT;
3582 }
3583 return 0;
3584 }
3585
3586 static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
3587 struct __kernel_timespec __user **ts,
3588 const sigset_t __user **sig)
3589 {
3590 struct io_uring_getevents_arg arg;
3591
3592 /*
3593 * If EXT_ARG isn't set, then we have no timespec and the argp pointer
3594 * is just a pointer to the sigset_t.
3595 */
3596 if (!(flags & IORING_ENTER_EXT_ARG)) {
3597 *sig = (const sigset_t __user *) argp;
3598 *ts = NULL;
3599 return 0;
3600 }
3601
3602 /*
3603 * EXT_ARG is set - ensure we agree on the size of it and copy in our
3604 * timespec and sigset_t pointers if good.
3605 */
3606 if (*argsz != sizeof(arg))
3607 return -EINVAL;
3608 if (copy_from_user(&arg, argp, sizeof(arg)))
3609 return -EFAULT;
3610 if (arg.pad)
3611 return -EINVAL;
3612 *sig = u64_to_user_ptr(arg.sigmask);
3613 *argsz = arg.sigmask_sz;
3614 *ts = u64_to_user_ptr(arg.ts);
3615 return 0;
3616 }
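
/*
 * Illustrative userspace sketch (not kernel code): with IORING_ENTER_EXT_ARG
 * the last two io_uring_enter() arguments carry a struct
 * io_uring_getevents_arg instead of a bare sigset_t, which is how an
 * application attaches a wait timeout. A minimal sketch, assuming "ring_fd",
 * a sigset_t "mask" and the raw syscall(2) wrapper exist in the caller:
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
 *	struct io_uring_getevents_arg arg = {
 *		.sigmask	= (__u64)(unsigned long)&mask,
 *		.sigmask_sz	= _NSIG / 8,
 *		.ts		= (__u64)(unsigned long)&ts,
 *	};
 *
 *	int ret = syscall(__NR_io_uring_enter, ring_fd, 0, 1,
 *			  IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
 *			  &arg, sizeof(arg));
 */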
3617
3618 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3619 u32, min_complete, u32, flags, const void __user *, argp,
3620 size_t, argsz)
3621 {
3622 struct io_ring_ctx *ctx;
3623 struct file *file;
3624 long ret;
3625
3626 if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
3627 IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
3628 IORING_ENTER_REGISTERED_RING)))
3629 return -EINVAL;
3630
3631 /*
3632 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
3633 * need only dereference our task private array to find it.
3634 */
3635 if (flags & IORING_ENTER_REGISTERED_RING) {
3636 struct io_uring_task *tctx = current->io_uring;
3637
3638 if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
3639 return -EINVAL;
3640 fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
3641 file = tctx->registered_rings[fd];
3642 if (unlikely(!file))
3643 return -EBADF;
3644 } else {
3645 file = fget(fd);
3646 if (unlikely(!file))
3647 return -EBADF;
3648 ret = -EOPNOTSUPP;
3649 if (unlikely(!io_is_uring_fops(file)))
3650 goto out;
3651 }
3652
3653 ctx = file->private_data;
3654 ret = -EBADFD;
3655 if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
3656 goto out;
3657
3658 /*
3659 * For SQ polling, the thread will do all submissions and completions.
3660 * Just return the requested submit count, and wake the thread if
3661 * we were asked to.
3662 */
3663 ret = 0;
3664 if (ctx->flags & IORING_SETUP_SQPOLL) {
3665 io_cqring_overflow_flush(ctx);
3666
3667 if (unlikely(ctx->sq_data->thread == NULL)) {
3668 ret = -EOWNERDEAD;
3669 goto out;
3670 }
3671 if (flags & IORING_ENTER_SQ_WAKEUP)
3672 wake_up(&ctx->sq_data->wait);
3673 if (flags & IORING_ENTER_SQ_WAIT)
3674 io_sqpoll_wait_sq(ctx);
3675
3676 ret = to_submit;
3677 } else if (to_submit) {
3678 ret = io_uring_add_tctx_node(ctx);
3679 if (unlikely(ret))
3680 goto out;
3681
3682 mutex_lock(&ctx->uring_lock);
3683 ret = io_submit_sqes(ctx, to_submit);
3684 if (ret != to_submit) {
3685 mutex_unlock(&ctx->uring_lock);
3686 goto out;
3687 }
3688 if (flags & IORING_ENTER_GETEVENTS) {
3689 if (ctx->syscall_iopoll)
3690 goto iopoll_locked;
3691 /*
3692 * Ignore errors, we'll soon call io_cqring_wait() and
3693 * it should handle ownership problems if any.
3694 */
3695 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3696 (void)io_run_local_work_locked(ctx, min_complete);
3697 }
3698 mutex_unlock(&ctx->uring_lock);
3699 }
3700
3701 if (flags & IORING_ENTER_GETEVENTS) {
3702 int ret2;
3703
3704 if (ctx->syscall_iopoll) {
3705 /*
3706 * We disallow the app entering submit/complete with
3707 * polling, but we still need to lock the ring to
3708 * prevent racing with polled issue that got punted to
3709 * a workqueue.
3710 */
3711 mutex_lock(&ctx->uring_lock);
3712 iopoll_locked:
3713 ret2 = io_validate_ext_arg(flags, argp, argsz);
3714 if (likely(!ret2)) {
3715 min_complete = min(min_complete,
3716 ctx->cq_entries);
3717 ret2 = io_iopoll_check(ctx, min_complete);
3718 }
3719 mutex_unlock(&ctx->uring_lock);
3720 } else {
3721 const sigset_t __user *sig;
3722 struct __kernel_timespec __user *ts;
3723
3724 ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
3725 if (likely(!ret2)) {
3726 min_complete = min(min_complete,
3727 ctx->cq_entries);
3728 ret2 = io_cqring_wait(ctx, min_complete, sig,
3729 argsz, ts);
3730 }
3731 }
3732
3733 if (!ret) {
3734 ret = ret2;
3735
3736 /*
3737 * EBADR indicates that one or more CQE were dropped.
3738 * Once the user has been informed we can clear the bit
3739 * as they are obviously ok with those drops.
3740 */
3741 if (unlikely(ret2 == -EBADR))
3742 clear_bit(IO_CHECK_CQ_DROPPED_BIT,
3743 &ctx->check_cq);
3744 }
3745 }
3746 out:
3747 if (!(flags & IORING_ENTER_REGISTERED_RING))
3748 fput(file);
3749 return ret;
3750 }
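
/*
 * Illustrative userspace sketch (not kernel code): two common ways of
 * driving this syscall. A minimal sketch, assuming "ring_fd" and a pointer
 * "sq_flags" to the SQ ring flags word (from the ring mapping) exist in the
 * caller; the ordering requirements around the flags check are described in
 * the comment at the top of this file:
 *
 *	// Submit up to 8 SQEs and wait for at least one completion.
 *	syscall(__NR_io_uring_enter, ring_fd, 8, 1,
 *		IORING_ENTER_GETEVENTS, NULL, 0);
 *
 *	// With IORING_SETUP_SQPOLL the kernel thread does the submission;
 *	// only enter the kernel if it has gone idle and asked to be woken.
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
 *			IORING_ENTER_SQ_WAKEUP, NULL, 0);
 */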
3751
3752 static const struct file_operations io_uring_fops = {
3753 .release = io_uring_release,
3754 .mmap = io_uring_mmap,
3755 #ifndef CONFIG_MMU
3756 .get_unmapped_area = io_uring_nommu_get_unmapped_area,
3757 .mmap_capabilities = io_uring_nommu_mmap_capabilities,
3758 #else
3759 .get_unmapped_area = io_uring_mmu_get_unmapped_area,
3760 #endif
3761 .poll = io_uring_poll,
3762 #ifdef CONFIG_PROC_FS
3763 .show_fdinfo = io_uring_show_fdinfo,
3764 #endif
3765 };
3766
3767 bool io_is_uring_fops(struct file *file)
3768 {
3769 return file->f_op == &io_uring_fops;
3770 }
3771
3772 static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3773 struct io_uring_params *p)
3774 {
3775 struct io_rings *rings;
3776 size_t size, sq_array_offset;
3777 void *ptr;
3778
3779 /* make sure these are sane, as we already accounted them */
3780 ctx->sq_entries = p->sq_entries;
3781 ctx->cq_entries = p->cq_entries;
3782
3783 size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
3784 if (size == SIZE_MAX)
3785 return -EOVERFLOW;
3786
3787 if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3788 rings = io_mem_alloc(size);
3789 else
3790 rings = io_rings_map(ctx, p->cq_off.user_addr, size);
3791
3792 if (IS_ERR(rings))
3793 return PTR_ERR(rings);
3794
3795 ctx->rings = rings;
3796 if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3797 ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
3798 rings->sq_ring_mask = p->sq_entries - 1;
3799 rings->cq_ring_mask = p->cq_entries - 1;
3800 rings->sq_ring_entries = p->sq_entries;
3801 rings->cq_ring_entries = p->cq_entries;
3802
3803 if (p->flags & IORING_SETUP_SQE128)
3804 size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
3805 else
3806 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3807 if (size == SIZE_MAX) {
3808 io_rings_free(ctx);
3809 return -EOVERFLOW;
3810 }
3811
3812 if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3813 ptr = io_mem_alloc(size);
3814 else
3815 ptr = io_sqes_map(ctx, p->sq_off.user_addr, size);
3816
3817 if (IS_ERR(ptr)) {
3818 io_rings_free(ctx);
3819 return PTR_ERR(ptr);
3820 }
3821
3822 ctx->sq_sqes = ptr;
3823 return 0;
3824 }
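
/*
 * Illustrative userspace sketch (not kernel code): with IORING_SETUP_NO_MMAP
 * the application allocates the ring memory itself and passes the addresses
 * in via the params structure instead of mmap()ing the ring fd;
 * io_rings_map()/io_sqes_map() above then pin that memory. A minimal sketch;
 * RINGS_SZ/SQES_SZ are placeholders, and the exact sizing/backing
 * requirements are enforced by those helpers, not shown here:
 *
 *	struct io_uring_params p = { .flags = IORING_SETUP_NO_MMAP };
 *
 *	void *rings = mmap(NULL, RINGS_SZ, PROT_READ | PROT_WRITE,
 *			   MAP_SHARED | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
 *	void *sqes = mmap(NULL, SQES_SZ, PROT_READ | PROT_WRITE,
 *			  MAP_SHARED | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
 *
 *	p.cq_off.user_addr = (__u64)(unsigned long)rings;
 *	p.sq_off.user_addr = (__u64)(unsigned long)sqes;
 *	int ring_fd = syscall(__NR_io_uring_setup, 64, &p);
 */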
3825
3826 static int io_uring_install_fd(struct file *file)
3827 {
3828 int fd;
3829
3830 fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3831 if (fd < 0)
3832 return fd;
3833 fd_install(fd, file);
3834 return fd;
3835 }
3836
3837 /*
3838 * Allocate an anonymous fd, this is what constitutes the application
3839 * visible backing of an io_uring instance. The application mmaps this
3840 * fd to gain access to the SQ/CQ ring details.
3841 */
3842 static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
3843 {
3844 return anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
3845 O_RDWR | O_CLOEXEC, NULL);
3846 }
3847
3848 static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
3849 struct io_uring_params __user *params)
3850 {
3851 struct io_ring_ctx *ctx;
3852 struct io_uring_task *tctx;
3853 struct file *file;
3854 int ret;
3855
3856 if (!entries)
3857 return -EINVAL;
3858 if (entries > IORING_MAX_ENTRIES) {
3859 if (!(p->flags & IORING_SETUP_CLAMP))
3860 return -EINVAL;
3861 entries = IORING_MAX_ENTRIES;
3862 }
3863
3864 if ((p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
3865 && !(p->flags & IORING_SETUP_NO_MMAP))
3866 return -EINVAL;
3867
3868 /*
3869 * Use twice as many entries for the CQ ring. It's possible for the
3870 * application to drive a higher depth than the size of the SQ ring,
3871 * since the sqes are only used at submission time. This allows for
3872 * some flexibility in overcommitting a bit. If the application has
3873 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
3874 * of CQ ring entries manually.
3875 */
3876 p->sq_entries = roundup_pow_of_two(entries);
3877 if (p->flags & IORING_SETUP_CQSIZE) {
3878 /*
3879 * If IORING_SETUP_CQSIZE is set, we do the same roundup
3880 * to a power-of-two, if it isn't already. We do NOT impose
3881 * any cq vs sq ring sizing.
3882 */
3883 if (!p->cq_entries)
3884 return -EINVAL;
3885 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
3886 if (!(p->flags & IORING_SETUP_CLAMP))
3887 return -EINVAL;
3888 p->cq_entries = IORING_MAX_CQ_ENTRIES;
3889 }
3890 p->cq_entries = roundup_pow_of_two(p->cq_entries);
3891 if (p->cq_entries < p->sq_entries)
3892 return -EINVAL;
3893 } else {
3894 p->cq_entries = 2 * p->sq_entries;
3895 }
3896
3897 ctx = io_ring_ctx_alloc(p);
3898 if (!ctx)
3899 return -ENOMEM;
3900
3901 if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
3902 !(ctx->flags & IORING_SETUP_IOPOLL) &&
3903 !(ctx->flags & IORING_SETUP_SQPOLL))
3904 ctx->task_complete = true;
3905
3906 if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
3907 ctx->lockless_cq = true;
3908
3909 /*
3910 * lazy poll_wq activation relies on ->task_complete for synchronisation
3911 * purposes, see io_activate_pollwq()
3912 */
3913 if (!ctx->task_complete)
3914 ctx->poll_activated = true;
3915
3916 /*
3917 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
3918 * space applications don't need to poll for completion events
3919 * themselves; they can rely on io_sq_thread to do the polling
3920 * work, which reduces cpu usage and uring_lock contention.
3921 */
3922 if (ctx->flags & IORING_SETUP_IOPOLL &&
3923 !(ctx->flags & IORING_SETUP_SQPOLL))
3924 ctx->syscall_iopoll = 1;
3925
3926 ctx->compat = in_compat_syscall();
3927 if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
3928 ctx->user = get_uid(current_user());
3929
3930 /*
3931 * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
3932 * COOP_TASKRUN is set, then IPIs are never needed by the app.
3933 */
3934 ret = -EINVAL;
3935 if (ctx->flags & IORING_SETUP_SQPOLL) {
3936 /* IPI related flags don't make sense with SQPOLL */
3937 if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
3938 IORING_SETUP_TASKRUN_FLAG |
3939 IORING_SETUP_DEFER_TASKRUN))
3940 goto err;
3941 ctx->notify_method = TWA_SIGNAL_NO_IPI;
3942 } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
3943 ctx->notify_method = TWA_SIGNAL_NO_IPI;
3944 } else {
3945 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
3946 !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
3947 goto err;
3948 ctx->notify_method = TWA_SIGNAL;
3949 }
3950
3951 /*
3952 * For DEFER_TASKRUN we require the completion task to be the same as the
3953 * submission task. This implies that there is only one submitter, so enforce
3954 * that.
3955 */
3956 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
3957 !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
3958 goto err;
3959 }
3960
3961 /*
3962 * This is just grabbed for accounting purposes. When a process exits,
3963 * the mm is exited and dropped before the files, hence we need to hang
3964 * on to this mm purely for the purposes of being able to unaccount
3965 * memory (locked/pinned vm). It's not used for anything else.
3966 */
3967 mmgrab(current->mm);
3968 ctx->mm_account = current->mm;
3969
3970 ret = io_allocate_scq_urings(ctx, p);
3971 if (ret)
3972 goto err;
3973
3974 ret = io_sq_offload_create(ctx, p);
3975 if (ret)
3976 goto err;
3977
3978 ret = io_rsrc_init(ctx);
3979 if (ret)
3980 goto err;
3981
3982 p->sq_off.head = offsetof(struct io_rings, sq.head);
3983 p->sq_off.tail = offsetof(struct io_rings, sq.tail);
3984 p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
3985 p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
3986 p->sq_off.flags = offsetof(struct io_rings, sq_flags);
3987 p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
3988 if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3989 p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
3990 p->sq_off.resv1 = 0;
3991 if (!(ctx->flags & IORING_SETUP_NO_MMAP))
3992 p->sq_off.user_addr = 0;
3993
3994 p->cq_off.head = offsetof(struct io_rings, cq.head);
3995 p->cq_off.tail = offsetof(struct io_rings, cq.tail);
3996 p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
3997 p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
3998 p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
3999 p->cq_off.cqes = offsetof(struct io_rings, cqes);
4000 p->cq_off.flags = offsetof(struct io_rings, cq_flags);
4001 p->cq_off.resv1 = 0;
4002 if (!(ctx->flags & IORING_SETUP_NO_MMAP))
4003 p->cq_off.user_addr = 0;
4004
4005 p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
4006 IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
4007 IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
4008 IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
4009 IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
4010 IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
4011 IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
4012
4013 if (copy_to_user(params, p, sizeof(*p))) {
4014 ret = -EFAULT;
4015 goto err;
4016 }
4017
4018 if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
4019 && !(ctx->flags & IORING_SETUP_R_DISABLED))
4020 WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
4021
4022 file = io_uring_get_file(ctx);
4023 if (IS_ERR(file)) {
4024 ret = PTR_ERR(file);
4025 goto err;
4026 }
4027
4028 ret = __io_uring_add_tctx_node(ctx);
4029 if (ret)
4030 goto err_fput;
4031 tctx = current->io_uring;
4032
4033 /*
4034 * Install ring fd as the very last thing, so we don't risk someone
4035 * having closed it before we finish setup
4036 */
4037 if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
4038 ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
4039 else
4040 ret = io_uring_install_fd(file);
4041 if (ret < 0)
4042 goto err_fput;
4043
4044 trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
4045 return ret;
4046 err:
4047 io_ring_ctx_wait_and_kill(ctx);
4048 return ret;
4049 err_fput:
4050 fput(file);
4051 return ret;
4052 }
4053
4054 /*
4055 * Sets up an io_uring context, and returns the fd. The application asks for
4056 * a ring size; we return the actual sq/cq ring sizes (among other things) in
4057 * the params structure passed in.
4058 */
4059 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4060 {
4061 struct io_uring_params p;
4062 int i;
4063
4064 if (copy_from_user(&p, params, sizeof(p)))
4065 return -EFAULT;
4066 for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4067 if (p.resv[i])
4068 return -EINVAL;
4069 }
4070
4071 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4072 IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
4073 IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
4074 IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
4075 IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
4076 IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
4077 IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
4078 IORING_SETUP_NO_MMAP | IORING_SETUP_REGISTERED_FD_ONLY |
4079 IORING_SETUP_NO_SQARRAY))
4080 return -EINVAL;
4081
4082 return io_uring_create(entries, &p, params);
4083 }
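
/*
 * Illustrative userspace sketch (not kernel code): how the flag validation
 * above is typically exercised. A minimal sketch, assuming the raw
 * syscall(2) wrapper:
 *
 *	struct io_uring_params p = {
 *		.flags		= IORING_SETUP_CQSIZE | IORING_SETUP_CLAMP,
 *		.cq_entries	= 4096,		// rounded up / clamped by the kernel
 *	};
 *	int ring_fd = syscall(__NR_io_uring_setup, 256, &p);
 *
 *	// On return the params carry the real geometry and the feature bits
 *	// advertised by io_uring_create().
 *	printf("sq %u cq %u features 0x%x\n",
 *	       p.sq_entries, p.cq_entries, p.features);
 */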
4084
4085 static inline bool io_uring_allowed(void)
4086 {
4087 int disabled = READ_ONCE(sysctl_io_uring_disabled);
4088 kgid_t io_uring_group;
4089
4090 if (disabled == 2)
4091 return false;
4092
4093 if (disabled == 0 || capable(CAP_SYS_ADMIN))
4094 return true;
4095
4096 io_uring_group = make_kgid(&init_user_ns, sysctl_io_uring_group);
4097 if (!gid_valid(io_uring_group))
4098 return false;
4099
4100 return in_group_p(io_uring_group);
4101 }
4102
4103 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4104 struct io_uring_params __user *, params)
4105 {
4106 if (!io_uring_allowed())
4107 return -EPERM;
4108
4109 return io_uring_setup(entries, params);
4110 }
4111
4112 static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
4113 unsigned nr_args)
4114 {
4115 struct io_uring_probe *p;
4116 size_t size;
4117 int i, ret;
4118
4119 size = struct_size(p, ops, nr_args);
4120 if (size == SIZE_MAX)
4121 return -EOVERFLOW;
4122 p = kzalloc(size, GFP_KERNEL);
4123 if (!p)
4124 return -ENOMEM;
4125
4126 ret = -EFAULT;
4127 if (copy_from_user(p, arg, size))
4128 goto out;
4129 ret = -EINVAL;
4130 if (memchr_inv(p, 0, size))
4131 goto out;
4132
4133 p->last_op = IORING_OP_LAST - 1;
4134 if (nr_args > IORING_OP_LAST)
4135 nr_args = IORING_OP_LAST;
4136
4137 for (i = 0; i < nr_args; i++) {
4138 p->ops[i].op = i;
4139 if (!io_issue_defs[i].not_supported)
4140 p->ops[i].flags = IO_URING_OP_SUPPORTED;
4141 }
4142 p->ops_len = i;
4143
4144 ret = 0;
4145 if (copy_to_user(arg, p, size))
4146 ret = -EFAULT;
4147 out:
4148 kfree(p);
4149 return ret;
4150 }
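
/*
 * Illustrative userspace sketch (not kernel code): how an application
 * consumes the probe information filled in above. The buffer must be zeroed
 * (the kernel rejects non-zero input). A minimal sketch, assuming "ring_fd"
 * exists in the caller:
 *
 *	struct io_uring_probe *probe;
 *	size_t len = sizeof(*probe) + 256 * sizeof(struct io_uring_probe_op);
 *
 *	probe = calloc(1, len);
 *	if (syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_PROBE,
 *		    probe, 256) == 0 &&
 *	    probe->ops_len > IORING_OP_SENDMSG &&
 *	    (probe->ops[IORING_OP_SENDMSG].flags & IO_URING_OP_SUPPORTED))
 *		; // IORING_OP_SENDMSG is available on this kernel
 *	free(probe);
 */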
4151
4152 static int io_register_personality(struct io_ring_ctx *ctx)
4153 {
4154 const struct cred *creds;
4155 u32 id;
4156 int ret;
4157
4158 creds = get_current_cred();
4159
4160 ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
4161 XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
4162 if (ret < 0) {
4163 put_cred(creds);
4164 return ret;
4165 }
4166 return id;
4167 }
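
/*
 * Illustrative userspace sketch (not kernel code): the returned id is what a
 * request later places in sqe->personality so that it is issued with the
 * credentials captured here. A minimal sketch, assuming "ring_fd" and an SQE
 * pointer "sqe" exist in the caller:
 *
 *	int id = syscall(__NR_io_uring_register, ring_fd,
 *			 IORING_REGISTER_PERSONALITY, NULL, 0);
 *	if (id >= 0)
 *		sqe->personality = id;
 *
 *	// Later, drop it again; the id is passed in nr_args.
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_UNREGISTER_PERSONALITY, NULL, id);
 */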
4168
4169 static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
4170 void __user *arg, unsigned int nr_args)
4171 {
4172 struct io_uring_restriction *res;
4173 size_t size;
4174 int i, ret;
4175
4176 /* Restrictions allowed only if rings started disabled */
4177 if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4178 return -EBADFD;
4179
4180 /* We allow only a single restrictions registration */
4181 if (ctx->restrictions.registered)
4182 return -EBUSY;
4183
4184 if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
4185 return -EINVAL;
4186
4187 size = array_size(nr_args, sizeof(*res));
4188 if (size == SIZE_MAX)
4189 return -EOVERFLOW;
4190
4191 res = memdup_user(arg, size);
4192 if (IS_ERR(res))
4193 return PTR_ERR(res);
4194
4195 ret = 0;
4196
4197 for (i = 0; i < nr_args; i++) {
4198 switch (res[i].opcode) {
4199 case IORING_RESTRICTION_REGISTER_OP:
4200 if (res[i].register_op >= IORING_REGISTER_LAST) {
4201 ret = -EINVAL;
4202 goto out;
4203 }
4204
4205 __set_bit(res[i].register_op,
4206 ctx->restrictions.register_op);
4207 break;
4208 case IORING_RESTRICTION_SQE_OP:
4209 if (res[i].sqe_op >= IORING_OP_LAST) {
4210 ret = -EINVAL;
4211 goto out;
4212 }
4213
4214 __set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
4215 break;
4216 case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
4217 ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
4218 break;
4219 case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
4220 ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
4221 break;
4222 default:
4223 ret = -EINVAL;
4224 goto out;
4225 }
4226 }
4227
4228 out:
4229 /* Reset all restrictions if an error happened */
4230 if (ret != 0)
4231 memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
4232 else
4233 ctx->restrictions.registered = true;
4234
4235 kfree(res);
4236 return ret;
4237 }
4238
4239 static int io_register_enable_rings(struct io_ring_ctx *ctx)
4240 {
4241 if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4242 return -EBADFD;
4243
4244 if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
4245 WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
4246 /*
4247 * Lazy activation attempts would fail if it was polled before
4248 * submitter_task is set.
4249 */
4250 if (wq_has_sleeper(&ctx->poll_wq))
4251 io_activate_pollwq(ctx);
4252 }
4253
4254 if (ctx->restrictions.registered)
4255 ctx->restricted = 1;
4256
4257 ctx->flags &= ~IORING_SETUP_R_DISABLED;
4258 if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
4259 wake_up(&ctx->sq_data->wait);
4260 return 0;
4261 }
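
/*
 * Illustrative userspace sketch (not kernel code): the intended sequence for
 * a sandboxed ring is to create it disabled, register restrictions, and only
 * then enable it, matching the two helpers above. A minimal sketch:
 *
 *	struct io_uring_params p = { .flags = IORING_SETUP_R_DISABLED };
 *	int ring_fd = syscall(__NR_io_uring_setup, 64, &p);
 *
 *	struct io_uring_restriction res[] = {
 *		{ .opcode = IORING_RESTRICTION_SQE_OP,
 *		  .sqe_op = IORING_OP_READV },
 *		{ .opcode = IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
 *		  .sqe_flags = IOSQE_FIXED_FILE },
 *	};
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_RESTRICTIONS,
 *		res, 2);
 *
 *	// Rings must be enabled before any submission can happen.
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_ENABLE_RINGS,
 *		NULL, 0);
 */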
4262
4263 static __cold int __io_register_iowq_aff(struct io_ring_ctx *ctx,
4264 cpumask_var_t new_mask)
4265 {
4266 int ret;
4267
4268 if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
4269 ret = io_wq_cpu_affinity(current->io_uring, new_mask);
4270 } else {
4271 mutex_unlock(&ctx->uring_lock);
4272 ret = io_sqpoll_wq_cpu_affinity(ctx, new_mask);
4273 mutex_lock(&ctx->uring_lock);
4274 }
4275
4276 return ret;
4277 }
4278
4279 static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
4280 void __user *arg, unsigned len)
4281 {
4282 cpumask_var_t new_mask;
4283 int ret;
4284
4285 if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
4286 return -ENOMEM;
4287
4288 cpumask_clear(new_mask);
4289 if (len > cpumask_size())
4290 len = cpumask_size();
4291
4292 if (in_compat_syscall()) {
4293 ret = compat_get_bitmap(cpumask_bits(new_mask),
4294 (const compat_ulong_t __user *)arg,
4295 len * 8 /* CHAR_BIT */);
4296 } else {
4297 ret = copy_from_user(new_mask, arg, len);
4298 }
4299
4300 if (ret) {
4301 free_cpumask_var(new_mask);
4302 return -EFAULT;
4303 }
4304
4305 ret = __io_register_iowq_aff(ctx, new_mask);
4306 free_cpumask_var(new_mask);
4307 return ret;
4308 }
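
/*
 * Illustrative userspace sketch (not kernel code): the argument is a raw CPU
 * bitmap and nr_args is its size in bytes, so a cpu_set_t can be passed
 * directly. A minimal sketch, assuming "ring_fd" exists in the caller:
 *
 *	cpu_set_t mask;
 *
 *	CPU_ZERO(&mask);
 *	CPU_SET(0, &mask);
 *	CPU_SET(1, &mask);
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_IOWQ_AFF,
 *		&mask, sizeof(mask));
 *
 *	// Clearing the affinity again takes no argument.
 *	syscall(__NR_io_uring_register, ring_fd, IORING_UNREGISTER_IOWQ_AFF,
 *		NULL, 0);
 */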
4309
4310 static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
4311 {
4312 return __io_register_iowq_aff(ctx, NULL);
4313 }
4314
4315 static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
4316 void __user *arg)
4317 __must_hold(&ctx->uring_lock)
4318 {
4319 struct io_tctx_node *node;
4320 struct io_uring_task *tctx = NULL;
4321 struct io_sq_data *sqd = NULL;
4322 __u32 new_count[2];
4323 int i, ret;
4324
4325 if (copy_from_user(new_count, arg, sizeof(new_count)))
4326 return -EFAULT;
4327 for (i = 0; i < ARRAY_SIZE(new_count); i++)
4328 if (new_count[i] > INT_MAX)
4329 return -EINVAL;
4330
4331 if (ctx->flags & IORING_SETUP_SQPOLL) {
4332 sqd = ctx->sq_data;
4333 if (sqd) {
4334 /*
4335 * Observe the correct sqd->lock -> ctx->uring_lock
4336 * ordering. Fine to drop uring_lock here, we hold
4337 * a ref to the ctx.
4338 */
4339 refcount_inc(&sqd->refs);
4340 mutex_unlock(&ctx->uring_lock);
4341 mutex_lock(&sqd->lock);
4342 mutex_lock(&ctx->uring_lock);
4343 if (sqd->thread)
4344 tctx = sqd->thread->io_uring;
4345 }
4346 } else {
4347 tctx = current->io_uring;
4348 }
4349
4350 BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
4351
4352 for (i = 0; i < ARRAY_SIZE(new_count); i++)
4353 if (new_count[i])
4354 ctx->iowq_limits[i] = new_count[i];
4355 ctx->iowq_limits_set = true;
4356
4357 if (tctx && tctx->io_wq) {
4358 ret = io_wq_max_workers(tctx->io_wq, new_count);
4359 if (ret)
4360 goto err;
4361 } else {
4362 memset(new_count, 0, sizeof(new_count));
4363 }
4364
4365 if (sqd) {
4366 mutex_unlock(&ctx->uring_lock);
4367 mutex_unlock(&sqd->lock);
4368 io_put_sq_data(sqd);
4369 mutex_lock(&ctx->uring_lock);
4370 }
4371
4372 if (copy_to_user(arg, new_count, sizeof(new_count)))
4373 return -EFAULT;
4374
4375 /* that's it for SQPOLL, only the SQPOLL task creates requests */
4376 if (sqd)
4377 return 0;
4378
4379 /* now propagate the restriction to all registered users */
4380 list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
4381 struct io_uring_task *tctx = node->task->io_uring;
4382
4383 if (WARN_ON_ONCE(!tctx->io_wq))
4384 continue;
4385
4386 for (i = 0; i < ARRAY_SIZE(new_count); i++)
4387 new_count[i] = ctx->iowq_limits[i];
4388 /* ignore errors, it always returns zero anyway */
4389 (void)io_wq_max_workers(tctx->io_wq, new_count);
4390 }
4391 return 0;
4392 err:
4393 if (sqd) {
4394 mutex_unlock(&ctx->uring_lock);
4395 mutex_unlock(&sqd->lock);
4396 io_put_sq_data(sqd);
4397 mutex_lock(&ctx->uring_lock);
4398
4399 }
4400 return ret;
4401 }
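
/*
 * Illustrative userspace sketch (not kernel code): the argument is a pair of
 * counts, [0] for bounded and [1] for unbounded io-wq workers; a zero leaves
 * that limit unchanged, and the previous limits are copied back on success.
 * A minimal sketch, assuming "ring_fd" exists in the caller:
 *
 *	__u32 counts[2] = { 8, 64 };
 *
 *	if (syscall(__NR_io_uring_register, ring_fd,
 *		    IORING_REGISTER_IOWQ_MAX_WORKERS, counts, 2) == 0)
 *		printf("old limits: bounded %u, unbounded %u\n",
 *		       counts[0], counts[1]);
 */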
4402
4403 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4404 void __user *arg, unsigned nr_args)
4405 __releases(ctx->uring_lock)
4406 __acquires(ctx->uring_lock)
4407 {
4408 int ret;
4409
4410 /*
4411 * We don't quiesce the refs for register anymore and so it can't be
4412 * dying as we're holding a file ref here.
4413 */
4414 if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
4415 return -ENXIO;
4416
4417 if (ctx->submitter_task && ctx->submitter_task != current)
4418 return -EEXIST;
4419
4420 if (ctx->restricted) {
4421 opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
4422 if (!test_bit(opcode, ctx->restrictions.register_op))
4423 return -EACCES;
4424 }
4425
4426 switch (opcode) {
4427 case IORING_REGISTER_BUFFERS:
4428 ret = -EFAULT;
4429 if (!arg)
4430 break;
4431 ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
4432 break;
4433 case IORING_UNREGISTER_BUFFERS:
4434 ret = -EINVAL;
4435 if (arg || nr_args)
4436 break;
4437 ret = io_sqe_buffers_unregister(ctx);
4438 break;
4439 case IORING_REGISTER_FILES:
4440 ret = -EFAULT;
4441 if (!arg)
4442 break;
4443 ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
4444 break;
4445 case IORING_UNREGISTER_FILES:
4446 ret = -EINVAL;
4447 if (arg || nr_args)
4448 break;
4449 ret = io_sqe_files_unregister(ctx);
4450 break;
4451 case IORING_REGISTER_FILES_UPDATE:
4452 ret = io_register_files_update(ctx, arg, nr_args);
4453 break;
4454 case IORING_REGISTER_EVENTFD:
4455 ret = -EINVAL;
4456 if (nr_args != 1)
4457 break;
4458 ret = io_eventfd_register(ctx, arg, 0);
4459 break;
4460 case IORING_REGISTER_EVENTFD_ASYNC:
4461 ret = -EINVAL;
4462 if (nr_args != 1)
4463 break;
4464 ret = io_eventfd_register(ctx, arg, 1);
4465 break;
4466 case IORING_UNREGISTER_EVENTFD:
4467 ret = -EINVAL;
4468 if (arg || nr_args)
4469 break;
4470 ret = io_eventfd_unregister(ctx);
4471 break;
4472 case IORING_REGISTER_PROBE:
4473 ret = -EINVAL;
4474 if (!arg || nr_args > 256)
4475 break;
4476 ret = io_probe(ctx, arg, nr_args);
4477 break;
4478 case IORING_REGISTER_PERSONALITY:
4479 ret = -EINVAL;
4480 if (arg || nr_args)
4481 break;
4482 ret = io_register_personality(ctx);
4483 break;
4484 case IORING_UNREGISTER_PERSONALITY:
4485 ret = -EINVAL;
4486 if (arg)
4487 break;
4488 ret = io_unregister_personality(ctx, nr_args);
4489 break;
4490 case IORING_REGISTER_ENABLE_RINGS:
4491 ret = -EINVAL;
4492 if (arg || nr_args)
4493 break;
4494 ret = io_register_enable_rings(ctx);
4495 break;
4496 case IORING_REGISTER_RESTRICTIONS:
4497 ret = io_register_restrictions(ctx, arg, nr_args);
4498 break;
4499 case IORING_REGISTER_FILES2:
4500 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
4501 break;
4502 case IORING_REGISTER_FILES_UPDATE2:
4503 ret = io_register_rsrc_update(ctx, arg, nr_args,
4504 IORING_RSRC_FILE);
4505 break;
4506 case IORING_REGISTER_BUFFERS2:
4507 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
4508 break;
4509 case IORING_REGISTER_BUFFERS_UPDATE:
4510 ret = io_register_rsrc_update(ctx, arg, nr_args,
4511 IORING_RSRC_BUFFER);
4512 break;
4513 case IORING_REGISTER_IOWQ_AFF:
4514 ret = -EINVAL;
4515 if (!arg || !nr_args)
4516 break;
4517 ret = io_register_iowq_aff(ctx, arg, nr_args);
4518 break;
4519 case IORING_UNREGISTER_IOWQ_AFF:
4520 ret = -EINVAL;
4521 if (arg || nr_args)
4522 break;
4523 ret = io_unregister_iowq_aff(ctx);
4524 break;
4525 case IORING_REGISTER_IOWQ_MAX_WORKERS:
4526 ret = -EINVAL;
4527 if (!arg || nr_args != 2)
4528 break;
4529 ret = io_register_iowq_max_workers(ctx, arg);
4530 break;
4531 case IORING_REGISTER_RING_FDS:
4532 ret = io_ringfd_register(ctx, arg, nr_args);
4533 break;
4534 case IORING_UNREGISTER_RING_FDS:
4535 ret = io_ringfd_unregister(ctx, arg, nr_args);
4536 break;
4537 case IORING_REGISTER_PBUF_RING:
4538 ret = -EINVAL;
4539 if (!arg || nr_args != 1)
4540 break;
4541 ret = io_register_pbuf_ring(ctx, arg);
4542 break;
4543 case IORING_UNREGISTER_PBUF_RING:
4544 ret = -EINVAL;
4545 if (!arg || nr_args != 1)
4546 break;
4547 ret = io_unregister_pbuf_ring(ctx, arg);
4548 break;
4549 case IORING_REGISTER_SYNC_CANCEL:
4550 ret = -EINVAL;
4551 if (!arg || nr_args != 1)
4552 break;
4553 ret = io_sync_cancel(ctx, arg);
4554 break;
4555 case IORING_REGISTER_FILE_ALLOC_RANGE:
4556 ret = -EINVAL;
4557 if (!arg || nr_args)
4558 break;
4559 ret = io_register_file_alloc_range(ctx, arg);
4560 break;
4561 default:
4562 ret = -EINVAL;
4563 break;
4564 }
4565
4566 return ret;
4567 }
4568
4569 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
4570 void __user *, arg, unsigned int, nr_args)
4571 {
4572 struct io_ring_ctx *ctx;
4573 long ret = -EBADF;
4574 struct file *file;
4575 bool use_registered_ring;
4576
4577 use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING);
4578 opcode &= ~IORING_REGISTER_USE_REGISTERED_RING;
4579
4580 if (opcode >= IORING_REGISTER_LAST)
4581 return -EINVAL;
4582
4583 if (use_registered_ring) {
4584 /*
4585 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
4586 * need only dereference our task private array to find it.
4587 */
4588 struct io_uring_task *tctx = current->io_uring;
4589
4590 if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
4591 return -EINVAL;
4592 fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
4593 file = tctx->registered_rings[fd];
4594 if (unlikely(!file))
4595 return -EBADF;
4596 } else {
4597 file = fget(fd);
4598 if (unlikely(!file))
4599 return -EBADF;
4600 ret = -EOPNOTSUPP;
4601 if (!io_is_uring_fops(file))
4602 goto out_fput;
4603 }
4604
4605 ctx = file->private_data;
4606
4607 mutex_lock(&ctx->uring_lock);
4608 ret = __io_uring_register(ctx, opcode, arg, nr_args);
4609 mutex_unlock(&ctx->uring_lock);
4610 trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
4611 out_fput:
4612 if (!use_registered_ring)
4613 fput(file);
4614 return ret;
4615 }
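
/*
 * Illustrative userspace sketch (not kernel code): once the ring fd has been
 * registered, both io_uring_enter() and io_uring_register() can be told to
 * interpret "fd" as an index into the task's registered-ring array instead
 * of a normal descriptor, skipping the fget()/fput() pair above. A minimal
 * sketch, assuming "ring_fd" exists in the caller:
 *
 *	struct io_uring_rsrc_update reg = {
 *		.offset = -1U,		// let the kernel pick a free slot
 *		.data	= ring_fd,
 *	};
 *
 *	// Returns the number of fds registered; the chosen slot is written
 *	// back into reg.offset.
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_RING_FDS,
 *		&reg, 1);
 *
 *	syscall(__NR_io_uring_enter, reg.offset, 0, 1,
 *		IORING_ENTER_GETEVENTS | IORING_ENTER_REGISTERED_RING,
 *		NULL, 0);
 *
 *	// For io_uring_register() itself, OR the flag into the opcode, e.g.
 *	// IORING_REGISTER_PROBE | IORING_REGISTER_USE_REGISTERED_RING.
 */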
4616
4617 static int __init io_uring_init(void)
4618 {
4619 #define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
4620 BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
4621 BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
4622 } while (0)
4623
4624 #define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
4625 __BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
4626 #define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
4627 __BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
4628 BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
4629 BUILD_BUG_SQE_ELEM(0, __u8, opcode);
4630 BUILD_BUG_SQE_ELEM(1, __u8, flags);
4631 BUILD_BUG_SQE_ELEM(2, __u16, ioprio);
4632 BUILD_BUG_SQE_ELEM(4, __s32, fd);
4633 BUILD_BUG_SQE_ELEM(8, __u64, off);
4634 BUILD_BUG_SQE_ELEM(8, __u64, addr2);
4635 BUILD_BUG_SQE_ELEM(8, __u32, cmd_op);
4636 BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
4637 BUILD_BUG_SQE_ELEM(16, __u64, addr);
4638 BUILD_BUG_SQE_ELEM(16, __u64, splice_off_in);
4639 BUILD_BUG_SQE_ELEM(24, __u32, len);
4640 BUILD_BUG_SQE_ELEM(28, __kernel_rwf_t, rw_flags);
4641 BUILD_BUG_SQE_ELEM(28, /* compat */ int, rw_flags);
4642 BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
4643 BUILD_BUG_SQE_ELEM(28, __u32, fsync_flags);
4644 BUILD_BUG_SQE_ELEM(28, /* compat */ __u16, poll_events);
4645 BUILD_BUG_SQE_ELEM(28, __u32, poll32_events);
4646 BUILD_BUG_SQE_ELEM(28, __u32, sync_range_flags);
4647 BUILD_BUG_SQE_ELEM(28, __u32, msg_flags);
4648 BUILD_BUG_SQE_ELEM(28, __u32, timeout_flags);
4649 BUILD_BUG_SQE_ELEM(28, __u32, accept_flags);
4650 BUILD_BUG_SQE_ELEM(28, __u32, cancel_flags);
4651 BUILD_BUG_SQE_ELEM(28, __u32, open_flags);
4652 BUILD_BUG_SQE_ELEM(28, __u32, statx_flags);
4653 BUILD_BUG_SQE_ELEM(28, __u32, fadvise_advice);
4654 BUILD_BUG_SQE_ELEM(28, __u32, splice_flags);
4655 BUILD_BUG_SQE_ELEM(28, __u32, rename_flags);
4656 BUILD_BUG_SQE_ELEM(28, __u32, unlink_flags);
4657 BUILD_BUG_SQE_ELEM(28, __u32, hardlink_flags);
4658 BUILD_BUG_SQE_ELEM(28, __u32, xattr_flags);
4659 BUILD_BUG_SQE_ELEM(28, __u32, msg_ring_flags);
4660 BUILD_BUG_SQE_ELEM(32, __u64, user_data);
4661 BUILD_BUG_SQE_ELEM(40, __u16, buf_index);
4662 BUILD_BUG_SQE_ELEM(40, __u16, buf_group);
4663 BUILD_BUG_SQE_ELEM(42, __u16, personality);
4664 BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in);
4665 BUILD_BUG_SQE_ELEM(44, __u32, file_index);
4666 BUILD_BUG_SQE_ELEM(44, __u16, addr_len);
4667 BUILD_BUG_SQE_ELEM(46, __u16, __pad3[0]);
4668 BUILD_BUG_SQE_ELEM(48, __u64, addr3);
4669 BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
4670 BUILD_BUG_SQE_ELEM(56, __u64, __pad2);
4671
4672 BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
4673 sizeof(struct io_uring_rsrc_update));
4674 BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
4675 sizeof(struct io_uring_rsrc_update2));
4676
4677 /* ->buf_index is u16 */
4678 BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
4679 BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
4680 offsetof(struct io_uring_buf_ring, tail));
4681
4682 /* should fit into one byte */
4683 BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
4684 BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
4685 BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
4686
4687 BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
4688
4689 BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
4690
4691 io_uring_optable_init();
4692
4693 /*
4694 * Allow user copy in the per-command field, which starts after the
4695 * file in io_kiocb and until the opcode field. The openat2 handling
4696 * requires copying in user memory into the io_kiocb object in that
4697 * range, and HARDENED_USERCOPY will complain if we haven't
4698 * correctly annotated this range.
4699 */
4700 req_cachep = kmem_cache_create_usercopy("io_kiocb",
4701 sizeof(struct io_kiocb), 0,
4702 SLAB_HWCACHE_ALIGN | SLAB_PANIC |
4703 SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU,
4704 offsetof(struct io_kiocb, cmd.data),
4705 sizeof_field(struct io_kiocb, cmd.data), NULL);
4706
4707 iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);
4708
4709 #ifdef CONFIG_SYSCTL
4710 register_sysctl_init("kernel", kernel_io_uring_disabled_table);
4711 #endif
4712
4713 return 0;
4714 };
4715 __initcall(io_uring_init);
4716