xref: /openbmc/linux/io_uring/rw.c (revision b866371f)
1  // SPDX-License-Identifier: GPL-2.0
2  #include <linux/kernel.h>
3  #include <linux/errno.h>
4  #include <linux/fs.h>
5  #include <linux/file.h>
6  #include <linux/blk-mq.h>
7  #include <linux/mm.h>
8  #include <linux/slab.h>
9  #include <linux/fsnotify.h>
10  #include <linux/poll.h>
11  #include <linux/nospec.h>
12  #include <linux/compat.h>
13  #include <linux/io_uring.h>
14  
15  #include <uapi/linux/io_uring.h>
16  
17  #include "io_uring.h"
18  #include "opdef.h"
19  #include "kbuf.h"
20  #include "rsrc.h"
21  #include "rw.h"
22  
23  struct io_rw {
24  	/* NOTE: kiocb has the file as the first member, so don't do it here */
25  	struct kiocb			kiocb;
26  	u64				addr;
27  	u32				len;
28  	rwf_t				flags;
29  };
30  
31  static inline bool io_file_supports_nowait(struct io_kiocb *req)
32  {
33  	return req->flags & REQ_F_SUPPORT_NOWAIT;
34  }
35  
36  #ifdef CONFIG_COMPAT
37  static int io_iov_compat_buffer_select_prep(struct io_rw *rw)
38  {
39  	struct compat_iovec __user *uiov;
40  	compat_ssize_t clen;
41  
42  	uiov = u64_to_user_ptr(rw->addr);
43  	if (!access_ok(uiov, sizeof(*uiov)))
44  		return -EFAULT;
45  	if (__get_user(clen, &uiov->iov_len))
46  		return -EFAULT;
47  	if (clen < 0)
48  		return -EINVAL;
49  
50  	rw->len = clen;
51  	return 0;
52  }
53  #endif
54  
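/*
 * Only called from io_prep_rw() for READV with IOSQE_BUFFER_SELECT, which
 * must carry exactly one iovec (rw->len is the vector count at this point).
 * Pull the requested length out of that single user iovec, via the compat
 * variant when needed, and stash it in rw->len so buffer selection at issue
 * time knows how much to ask for.
 */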
55  static int io_iov_buffer_select_prep(struct io_kiocb *req)
56  {
57  	struct iovec __user *uiov;
58  	struct iovec iov;
59  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
60  
61  	if (rw->len != 1)
62  		return -EINVAL;
63  
64  #ifdef CONFIG_COMPAT
65  	if (req->ctx->compat)
66  		return io_iov_compat_buffer_select_prep(rw);
67  #endif
68  
69  	uiov = u64_to_user_ptr(rw->addr);
70  	if (copy_from_user(&iov, uiov, sizeof(*uiov)))
71  		return -EFAULT;
72  	rw->len = iov.iov_len;
73  	return 0;
74  }
75  
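/*
 * Prepare a read/write request from the SQE: stash the offset, buffer
 * address/length and rw_flags, validate and apply the I/O priority, and for
 * READ_FIXED/WRITE_FIXED resolve buf_index into the registered buffer and
 * take a reference on its rsrc node. For READV with buffer selection the
 * single iovec is validated here, before rw->len can be rewritten at issue
 * time.
 *
 * The fixed-buffer variant is driven from userspace roughly like this
 * (illustrative sketch only, assuming liburing):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = len };
 *	struct io_uring_sqe *sqe;
 *
 *	io_uring_register_buffers(&ring, &iov, 1);
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read_fixed(sqe, fd, buf, len, 0, 0);
 *	io_uring_submit(&ring);
 */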
76  int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
77  {
78  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
79  	unsigned ioprio;
80  	int ret;
81  
82  	rw->kiocb.ki_pos = READ_ONCE(sqe->off);
83  	/* used for fixed read/write too - just read unconditionally */
84  	req->buf_index = READ_ONCE(sqe->buf_index);
85  
86  	if (req->opcode == IORING_OP_READ_FIXED ||
87  	    req->opcode == IORING_OP_WRITE_FIXED) {
88  		struct io_ring_ctx *ctx = req->ctx;
89  		u16 index;
90  
91  		if (unlikely(req->buf_index >= ctx->nr_user_bufs))
92  			return -EFAULT;
93  		index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
94  		req->imu = ctx->user_bufs[index];
95  		io_req_set_rsrc_node(req, ctx, 0);
96  	}
97  
98  	ioprio = READ_ONCE(sqe->ioprio);
99  	if (ioprio) {
100  		ret = ioprio_check_cap(ioprio);
101  		if (ret)
102  			return ret;
103  
104  		rw->kiocb.ki_ioprio = ioprio;
105  	} else {
106  		rw->kiocb.ki_ioprio = get_current_ioprio();
107  	}
108  	rw->kiocb.dio_complete = NULL;
109  
110  	rw->addr = READ_ONCE(sqe->addr);
111  	rw->len = READ_ONCE(sqe->len);
112  	rw->flags = READ_ONCE(sqe->rw_flags);
113  
114  	/* Have to do this validation here, since by the time io_read() runs
115  	 * rw->len might have changed due to buffer selection
116  	 */
117  	if (req->opcode == IORING_OP_READV && req->flags & REQ_F_BUFFER_SELECT) {
118  		ret = io_iov_buffer_select_prep(req);
119  		if (ret)
120  			return ret;
121  	}
122  
123  	return 0;
124  }
125  
126  void io_readv_writev_cleanup(struct io_kiocb *req)
127  {
128  	struct io_async_rw *io = req->async_data;
129  
130  	kfree(io->free_iovec);
131  }
132  
133  static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
134  {
135  	switch (ret) {
136  	case -EIOCBQUEUED:
137  		break;
138  	case -ERESTARTSYS:
139  	case -ERESTARTNOINTR:
140  	case -ERESTARTNOHAND:
141  	case -ERESTART_RESTARTBLOCK:
142  		/*
143  		 * We can't just restart the syscall, since previously
144  		 * submitted sqes may already be in progress. Just fail this
145  		 * IO with EINTR.
146  		 */
147  		ret = -EINTR;
148  		fallthrough;
149  	default:
150  		kiocb->ki_complete(kiocb, ret);
151  	}
152  }
153  
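/*
 * Pick the file position to use for this request: an explicit offset from
 * the SQE if one was given (ki_pos != -1), otherwise the file's current
 * position for non-stream files (flagging REQ_F_CUR_POS so f_pos is written
 * back on completion), or NULL for stream files, which have no position.
 */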
154  static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
155  {
156  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
157  
158  	if (rw->kiocb.ki_pos != -1)
159  		return &rw->kiocb.ki_pos;
160  
161  	if (!(req->file->f_mode & FMODE_STREAM)) {
162  		req->flags |= REQ_F_CUR_POS;
163  		rw->kiocb.ki_pos = req->file->f_pos;
164  		return &rw->kiocb.ki_pos;
165  	}
166  
167  	rw->kiocb.ki_pos = 0;
168  	return NULL;
169  }
170  
171  static void io_req_task_queue_reissue(struct io_kiocb *req)
172  {
173  	req->io_task_work.func = io_queue_iowq;
174  	io_req_task_work_add(req);
175  }
176  
177  #ifdef CONFIG_BLOCK
178  static bool io_resubmit_prep(struct io_kiocb *req)
179  {
180  	struct io_async_rw *io = req->async_data;
181  
182  	if (!req_has_async_data(req))
183  		return !io_req_prep_async(req);
184  	iov_iter_restore(&io->s.iter, &io->s.iter_state);
185  	return true;
186  }
187  
188  static bool io_rw_should_reissue(struct io_kiocb *req)
189  {
190  	umode_t mode = file_inode(req->file)->i_mode;
191  	struct io_ring_ctx *ctx = req->ctx;
192  
193  	if (!S_ISBLK(mode) && !S_ISREG(mode))
194  		return false;
195  	if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
196  	    !(ctx->flags & IORING_SETUP_IOPOLL)))
197  		return false;
198  	/*
199  	 * If ref is dying, we might be running poll reap from the exit work.
200  	 * Don't attempt to reissue from that path, just let it fail with
201  	 * -EAGAIN.
202  	 */
203  	if (percpu_ref_is_dying(&ctx->refs))
204  		return false;
205  	/*
206  	 * Play it safe and assume it's not safe to re-import and reissue if we're
207  	 * not in the original thread group (or in task context).
208  	 */
209  	if (!same_thread_group(req->task, current) || !in_task())
210  		return false;
211  	return true;
212  }
213  #else
214  static bool io_resubmit_prep(struct io_kiocb *req)
215  {
216  	return false;
217  }
218  static bool io_rw_should_reissue(struct io_kiocb *req)
219  {
220  	return false;
221  }
222  #endif
223  
224  static void io_req_end_write(struct io_kiocb *req)
225  {
226  	if (req->flags & REQ_F_ISREG) {
227  		struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
228  
229  		kiocb_end_write(&rw->kiocb);
230  	}
231  }
232  
233  /*
234   * Trigger the notifications after having done some IO, and finish the write
235   * accounting, if any.
236   */
237  static void io_req_io_end(struct io_kiocb *req)
238  {
239  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
240  
241  	if (rw->kiocb.ki_flags & IOCB_WRITE) {
242  		io_req_end_write(req);
243  		fsnotify_modify(req->file);
244  	} else {
245  		fsnotify_access(req->file);
246  	}
247  }
248  
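/*
 * Common handling for a result that doesn't match what was asked for. If
 * the error is -EAGAIN/-EOPNOTSUPP and the file allows it, mark the request
 * for reissue and return true so the caller skips completion; otherwise
 * record the unexpected result and flag the request as failed.
 */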
249  static bool __io_complete_rw_common(struct io_kiocb *req, long res)
250  {
251  	if (unlikely(res != req->cqe.res)) {
252  		if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
253  		    io_rw_should_reissue(req)) {
254  			/*
255  			 * Reissue will start accounting again, finish the
256  			 * current cycle.
257  			 */
258  			io_req_io_end(req);
259  			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
260  			return true;
261  		}
262  		req_set_fail(req);
263  		req->cqe.res = res;
264  	}
265  	return false;
266  }
267  
268  static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
269  {
270  	struct io_async_rw *io = req->async_data;
271  
272  	/* add previously done IO, if any */
273  	if (req_has_async_data(req) && io->bytes_done > 0) {
274  		if (res < 0)
275  			res = io->bytes_done;
276  		else
277  			res += io->bytes_done;
278  	}
279  	return res;
280  }
281  
282  void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
283  {
284  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
285  	struct kiocb *kiocb = &rw->kiocb;
286  
287  	if ((kiocb->ki_flags & IOCB_DIO_CALLER_COMP) && kiocb->dio_complete) {
288  		long res = kiocb->dio_complete(rw->kiocb.private);
289  
290  		io_req_set_res(req, io_fixup_rw_res(req, res), 0);
291  	}
292  
293  	io_req_io_end(req);
294  
295  	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
296  		unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
297  
298  		req->cqe.flags |= io_put_kbuf(req, issue_flags);
299  	}
300  	io_req_task_complete(req, ts);
301  }
302  
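/*
 * ->ki_complete handler for the non-IOPOLL path. Unless the result will
 * come from ->dio_complete() later (IOCB_DIO_CALLER_COMP), fix it up and
 * record it here; either way the rest of the completion is punted to
 * task_work via io_req_rw_complete().
 */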
303  static void io_complete_rw(struct kiocb *kiocb, long res)
304  {
305  	struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
306  	struct io_kiocb *req = cmd_to_io_kiocb(rw);
307  
308  	if (!kiocb->dio_complete || !(kiocb->ki_flags & IOCB_DIO_CALLER_COMP)) {
309  		if (__io_complete_rw_common(req, res))
310  			return;
311  		io_req_set_res(req, io_fixup_rw_res(req, res), 0);
312  	}
313  	req->io_task_work.func = io_req_rw_complete;
314  	__io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
315  }
316  
317  static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
318  {
319  	struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
320  	struct io_kiocb *req = cmd_to_io_kiocb(rw);
321  
322  	if (kiocb->ki_flags & IOCB_WRITE)
323  		io_req_end_write(req);
324  	if (unlikely(res != req->cqe.res)) {
325  		if (res == -EAGAIN && io_rw_should_reissue(req)) {
326  			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
327  			return;
328  		}
329  		req->cqe.res = res;
330  	}
331  
332  	/* order with io_iopoll_complete() checking ->iopoll_completed */
333  	smp_store_release(&req->iopoll_completed, 1);
334  }
335  
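/*
 * Finish a request that was issued inline from the submission path. On
 * success, write back f_pos if needed and try to complete in place;
 * otherwise hand the result to io_rw_done(), which invokes ->ki_complete()
 * (mapping -ERESTART* errors to -EINTR). A pending reissue is requeued to
 * io-wq here, or failed with the fixed-up result if re-prepping fails.
 */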
336  static int kiocb_done(struct io_kiocb *req, ssize_t ret,
337  		       unsigned int issue_flags)
338  {
339  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
340  	unsigned final_ret = io_fixup_rw_res(req, ret);
341  
342  	if (ret >= 0 && req->flags & REQ_F_CUR_POS)
343  		req->file->f_pos = rw->kiocb.ki_pos;
344  	if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw)) {
345  		if (!__io_complete_rw_common(req, ret)) {
346  			/*
347  			 * Safe to call io_req_io_end() from here as we're inline
348  			 * from the submission path.
349  			 */
350  			io_req_io_end(req);
351  			io_req_set_res(req, final_ret,
352  				       io_put_kbuf(req, issue_flags));
353  			return IOU_OK;
354  		}
355  	} else {
356  		io_rw_done(&rw->kiocb, ret);
357  	}
358  
359  	if (req->flags & REQ_F_REISSUE) {
360  		req->flags &= ~REQ_F_REISSUE;
361  		if (io_resubmit_prep(req))
362  			io_req_task_queue_reissue(req);
363  		else
364  			io_req_task_queue_fail(req, final_ret);
365  	}
366  	return IOU_ISSUE_SKIP_COMPLETE;
367  }
368  
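/*
 * Build the iov_iter for this request. Fixed opcodes map the registered
 * buffer via io_import_fixed(); plain READ/WRITE (and provided buffers,
 * selecting one here if needed) import a single user buffer; everything
 * else imports a full iovec array, using the inline fast_iov when it fits
 * and returning a heap-allocated iovec for the caller to free otherwise.
 */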
369  static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
370  				       struct io_rw_state *s,
371  				       unsigned int issue_flags)
372  {
373  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
374  	struct iov_iter *iter = &s->iter;
375  	u8 opcode = req->opcode;
376  	struct iovec *iovec;
377  	void __user *buf;
378  	size_t sqe_len;
379  	ssize_t ret;
380  
381  	if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
382  		ret = io_import_fixed(ddir, iter, req->imu, rw->addr, rw->len);
383  		if (ret)
384  			return ERR_PTR(ret);
385  		return NULL;
386  	}
387  
388  	buf = u64_to_user_ptr(rw->addr);
389  	sqe_len = rw->len;
390  
391  	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE ||
392  	    (req->flags & REQ_F_BUFFER_SELECT)) {
393  		if (io_do_buffer_select(req)) {
394  			buf = io_buffer_select(req, &sqe_len, issue_flags);
395  			if (!buf)
396  				return ERR_PTR(-ENOBUFS);
397  			rw->addr = (unsigned long) buf;
398  			rw->len = sqe_len;
399  		}
400  
401  		ret = import_ubuf(ddir, buf, sqe_len, iter);
402  		if (ret)
403  			return ERR_PTR(ret);
404  		return NULL;
405  	}
406  
407  	iovec = s->fast_iov;
408  	ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
409  			      req->ctx->compat);
410  	if (unlikely(ret < 0))
411  		return ERR_PTR(ret);
412  	return iovec;
413  }
414  
415  static inline int io_import_iovec(int rw, struct io_kiocb *req,
416  				  struct iovec **iovec, struct io_rw_state *s,
417  				  unsigned int issue_flags)
418  {
419  	*iovec = __io_import_iovec(rw, req, s, issue_flags);
420  	if (IS_ERR(*iovec))
421  		return PTR_ERR(*iovec);
422  
423  	iov_iter_save_state(&s->iter, &s->iter_state);
424  	return 0;
425  }
426  
427  static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
428  {
429  	return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
430  }
431  
432  /*
433   * For files that don't have ->read_iter() and ->write_iter(), handle them
434   * by looping over ->read() or ->write() manually.
435   */
436  static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
437  {
438  	struct kiocb *kiocb = &rw->kiocb;
439  	struct file *file = kiocb->ki_filp;
440  	ssize_t ret = 0;
441  	loff_t *ppos;
442  
443  	/*
444  	 * Don't support polled IO through this interface, and we can't
445  	 * support non-blocking either. For the latter, this just causes
446  	 * the kiocb to be handled from an async context.
447  	 */
448  	if (kiocb->ki_flags & IOCB_HIPRI)
449  		return -EOPNOTSUPP;
450  	if ((kiocb->ki_flags & IOCB_NOWAIT) &&
451  	    !(kiocb->ki_filp->f_flags & O_NONBLOCK))
452  		return -EAGAIN;
453  
454  	ppos = io_kiocb_ppos(kiocb);
455  
456  	while (iov_iter_count(iter)) {
457  		void __user *addr;
458  		size_t len;
459  		ssize_t nr;
460  
461  		if (iter_is_ubuf(iter)) {
462  			addr = iter->ubuf + iter->iov_offset;
463  			len = iov_iter_count(iter);
464  		} else if (!iov_iter_is_bvec(iter)) {
465  			addr = iter_iov_addr(iter);
466  			len = iter_iov_len(iter);
467  		} else {
468  			addr = u64_to_user_ptr(rw->addr);
469  			len = rw->len;
470  		}
471  
472  		if (ddir == READ)
473  			nr = file->f_op->read(file, addr, len, ppos);
474  		else
475  			nr = file->f_op->write(file, addr, len, ppos);
476  
477  		if (nr < 0) {
478  			if (!ret)
479  				ret = nr;
480  			break;
481  		}
482  		ret += nr;
483  		if (!iov_iter_is_bvec(iter)) {
484  			iov_iter_advance(iter, nr);
485  		} else {
486  			rw->addr += nr;
487  			rw->len -= nr;
488  			if (!rw->len)
489  				break;
490  		}
491  		if (nr != len)
492  			break;
493  	}
494  
495  	return ret;
496  }
497  
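/*
 * Copy the current iterator into the request's async data so the IO can be
 * retried from io-wq or task_work later. Bvec/ubuf iterators carry their
 * own state; for iovec iterators, either the inline fast_iov is copied over
 * (preserving the current offset into it) or ownership of the allocated
 * iovec is transferred and REQ_F_NEED_CLEANUP is set so it gets freed.
 */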
498  static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
499  			  const struct iovec *fast_iov, struct iov_iter *iter)
500  {
501  	struct io_async_rw *io = req->async_data;
502  
503  	memcpy(&io->s.iter, iter, sizeof(*iter));
504  	io->free_iovec = iovec;
505  	io->bytes_done = 0;
506  	/* can only be fixed buffers, no need to do anything */
507  	if (iov_iter_is_bvec(iter) || iter_is_ubuf(iter))
508  		return;
509  	if (!iovec) {
510  		unsigned iov_off = 0;
511  
512  		io->s.iter.__iov = io->s.fast_iov;
513  		if (iter->__iov != fast_iov) {
514  			iov_off = iter_iov(iter) - fast_iov;
515  			io->s.iter.__iov += iov_off;
516  		}
517  		if (io->s.fast_iov != fast_iov)
518  			memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
519  			       sizeof(struct iovec) * iter->nr_segs);
520  	} else {
521  		req->flags |= REQ_F_NEED_CLEANUP;
522  	}
523  }
524  
525  static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
526  			     struct io_rw_state *s, bool force)
527  {
528  	if (!force && !io_cold_defs[req->opcode].prep_async)
529  		return 0;
530  	if (!req_has_async_data(req)) {
531  		struct io_async_rw *iorw;
532  
533  		if (io_alloc_async_data(req)) {
534  			kfree(iovec);
535  			return -ENOMEM;
536  		}
537  
538  		io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
539  		iorw = req->async_data;
540  		/* we've copied and mapped the iter, ensure state is saved */
541  		iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
542  	}
543  	return 0;
544  }
545  
546  static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
547  {
548  	struct io_async_rw *iorw = req->async_data;
549  	struct iovec *iov;
550  	int ret;
551  
552  	/* submission path, ->uring_lock should already be taken */
553  	ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
554  	if (unlikely(ret < 0))
555  		return ret;
556  
557  	iorw->bytes_done = 0;
558  	iorw->free_iovec = iov;
559  	if (iov)
560  		req->flags |= REQ_F_NEED_CLEANUP;
561  	return 0;
562  }
563  
564  int io_readv_prep_async(struct io_kiocb *req)
565  {
566  	return io_rw_prep_async(req, ITER_DEST);
567  }
568  
569  int io_writev_prep_async(struct io_kiocb *req)
570  {
571  	return io_rw_prep_async(req, ITER_SOURCE);
572  }
573  
574  /*
575   * This is our waitqueue callback handler, registered through __folio_lock_async()
576   * when we initially tried to do the IO with the iocb that had our waitqueue
577   * armed. This gets called when the page is unlocked, and we generally expect
578   * that to happen when the page IO is completed and the page is now uptodate.
579   * This will queue a task_work based retry of the operation, attempting to copy
580   * the data again. If the latter fails because the page was NOT uptodate, then
581   * we will do a thread based blocking retry of the operation. That's the
582   * unexpected slow path.
583   */
584  static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
585  			     int sync, void *arg)
586  {
587  	struct wait_page_queue *wpq;
588  	struct io_kiocb *req = wait->private;
589  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
590  	struct wait_page_key *key = arg;
591  
592  	wpq = container_of(wait, struct wait_page_queue, wait);
593  
594  	if (!wake_page_match(wpq, key))
595  		return 0;
596  
597  	rw->kiocb.ki_flags &= ~IOCB_WAITQ;
598  	list_del_init(&wait->entry);
599  	io_req_task_queue(req);
600  	return 1;
601  }
602  
603  /*
604   * This controls whether a given IO request should be armed for async page
605   * based retry. If we return false here, the request is handed to the async
606   * worker threads for retry. If we're doing buffered reads on a regular file,
607   * we prepare a private wait_page_queue entry and retry the operation. This
608   * will either succeed because the page is now uptodate and unlocked, or it
609   * will register a callback when the page is unlocked at IO completion. Through
610   * that callback, io_uring uses task_work to setup a retry of the operation.
611   * That retry will attempt the buffered read again. The retry will generally
612   * succeed, or in rare cases where it fails, we then fall back to using the
613   * async worker threads for a blocking retry.
614   */
615  static bool io_rw_should_retry(struct io_kiocb *req)
616  {
617  	struct io_async_rw *io = req->async_data;
618  	struct wait_page_queue *wait = &io->wpq;
619  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
620  	struct kiocb *kiocb = &rw->kiocb;
621  
622  	/* never retry for NOWAIT, we just complete with -EAGAIN */
623  	if (req->flags & REQ_F_NOWAIT)
624  		return false;
625  
626  	/* Only for buffered IO */
627  	if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
628  		return false;
629  
630  	/*
631  	 * just use poll if we can, and don't attempt if the fs doesn't
632  	 * support callback based unlocks
633  	 */
634  	if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
635  		return false;
636  
637  	wait->wait.func = io_async_buf_func;
638  	wait->wait.private = req;
639  	wait->wait.flags = 0;
640  	INIT_LIST_HEAD(&wait->wait.entry);
641  	kiocb->ki_flags |= IOCB_WAITQ;
642  	kiocb->ki_flags &= ~IOCB_NOWAIT;
643  	kiocb->ki_waitq = wait;
644  	return true;
645  }
646  
647  static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
648  {
649  	struct file *file = rw->kiocb.ki_filp;
650  
651  	if (likely(file->f_op->read_iter))
652  		return call_read_iter(file, &rw->kiocb, iter);
653  	else if (file->f_op->read)
654  		return loop_rw_iter(READ, rw, iter);
655  	else
656  		return -EINVAL;
657  }
658  
659  static bool need_complete_io(struct io_kiocb *req)
660  {
661  	return req->flags & REQ_F_ISREG ||
662  		S_ISBLK(file_inode(req->file)->i_mode);
663  }
664  
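/*
 * Per-issue kiocb setup shared by reads and writes: check the file mode,
 * apply the RWF_* flags from the SQE, work out whether this request must
 * not block (REQ_F_NOWAIT), and pick the completion handler. IOPOLL rings
 * require O_DIRECT files with ->iopoll() support and use the iopoll
 * completion; everything else completes through io_complete_rw().
 */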
665  static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
666  {
667  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
668  	struct kiocb *kiocb = &rw->kiocb;
669  	struct io_ring_ctx *ctx = req->ctx;
670  	struct file *file = req->file;
671  	int ret;
672  
673  	if (unlikely(!file || !(file->f_mode & mode)))
674  		return -EBADF;
675  
676  	if (!(req->flags & REQ_F_FIXED_FILE))
677  		req->flags |= io_file_get_flags(file);
678  
679  	kiocb->ki_flags = file->f_iocb_flags;
680  	ret = kiocb_set_rw_flags(kiocb, rw->flags);
681  	if (unlikely(ret))
682  		return ret;
683  	kiocb->ki_flags |= IOCB_ALLOC_CACHE;
684  
685  	/*
686  	 * If the file is marked O_NONBLOCK, still allow retry for it if it
687  	 * supports async. Otherwise it's impossible to use O_NONBLOCK files
688  	 * reliably. If not, or if IOCB_NOWAIT is set, don't retry.
689  	 */
690  	if ((kiocb->ki_flags & IOCB_NOWAIT) ||
691  	    ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
692  		req->flags |= REQ_F_NOWAIT;
693  
694  	if (ctx->flags & IORING_SETUP_IOPOLL) {
695  		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
696  			return -EOPNOTSUPP;
697  
698  		kiocb->private = NULL;
699  		kiocb->ki_flags |= IOCB_HIPRI;
700  		kiocb->ki_complete = io_complete_rw_iopoll;
701  		req->iopoll_completed = 0;
702  	} else {
703  		if (kiocb->ki_flags & IOCB_HIPRI)
704  			return -EINVAL;
705  		kiocb->ki_complete = io_complete_rw;
706  	}
707  
708  	return 0;
709  }
710  
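/*
 * Issue a read. The buffer/iovec is imported (or restored from async data
 * on a retry), the kiocb is set up, and the read is attempted, non-blocking
 * first when issued from the submission path. -EAGAIN pushes the request
 * back for async handling, while a partial buffered read on a regular file
 * or bdev is retried here with IOCB_WAITQ armed where possible, carrying
 * the bytes already read in io->bytes_done.
 */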
711  int io_read(struct io_kiocb *req, unsigned int issue_flags)
712  {
713  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
714  	struct io_rw_state __s, *s = &__s;
715  	struct iovec *iovec;
716  	struct kiocb *kiocb = &rw->kiocb;
717  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
718  	struct io_async_rw *io;
719  	ssize_t ret, ret2;
720  	loff_t *ppos;
721  
722  	if (!req_has_async_data(req)) {
723  		ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
724  		if (unlikely(ret < 0))
725  			return ret;
726  	} else {
727  		io = req->async_data;
728  		s = &io->s;
729  
730  		/*
731  		 * Safe and required to re-import if we're using provided
732  		 * buffers, as we dropped the selected one before retry.
733  		 */
734  		if (io_do_buffer_select(req)) {
735  			ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
736  			if (unlikely(ret < 0))
737  				return ret;
738  		}
739  
740  		/*
741  		 * We come here from an earlier attempt, restore our state to
742  		 * match in case it doesn't. It's cheap enough that we don't
743  		 * need to make this conditional.
744  		 */
745  		iov_iter_restore(&s->iter, &s->iter_state);
746  		iovec = NULL;
747  	}
748  	ret = io_rw_init_file(req, FMODE_READ);
749  	if (unlikely(ret)) {
750  		kfree(iovec);
751  		return ret;
752  	}
753  	req->cqe.res = iov_iter_count(&s->iter);
754  
755  	if (force_nonblock) {
756  		/* If the file doesn't support async, just async punt */
757  		if (unlikely(!io_file_supports_nowait(req))) {
758  			ret = io_setup_async_rw(req, iovec, s, true);
759  			return ret ?: -EAGAIN;
760  		}
761  		kiocb->ki_flags |= IOCB_NOWAIT;
762  	} else {
763  		/* Ensure we clear previously set non-block flag */
764  		kiocb->ki_flags &= ~IOCB_NOWAIT;
765  	}
766  
767  	ppos = io_kiocb_update_pos(req);
768  
769  	ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
770  	if (unlikely(ret)) {
771  		kfree(iovec);
772  		return ret;
773  	}
774  
775  	ret = io_iter_do_read(rw, &s->iter);
776  
777  	if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
778  		req->flags &= ~REQ_F_REISSUE;
779  		/* if we can poll, just do that */
780  		if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
781  			return -EAGAIN;
782  		/* IOPOLL retry should happen for io-wq threads */
783  		if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
784  			goto done;
785  		/* no retry on NONBLOCK nor RWF_NOWAIT */
786  		if (req->flags & REQ_F_NOWAIT)
787  			goto done;
788  		ret = 0;
789  	} else if (ret == -EIOCBQUEUED) {
790  		if (iovec)
791  			kfree(iovec);
792  		return IOU_ISSUE_SKIP_COMPLETE;
793  	} else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
794  		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req)) {
795  		/* read all, failed, already did sync or don't want to retry */
796  		goto done;
797  	}
798  
799  	/*
800  	 * Don't depend on the iter state matching what was consumed, or being
801  	 * untouched in case of error. Restore it and we'll advance it
802  	 * manually if we need to.
803  	 */
804  	iov_iter_restore(&s->iter, &s->iter_state);
805  
806  	ret2 = io_setup_async_rw(req, iovec, s, true);
807  	iovec = NULL;
808  	if (ret2) {
809  		ret = ret > 0 ? ret : ret2;
810  		goto done;
811  	}
812  
813  	io = req->async_data;
814  	s = &io->s;
815  	/*
816  	 * Now use our persistent iterator and state, if we aren't already.
817  	 * We've restored and mapped the iter to match.
818  	 */
819  
820  	do {
821  		/*
822  		 * We end up here because of a partial read, either from
823  		 * above or inside this loop. Advance the iter by the bytes
824  		 * that were consumed.
825  		 */
826  		iov_iter_advance(&s->iter, ret);
827  		if (!iov_iter_count(&s->iter))
828  			break;
829  		io->bytes_done += ret;
830  		iov_iter_save_state(&s->iter, &s->iter_state);
831  
832  		/* if we can retry, do so with the callbacks armed */
833  		if (!io_rw_should_retry(req)) {
834  			kiocb->ki_flags &= ~IOCB_WAITQ;
835  			return -EAGAIN;
836  		}
837  
838  		req->cqe.res = iov_iter_count(&s->iter);
839  		/*
840  		 * Now retry read with the IOCB_WAITQ parts set in the iocb. If
841  		 * we get -EIOCBQUEUED, then we'll get a notification when the
842  		 * desired page gets unlocked. We can also get a partial read
843  		 * here, and if we do, then just retry at the new offset.
844  		 */
845  		ret = io_iter_do_read(rw, &s->iter);
846  		if (ret == -EIOCBQUEUED)
847  			return IOU_ISSUE_SKIP_COMPLETE;
848  		/* we got some bytes, but not all. retry. */
849  		kiocb->ki_flags &= ~IOCB_WAITQ;
850  		iov_iter_restore(&s->iter, &s->iter_state);
851  	} while (ret > 0);
852  done:
853  	/* it's faster to check here than to delegate to kfree */
854  	if (iovec)
855  		kfree(iovec);
856  	return kiocb_done(req, ret, issue_flags);
857  }
858  
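/*
 * Issue a write. Mirrors io_read() for import and setup, but buffered
 * writes on regular files are only attempted non-blocking when the file
 * advertises FMODE_BUF_WASYNC; otherwise they are punted straight to io-wq.
 * A short write on a regular file/bdev saves its progress in bytes_done and
 * is finished from the worker rather than being reported as short.
 */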
859  int io_write(struct io_kiocb *req, unsigned int issue_flags)
860  {
861  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
862  	struct io_rw_state __s, *s = &__s;
863  	struct iovec *iovec;
864  	struct kiocb *kiocb = &rw->kiocb;
865  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
866  	ssize_t ret, ret2;
867  	loff_t *ppos;
868  
869  	if (!req_has_async_data(req)) {
870  		ret = io_import_iovec(ITER_SOURCE, req, &iovec, s, issue_flags);
871  		if (unlikely(ret < 0))
872  			return ret;
873  	} else {
874  		struct io_async_rw *io = req->async_data;
875  
876  		s = &io->s;
877  		iov_iter_restore(&s->iter, &s->iter_state);
878  		iovec = NULL;
879  	}
880  	ret = io_rw_init_file(req, FMODE_WRITE);
881  	if (unlikely(ret)) {
882  		kfree(iovec);
883  		return ret;
884  	}
885  	req->cqe.res = iov_iter_count(&s->iter);
886  
887  	if (force_nonblock) {
888  		/* If the file doesn't support async, just async punt */
889  		if (unlikely(!io_file_supports_nowait(req)))
890  			goto copy_iov;
891  
892  		/* Buffered IO on a regular file only supports NOWAIT if FMODE_BUF_WASYNC is set. */
893  		if (!(kiocb->ki_flags & IOCB_DIRECT) &&
894  			!(kiocb->ki_filp->f_mode & FMODE_BUF_WASYNC) &&
895  			(req->flags & REQ_F_ISREG))
896  			goto copy_iov;
897  
898  		kiocb->ki_flags |= IOCB_NOWAIT;
899  	} else {
900  		/* Ensure we clear previously set non-block flag */
901  		kiocb->ki_flags &= ~IOCB_NOWAIT;
902  	}
903  
904  	ppos = io_kiocb_update_pos(req);
905  
906  	ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
907  	if (unlikely(ret)) {
908  		kfree(iovec);
909  		return ret;
910  	}
911  
912  	if (req->flags & REQ_F_ISREG)
913  		kiocb_start_write(kiocb);
914  	kiocb->ki_flags |= IOCB_WRITE;
915  
916  	if (likely(req->file->f_op->write_iter))
917  		ret2 = call_write_iter(req->file, kiocb, &s->iter);
918  	else if (req->file->f_op->write)
919  		ret2 = loop_rw_iter(WRITE, rw, &s->iter);
920  	else
921  		ret2 = -EINVAL;
922  
923  	if (req->flags & REQ_F_REISSUE) {
924  		req->flags &= ~REQ_F_REISSUE;
925  		ret2 = -EAGAIN;
926  	}
927  
928  	/*
929  	 * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
930  	 * retry them without IOCB_NOWAIT.
931  	 */
932  	if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
933  		ret2 = -EAGAIN;
934  	/* no retry on NONBLOCK nor RWF_NOWAIT */
935  	if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
936  		goto done;
937  	if (!force_nonblock || ret2 != -EAGAIN) {
938  		/* IOPOLL retry should happen for io-wq threads */
939  		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
940  			goto copy_iov;
941  
942  		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)) {
943  			struct io_async_rw *io;
944  
945  			trace_io_uring_short_write(req->ctx, kiocb->ki_pos - ret2,
946  						req->cqe.res, ret2);
947  
948  			/* This is a partial write. The file pos has already been
949  			 * updated; set up the async struct to complete the request
950  			 * in the worker. Also update bytes_done to account for
951  			 * the bytes already written.
952  			 */
953  			iov_iter_save_state(&s->iter, &s->iter_state);
954  			ret = io_setup_async_rw(req, iovec, s, true);
955  
956  			io = req->async_data;
957  			if (io)
958  				io->bytes_done += ret2;
959  
960  			if (kiocb->ki_flags & IOCB_WRITE)
961  				io_req_end_write(req);
962  			return ret ? ret : -EAGAIN;
963  		}
964  done:
965  		ret = kiocb_done(req, ret2, issue_flags);
966  	} else {
967  copy_iov:
968  		iov_iter_restore(&s->iter, &s->iter_state);
969  		ret = io_setup_async_rw(req, iovec, s, false);
970  		if (!ret) {
971  			if (kiocb->ki_flags & IOCB_WRITE)
972  				io_req_end_write(req);
973  			return -EAGAIN;
974  		}
975  		return ret;
976  	}
977  	/* it's reportedly faster than delegating the null check to kfree() */
978  	if (iovec)
979  		kfree(iovec);
980  	return ret;
981  }
982  
983  void io_rw_fail(struct io_kiocb *req)
984  {
985  	int res;
986  
987  	res = io_fixup_rw_res(req, req->cqe.res);
988  	io_req_set_res(req, res, req->cqe.flags);
989  }
990  
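/*
 * Reap completions on an IORING_SETUP_IOPOLL ring: walk the iopoll list,
 * polling each file (batching block requests through the io_comp_batch),
 * then hand the completed prefix of the list to the normal completion
 * flush. Returns the number of completed requests, or a negative error
 * propagated from ->iopoll()/->uring_cmd_iopoll().
 *
 * Such a ring is set up by userspace roughly like this (illustrative
 * sketch only, assuming liburing):
 *
 *	struct io_uring ring;
 *
 *	io_uring_queue_init(64, &ring, IORING_SETUP_IOPOLL);
 */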
991  int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
992  {
993  	struct io_wq_work_node *pos, *start, *prev;
994  	unsigned int poll_flags = 0;
995  	DEFINE_IO_COMP_BATCH(iob);
996  	int nr_events = 0;
997  
998  	/*
999  	 * Only spin for completions if we don't have multiple devices hanging
1000  	 * off our complete list.
1001  	 */
1002  	if (ctx->poll_multi_queue || force_nonspin)
1003  		poll_flags |= BLK_POLL_ONESHOT;
1004  
1005  	wq_list_for_each(pos, start, &ctx->iopoll_list) {
1006  		struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
1007  		struct file *file = req->file;
1008  		int ret;
1009  
1010  		/*
1011  		 * Move completed and retryable entries to our local lists.
1012  		 * If we find a request that requires polling, break out
1013  		 * and complete those lists first, if we have entries there.
1014  		 */
1015  		if (READ_ONCE(req->iopoll_completed))
1016  			break;
1017  
1018  		if (req->opcode == IORING_OP_URING_CMD) {
1019  			struct io_uring_cmd *ioucmd;
1020  
1021  			ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd);
1022  			ret = file->f_op->uring_cmd_iopoll(ioucmd, &iob,
1023  								poll_flags);
1024  		} else {
1025  			struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
1026  
1027  			ret = file->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
1028  		}
1029  		if (unlikely(ret < 0))
1030  			return ret;
1031  		else if (ret)
1032  			poll_flags |= BLK_POLL_ONESHOT;
1033  
1034  		/* iopoll may have completed current req */
1035  		if (!rq_list_empty(iob.req_list) ||
1036  		    READ_ONCE(req->iopoll_completed))
1037  			break;
1038  	}
1039  
1040  	if (!rq_list_empty(iob.req_list))
1041  		iob.complete(&iob);
1042  	else if (!pos)
1043  		return 0;
1044  
1045  	prev = start;
1046  	wq_list_for_each_resume(pos, prev) {
1047  		struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
1048  
1049  		/* order with io_complete_rw_iopoll(), e.g. ->result updates */
1050  		if (!smp_load_acquire(&req->iopoll_completed))
1051  			break;
1052  		nr_events++;
1053  		req->cqe.flags = io_put_kbuf(req, 0);
1054  	}
1055  	if (unlikely(!nr_events))
1056  		return 0;
1057  
1058  	pos = start ? start->next : ctx->iopoll_list.first;
1059  	wq_list_cut(&ctx->iopoll_list, prev, start);
1060  
1061  	if (WARN_ON_ONCE(!wq_list_empty(&ctx->submit_state.compl_reqs)))
1062  		return 0;
1063  	ctx->submit_state.compl_reqs.first = pos;
1064  	__io_submit_flush_completions(ctx);
1065  	return nr_events;
1066  }
1067