xref: /openbmc/linux/io_uring/rw.c (revision 278d3ba6)
1  // SPDX-License-Identifier: GPL-2.0
2  #include <linux/kernel.h>
3  #include <linux/errno.h>
4  #include <linux/fs.h>
5  #include <linux/file.h>
6  #include <linux/blk-mq.h>
7  #include <linux/mm.h>
8  #include <linux/slab.h>
9  #include <linux/fsnotify.h>
10  #include <linux/poll.h>
11  #include <linux/nospec.h>
12  #include <linux/compat.h>
13  #include <linux/io_uring.h>
14  
15  #include <uapi/linux/io_uring.h>
16  
17  #include "io_uring.h"
18  #include "opdef.h"
19  #include "kbuf.h"
20  #include "rsrc.h"
21  #include "rw.h"
22  
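/* per-request read/write state, accessed via io_kiocb_to_cmd() */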
23  struct io_rw {
24  	/* NOTE: kiocb has the file as the first member, so don't do it here */
25  	struct kiocb			kiocb;
26  	u64				addr;
27  	u32				len;
28  	rwf_t				flags;
29  };
30  
31  static inline bool io_file_supports_nowait(struct io_kiocb *req)
32  {
33  	return req->flags & REQ_F_SUPPORT_NOWAIT;
34  }
35  
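/*
 * Common prep for the read/write opcodes: pull the offset, buffer index,
 * address, length and rw_flags out of the SQE. The fixed variants resolve
 * the registered buffer here, with the index sanitized via
 * array_index_nospec(), and assign the rsrc node for the request's
 * lifetime. An explicit ioprio is capability checked; otherwise the
 * submitter's current ioprio is used.
 */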
36  int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
37  {
38  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
39  	unsigned ioprio;
40  	int ret;
41  
42  	rw->kiocb.ki_pos = READ_ONCE(sqe->off);
43  	/* used for fixed read/write too - just read unconditionally */
44  	req->buf_index = READ_ONCE(sqe->buf_index);
45  
46  	if (req->opcode == IORING_OP_READ_FIXED ||
47  	    req->opcode == IORING_OP_WRITE_FIXED) {
48  		struct io_ring_ctx *ctx = req->ctx;
49  		u16 index;
50  
51  		if (unlikely(req->buf_index >= ctx->nr_user_bufs))
52  			return -EFAULT;
53  		index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
54  		req->imu = ctx->user_bufs[index];
55  		io_req_set_rsrc_node(req, ctx, 0);
56  	}
57  
58  	ioprio = READ_ONCE(sqe->ioprio);
59  	if (ioprio) {
60  		ret = ioprio_check_cap(ioprio);
61  		if (ret)
62  			return ret;
63  
64  		rw->kiocb.ki_ioprio = ioprio;
65  	} else {
66  		rw->kiocb.ki_ioprio = get_current_ioprio();
67  	}
68  
69  	rw->addr = READ_ONCE(sqe->addr);
70  	rw->len = READ_ONCE(sqe->len);
71  	rw->flags = READ_ONCE(sqe->rw_flags);
72  	return 0;
73  }
74  
75  void io_readv_writev_cleanup(struct io_kiocb *req)
76  {
77  	struct io_async_rw *io = req->async_data;
78  
79  	kfree(io->free_iovec);
80  }
81  
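/*
 * Complete a kiocb directly. -EIOCBQUEUED means the IO is still in flight
 * and ->ki_complete() will be called later, so do nothing here; restart
 * errors are converted to -EINTR first (see below).
 */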
82  static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
83  {
84  	switch (ret) {
85  	case -EIOCBQUEUED:
86  		break;
87  	case -ERESTARTSYS:
88  	case -ERESTARTNOINTR:
89  	case -ERESTARTNOHAND:
90  	case -ERESTART_RESTARTBLOCK:
91  		/*
92  		 * We can't just restart the syscall, since previously
93  		 * submitted sqes may already be in progress. Just fail this
94  		 * IO with EINTR.
95  		 */
96  		ret = -EINTR;
97  		fallthrough;
98  	default:
99  		kiocb->ki_complete(kiocb, ret);
100  	}
101  }
102  
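/*
 * Pick the position for this IO: an offset given in the SQE wins;
 * otherwise non-stream files use the current file position and flag
 * REQ_F_CUR_POS so it gets written back on completion. Stream files get
 * a NULL ppos.
 */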
103  static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
104  {
105  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
106  
107  	if (rw->kiocb.ki_pos != -1)
108  		return &rw->kiocb.ki_pos;
109  
110  	if (!(req->file->f_mode & FMODE_STREAM)) {
111  		req->flags |= REQ_F_CUR_POS;
112  		rw->kiocb.ki_pos = req->file->f_pos;
113  		return &rw->kiocb.ki_pos;
114  	}
115  
116  	rw->kiocb.ki_pos = 0;
117  	return NULL;
118  }
119  
120  static void io_req_task_queue_reissue(struct io_kiocb *req)
121  {
122  	req->io_task_work.func = io_queue_iowq;
123  	io_req_task_work_add(req);
124  }
125  
126  #ifdef CONFIG_BLOCK
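/*
 * Restore (or, if there is no async data yet, build) the iterator state
 * so a request that hit -EAGAIN after submission can be reissued.
 */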
127  static bool io_resubmit_prep(struct io_kiocb *req)
128  {
129  	struct io_async_rw *io = req->async_data;
130  
131  	if (!req_has_async_data(req))
132  		return !io_req_prep_async(req);
133  	iov_iter_restore(&io->s.iter, &io->s.iter_state);
134  	return true;
135  }
136  
137  static bool io_rw_should_reissue(struct io_kiocb *req)
138  {
139  	umode_t mode = file_inode(req->file)->i_mode;
140  	struct io_ring_ctx *ctx = req->ctx;
141  
142  	if (!S_ISBLK(mode) && !S_ISREG(mode))
143  		return false;
144  	if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
145  	    !(ctx->flags & IORING_SETUP_IOPOLL)))
146  		return false;
147  	/*
148  	 * If ref is dying, we might be running poll reap from the exit work.
149  	 * Don't attempt to reissue from that path, just let it fail with
150  	 * -EAGAIN.
151  	 */
152  	if (percpu_ref_is_dying(&ctx->refs))
153  		return false;
154  	/*
155  	 * Play it safe and assume not safe to re-import and reissue if we're
156  	 * not in the original thread group (or not in task context).
157  	 */
158  	if (!same_thread_group(req->task, current) || !in_task())
159  		return false;
160  	return true;
161  }
162  #else
163  static bool io_resubmit_prep(struct io_kiocb *req)
164  {
165  	return false;
166  }
167  static bool io_rw_should_reissue(struct io_kiocb *req)
168  {
169  	return false;
170  }
171  #endif
172  
173  static void kiocb_end_write(struct io_kiocb *req)
174  {
175  	/*
176  	 * Tell lockdep we inherited freeze protection from submission
177  	 * thread.
178  	 */
179  	if (req->flags & REQ_F_ISREG) {
180  		struct super_block *sb = file_inode(req->file)->i_sb;
181  
182  		__sb_writers_acquired(sb, SB_FREEZE_WRITE);
183  		sb_end_write(sb);
184  	}
185  }
186  
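/*
 * Common completion tail: writes drop freeze protection and fire
 * fsnotify_modify(), reads fire fsnotify_access(). Then check for a short
 * or failed result; returns true if the request was flagged for reissue
 * instead of being completed.
 */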
187  static bool __io_complete_rw_common(struct io_kiocb *req, long res)
188  {
189  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
190  
191  	if (rw->kiocb.ki_flags & IOCB_WRITE) {
192  		kiocb_end_write(req);
193  		fsnotify_modify(req->file);
194  	} else {
195  		fsnotify_access(req->file);
196  	}
197  	if (unlikely(res != req->cqe.res)) {
198  		if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
199  		    io_rw_should_reissue(req)) {
200  			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
201  			return true;
202  		}
203  		req_set_fail(req);
204  		req->cqe.res = res;
205  	}
206  	return false;
207  }
208  
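/* ->ki_complete() for the non-iopoll path: post the CQE from task_work */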
209  static void io_complete_rw(struct kiocb *kiocb, long res)
210  {
211  	struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
212  	struct io_kiocb *req = cmd_to_io_kiocb(rw);
213  
214  	if (__io_complete_rw_common(req, res))
215  		return;
216  	io_req_set_res(req, res, 0);
217  	req->io_task_work.func = io_req_task_complete;
218  	io_req_task_work_add(req);
219  }
220  
221  static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
222  {
223  	struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
224  	struct io_kiocb *req = cmd_to_io_kiocb(rw);
225  
226  	if (kiocb->ki_flags & IOCB_WRITE)
227  		kiocb_end_write(req);
228  	if (unlikely(res != req->cqe.res)) {
229  		if (res == -EAGAIN && io_rw_should_reissue(req)) {
230  			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
231  			return;
232  		}
233  		req->cqe.res = res;
234  	}
235  
236  	/* order with io_iopoll_complete() checking ->iopoll_completed */
237  	smp_store_release(&req->iopoll_completed, 1);
238  }
239  
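/*
 * Finish an issue attempt: fold in bytes completed by earlier attempts,
 * write back the file position if REQ_F_CUR_POS was used, and either post
 * the CQE inline (synchronous non-iopoll completion) or defer to
 * io_rw_done(). Requests flagged REQ_F_REISSUE are re-queued to io-wq if
 * their iterator state can be restored, otherwise failed.
 */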
240  static int kiocb_done(struct io_kiocb *req, ssize_t ret,
241  		       unsigned int issue_flags)
242  {
243  	struct io_async_rw *io = req->async_data;
244  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
245  
246  	/* add previously done IO, if any */
247  	if (req_has_async_data(req) && io->bytes_done > 0) {
248  		if (ret < 0)
249  			ret = io->bytes_done;
250  		else
251  			ret += io->bytes_done;
252  	}
253  
254  	if (req->flags & REQ_F_CUR_POS)
255  		req->file->f_pos = rw->kiocb.ki_pos;
256  	if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw)) {
257  		if (!__io_complete_rw_common(req, ret)) {
258  			io_req_set_res(req, req->cqe.res,
259  				       io_put_kbuf(req, issue_flags));
260  			return IOU_OK;
261  		}
262  	} else {
263  		io_rw_done(&rw->kiocb, ret);
264  	}
265  
266  	if (req->flags & REQ_F_REISSUE) {
267  		req->flags &= ~REQ_F_REISSUE;
268  		if (io_resubmit_prep(req))
269  			io_req_task_queue_reissue(req);
270  		else
271  			io_req_task_queue_fail(req, ret);
272  	}
273  	return IOU_ISSUE_SKIP_COMPLETE;
274  }
275  
276  #ifdef CONFIG_COMPAT
277  static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
278  				unsigned int issue_flags)
279  {
280  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
281  	struct compat_iovec __user *uiov;
282  	compat_ssize_t clen;
283  	void __user *buf;
284  	size_t len;
285  
286  	uiov = u64_to_user_ptr(rw->addr);
287  	if (!access_ok(uiov, sizeof(*uiov)))
288  		return -EFAULT;
289  	if (__get_user(clen, &uiov->iov_len))
290  		return -EFAULT;
291  	if (clen < 0)
292  		return -EINVAL;
293  
294  	len = clen;
295  	buf = io_buffer_select(req, &len, issue_flags);
296  	if (!buf)
297  		return -ENOBUFS;
298  	rw->addr = (unsigned long) buf;
299  	iov[0].iov_base = buf;
300  	rw->len = iov[0].iov_len = (compat_size_t) len;
301  	return 0;
302  }
303  #endif
304  
305  static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
306  				      unsigned int issue_flags)
307  {
308  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
309  	struct iovec __user *uiov = u64_to_user_ptr(rw->addr);
310  	void __user *buf;
311  	ssize_t len;
312  
313  	if (copy_from_user(iov, uiov, sizeof(*uiov)))
314  		return -EFAULT;
315  
316  	len = iov[0].iov_len;
317  	if (len < 0)
318  		return -EINVAL;
319  	buf = io_buffer_select(req, &len, issue_flags);
320  	if (!buf)
321  		return -ENOBUFS;
322  	rw->addr = (unsigned long) buf;
323  	iov[0].iov_base = buf;
324  	rw->len = iov[0].iov_len = len;
325  	return 0;
326  }
327  
328  static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
329  				    unsigned int issue_flags)
330  {
331  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
332  
333  	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
334  		iov[0].iov_base = u64_to_user_ptr(rw->addr);
335  		iov[0].iov_len = rw->len;
336  		return 0;
337  	}
338  	if (rw->len != 1)
339  		return -EINVAL;
340  
341  #ifdef CONFIG_COMPAT
342  	if (req->ctx->compat)
343  		return io_compat_import(req, iov, issue_flags);
344  #endif
345  
346  	return __io_iov_buffer_select(req, iov, issue_flags);
347  }
348  
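/*
 * Build the iov_iter for this request: fixed buffers map the registered
 * buffer, plain READ/WRITE (optionally with a selected provided buffer)
 * import a single range, and readv/writev import a user iovec array,
 * allocating one if it exceeds UIO_FASTIOV entries. Returns the allocated
 * iovec (to be freed by the caller), NULL, or an ERR_PTR.
 */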
349  static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
350  				       struct io_rw_state *s,
351  				       unsigned int issue_flags)
352  {
353  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
354  	struct iov_iter *iter = &s->iter;
355  	u8 opcode = req->opcode;
356  	struct iovec *iovec;
357  	void __user *buf;
358  	size_t sqe_len;
359  	ssize_t ret;
360  
361  	if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
362  		ret = io_import_fixed(ddir, iter, req->imu, rw->addr, rw->len);
363  		if (ret)
364  			return ERR_PTR(ret);
365  		return NULL;
366  	}
367  
368  	buf = u64_to_user_ptr(rw->addr);
369  	sqe_len = rw->len;
370  
371  	if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
372  		if (io_do_buffer_select(req)) {
373  			buf = io_buffer_select(req, &sqe_len, issue_flags);
374  			if (!buf)
375  				return ERR_PTR(-ENOBUFS);
376  			rw->addr = (unsigned long) buf;
377  			rw->len = sqe_len;
378  		}
379  
380  		ret = import_single_range(ddir, buf, sqe_len, s->fast_iov, iter);
381  		if (ret)
382  			return ERR_PTR(ret);
383  		return NULL;
384  	}
385  
386  	iovec = s->fast_iov;
387  	if (req->flags & REQ_F_BUFFER_SELECT) {
388  		ret = io_iov_buffer_select(req, iovec, issue_flags);
389  		if (ret)
390  			return ERR_PTR(ret);
391  		iov_iter_init(iter, ddir, iovec, 1, iovec->iov_len);
392  		return NULL;
393  	}
394  
395  	ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
396  			      req->ctx->compat);
397  	if (unlikely(ret < 0))
398  		return ERR_PTR(ret);
399  	return iovec;
400  }
401  
402  static inline int io_import_iovec(int rw, struct io_kiocb *req,
403  				  struct iovec **iovec, struct io_rw_state *s,
404  				  unsigned int issue_flags)
405  {
406  	*iovec = __io_import_iovec(rw, req, s, issue_flags);
407  	if (unlikely(IS_ERR(*iovec)))
408  		return PTR_ERR(*iovec);
409  
410  	iov_iter_save_state(&s->iter, &s->iter_state);
411  	return 0;
412  }
413  
414  static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
415  {
416  	return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
417  }
418  
419  /*
420   * For files that don't have ->read_iter() and ->write_iter(), handle them
421   * by looping over ->read() or ->write() manually.
422   */
423  static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
424  {
425  	struct kiocb *kiocb = &rw->kiocb;
426  	struct file *file = kiocb->ki_filp;
427  	ssize_t ret = 0;
428  	loff_t *ppos;
429  
430  	/*
431  	 * Don't support polled IO through this interface, and we can't
432  	 * support non-blocking either. For the latter, this just causes
433  	 * the kiocb to be handled from an async context.
434  	 */
435  	if (kiocb->ki_flags & IOCB_HIPRI)
436  		return -EOPNOTSUPP;
437  	if ((kiocb->ki_flags & IOCB_NOWAIT) &&
438  	    !(kiocb->ki_filp->f_flags & O_NONBLOCK))
439  		return -EAGAIN;
440  
441  	ppos = io_kiocb_ppos(kiocb);
442  
443  	while (iov_iter_count(iter)) {
444  		struct iovec iovec;
445  		ssize_t nr;
446  
447  		if (!iov_iter_is_bvec(iter)) {
448  			iovec = iov_iter_iovec(iter);
449  		} else {
450  			iovec.iov_base = u64_to_user_ptr(rw->addr);
451  			iovec.iov_len = rw->len;
452  		}
453  
454  		if (ddir == READ) {
455  			nr = file->f_op->read(file, iovec.iov_base,
456  					      iovec.iov_len, ppos);
457  		} else {
458  			nr = file->f_op->write(file, iovec.iov_base,
459  					       iovec.iov_len, ppos);
460  		}
461  
462  		if (nr < 0) {
463  			if (!ret)
464  				ret = nr;
465  			break;
466  		}
467  		ret += nr;
468  		if (!iov_iter_is_bvec(iter)) {
469  			iov_iter_advance(iter, nr);
470  		} else {
471  			rw->addr += nr;
472  			rw->len -= nr;
473  			if (!rw->len)
474  				break;
475  		}
476  		if (nr != iovec.iov_len)
477  			break;
478  	}
479  
480  	return ret;
481  }
482  
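/*
 * Copy the iterator, and the iovec it points at if necessary, into the
 * request's async data so the IO can be retried after the on-stack
 * submission state is gone.
 */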
483  static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
484  			  const struct iovec *fast_iov, struct iov_iter *iter)
485  {
486  	struct io_async_rw *io = req->async_data;
487  
488  	memcpy(&io->s.iter, iter, sizeof(*iter));
489  	io->free_iovec = iovec;
490  	io->bytes_done = 0;
491  	/* can only be fixed buffers, no need to do anything */
492  	if (iov_iter_is_bvec(iter))
493  		return;
494  	if (!iovec) {
495  		unsigned iov_off = 0;
496  
497  		io->s.iter.iov = io->s.fast_iov;
498  		if (iter->iov != fast_iov) {
499  			iov_off = iter->iov - fast_iov;
500  			io->s.iter.iov += iov_off;
501  		}
502  		if (io->s.fast_iov != fast_iov)
503  			memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
504  			       sizeof(struct iovec) * iter->nr_segs);
505  	} else {
506  		req->flags |= REQ_F_NEED_CLEANUP;
507  	}
508  }
509  
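/*
 * Allocate async data and stash the import state for a later retry,
 * unless the opcode never needs async prep and the caller isn't forcing
 * it.
 */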
510  static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
511  			     struct io_rw_state *s, bool force)
512  {
513  	if (!force && !io_op_defs[req->opcode].prep_async)
514  		return 0;
515  	if (!req_has_async_data(req)) {
516  		struct io_async_rw *iorw;
517  
518  		if (io_alloc_async_data(req)) {
519  			kfree(iovec);
520  			return -ENOMEM;
521  		}
522  
523  		io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
524  		iorw = req->async_data;
525  		/* we've copied and mapped the iter, ensure state is saved */
526  		iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
527  	}
528  	return 0;
529  }
530  
531  static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
532  {
533  	struct io_async_rw *iorw = req->async_data;
534  	struct iovec *iov;
535  	int ret;
536  
537  	/* submission path, ->uring_lock should already be taken */
538  	ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
539  	if (unlikely(ret < 0))
540  		return ret;
541  
542  	iorw->bytes_done = 0;
543  	iorw->free_iovec = iov;
544  	if (iov)
545  		req->flags |= REQ_F_NEED_CLEANUP;
546  	return 0;
547  }
548  
549  int io_readv_prep_async(struct io_kiocb *req)
550  {
551  	return io_rw_prep_async(req, READ);
552  }
553  
554  int io_writev_prep_async(struct io_kiocb *req)
555  {
556  	return io_rw_prep_async(req, WRITE);
557  }
558  
559  /*
560   * This is our waitqueue callback handler, registered through __folio_lock_async()
561   * when we initially tried to do the IO with the iocb and armed our waitqueue.
562   * This gets called when the page is unlocked, and we generally expect that to
563   * happen when the page IO is completed and the page is now uptodate. This will
564   * queue a task_work based retry of the operation, attempting to copy the data
565   * again. If the latter fails because the page was NOT uptodate, then we will
566   * do a thread based blocking retry of the operation. That's the unexpected
567   * slow path.
568   */
569  static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
570  			     int sync, void *arg)
571  {
572  	struct wait_page_queue *wpq;
573  	struct io_kiocb *req = wait->private;
574  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
575  	struct wait_page_key *key = arg;
576  
577  	wpq = container_of(wait, struct wait_page_queue, wait);
578  
579  	if (!wake_page_match(wpq, key))
580  		return 0;
581  
582  	rw->kiocb.ki_flags &= ~IOCB_WAITQ;
583  	list_del_init(&wait->entry);
584  	io_req_task_queue(req);
585  	return 1;
586  }
587  
588  /*
589   * This controls whether a given IO request should be armed for async page
590   * based retry. If we return false here, the request is handed to the async
591   * worker threads for retry. If we're doing buffered reads on a regular file,
592   * we prepare a private wait_page_queue entry and retry the operation. This
593   * will either succeed because the page is now uptodate and unlocked, or it
594   * will register a callback when the page is unlocked at IO completion. Through
595   * that callback, io_uring uses task_work to setup a retry of the operation.
596   * That retry will attempt the buffered read again. The retry will generally
597   * succeed, or in rare cases where it fails, we then fall back to using the
598   * async worker threads for a blocking retry.
599   */
600  static bool io_rw_should_retry(struct io_kiocb *req)
601  {
602  	struct io_async_rw *io = req->async_data;
603  	struct wait_page_queue *wait = &io->wpq;
604  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
605  	struct kiocb *kiocb = &rw->kiocb;
606  
607  	/* never retry for NOWAIT, we just complete with -EAGAIN */
608  	if (req->flags & REQ_F_NOWAIT)
609  		return false;
610  
611  	/* Only for buffered IO */
612  	if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
613  		return false;
614  
615  	/*
616  	 * just use poll if we can, and don't attempt if the fs doesn't
617  	 * support callback based unlocks
618  	 */
619  	if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
620  		return false;
621  
622  	wait->wait.func = io_async_buf_func;
623  	wait->wait.private = req;
624  	wait->wait.flags = 0;
625  	INIT_LIST_HEAD(&wait->wait.entry);
626  	kiocb->ki_flags |= IOCB_WAITQ;
627  	kiocb->ki_flags &= ~IOCB_NOWAIT;
628  	kiocb->ki_waitq = wait;
629  	return true;
630  }
631  
632  static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
633  {
634  	struct file *file = rw->kiocb.ki_filp;
635  
636  	if (likely(file->f_op->read_iter))
637  		return call_read_iter(file, &rw->kiocb, iter);
638  	else if (file->f_op->read)
639  		return loop_rw_iter(READ, rw, iter);
640  	else
641  		return -EINVAL;
642  }
643  
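/* short transfers on regular files and block devices are completed via retry */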
644  static bool need_complete_io(struct io_kiocb *req)
645  {
646  	return req->flags & REQ_F_ISREG ||
647  		S_ISBLK(file_inode(req->file)->i_mode);
648  }
649  
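/*
 * Per-issue kiocb setup: verify the file allows the requested access,
 * apply the RWF_* flags, decide whether the request must not block
 * (REQ_F_NOWAIT), and select the completion callback: iopoll completion
 * for IORING_SETUP_IOPOLL rings, task_work based completion otherwise.
 */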
650  static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
651  {
652  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
653  	struct kiocb *kiocb = &rw->kiocb;
654  	struct io_ring_ctx *ctx = req->ctx;
655  	struct file *file = req->file;
656  	int ret;
657  
658  	if (unlikely(!file || !(file->f_mode & mode)))
659  		return -EBADF;
660  
661  	if (!io_req_ffs_set(req))
662  		req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
663  
664  	kiocb->ki_flags = file->f_iocb_flags;
665  	ret = kiocb_set_rw_flags(kiocb, rw->flags);
666  	if (unlikely(ret))
667  		return ret;
668  
669  	/*
670  	 * If the file is marked O_NONBLOCK, still allow retry for it if it
671  	 * supports async. Otherwise it's impossible to use O_NONBLOCK files
672  	 * reliably. If not, or if IOCB_NOWAIT is set, don't retry.
673  	 */
674  	if ((kiocb->ki_flags & IOCB_NOWAIT) ||
675  	    ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
676  		req->flags |= REQ_F_NOWAIT;
677  
678  	if (ctx->flags & IORING_SETUP_IOPOLL) {
679  		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
680  			return -EOPNOTSUPP;
681  
682  		kiocb->private = NULL;
683  		kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
684  		kiocb->ki_complete = io_complete_rw_iopoll;
685  		req->iopoll_completed = 0;
686  	} else {
687  		if (kiocb->ki_flags & IOCB_HIPRI)
688  			return -EINVAL;
689  		kiocb->ki_complete = io_complete_rw;
690  	}
691  
692  	return 0;
693  }
694  
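/*
 * Issue a read. A nonblocking attempt that would block returns -EAGAIN so
 * the core can retry it elsewhere; partial buffered reads on regular
 * files or block devices are retried inline, with IOCB_WAITQ armed when
 * the file supports it and bytes_done accumulating across attempts.
 */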
695  int io_read(struct io_kiocb *req, unsigned int issue_flags)
696  {
697  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
698  	struct io_rw_state __s, *s = &__s;
699  	struct iovec *iovec;
700  	struct kiocb *kiocb = &rw->kiocb;
701  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
702  	struct io_async_rw *io;
703  	ssize_t ret, ret2;
704  	loff_t *ppos;
705  
706  	if (!req_has_async_data(req)) {
707  		ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
708  		if (unlikely(ret < 0))
709  			return ret;
710  	} else {
711  		io = req->async_data;
712  		s = &io->s;
713  
714  		/*
715  		 * Safe and required to re-import if we're using provided
716  		 * buffers, as we dropped the selected one before retry.
717  		 */
718  		if (io_do_buffer_select(req)) {
719  			ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
720  			if (unlikely(ret < 0))
721  				return ret;
722  		}
723  
724  		/*
725  		 * We come here from an earlier attempt, restore our state to
726  		 * match in case it doesn't. It's cheap enough that we don't
727  		 * need to make this conditional.
728  		 */
729  		iov_iter_restore(&s->iter, &s->iter_state);
730  		iovec = NULL;
731  	}
732  	ret = io_rw_init_file(req, FMODE_READ);
733  	if (unlikely(ret)) {
734  		kfree(iovec);
735  		return ret;
736  	}
737  	req->cqe.res = iov_iter_count(&s->iter);
738  
739  	if (force_nonblock) {
740  		/* If the file doesn't support async, just async punt */
741  		if (unlikely(!io_file_supports_nowait(req))) {
742  			ret = io_setup_async_rw(req, iovec, s, true);
743  			return ret ?: -EAGAIN;
744  		}
745  		kiocb->ki_flags |= IOCB_NOWAIT;
746  	} else {
747  		/* Ensure we clear previously set non-block flag */
748  		kiocb->ki_flags &= ~IOCB_NOWAIT;
749  	}
750  
751  	ppos = io_kiocb_update_pos(req);
752  
753  	ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
754  	if (unlikely(ret)) {
755  		kfree(iovec);
756  		return ret;
757  	}
758  
759  	ret = io_iter_do_read(rw, &s->iter);
760  
761  	if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
762  		req->flags &= ~REQ_F_REISSUE;
763  		/* if we can poll, just do that */
764  		if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
765  			return -EAGAIN;
766  		/* IOPOLL retry should happen for io-wq threads */
767  		if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
768  			goto done;
769  		/* no retry on NONBLOCK nor RWF_NOWAIT */
770  		if (req->flags & REQ_F_NOWAIT)
771  			goto done;
772  		ret = 0;
773  	} else if (ret == -EIOCBQUEUED) {
774  		if (iovec)
775  			kfree(iovec);
776  		return IOU_ISSUE_SKIP_COMPLETE;
777  	} else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
778  		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req)) {
779  		/* read all, failed, already did sync or don't want to retry */
780  		goto done;
781  	}
782  
783  	/*
784  	 * Don't depend on the iter state matching what was consumed, or being
785  	 * untouched in case of error. Restore it and we'll advance it
786  	 * manually if we need to.
787  	 */
788  	iov_iter_restore(&s->iter, &s->iter_state);
789  
790  	ret2 = io_setup_async_rw(req, iovec, s, true);
791  	if (ret2)
792  		return ret2;
793  
794  	iovec = NULL;
795  	io = req->async_data;
796  	s = &io->s;
797  	/*
798  	 * Now use our persistent iterator and state, if we aren't already.
799  	 * We've restored and mapped the iter to match.
800  	 */
801  
802  	do {
803  		/*
804  		 * We end up here because of a partial read, either from
805  		 * above or inside this loop. Advance the iter by the bytes
806  		 * that were consumed.
807  		 */
808  		iov_iter_advance(&s->iter, ret);
809  		if (!iov_iter_count(&s->iter))
810  			break;
811  		io->bytes_done += ret;
812  		iov_iter_save_state(&s->iter, &s->iter_state);
813  
814  		/* if we can retry, do so with the callbacks armed */
815  		if (!io_rw_should_retry(req)) {
816  			kiocb->ki_flags &= ~IOCB_WAITQ;
817  			return -EAGAIN;
818  		}
819  
820  		/*
821  		 * Now retry read with the IOCB_WAITQ parts set in the iocb. If
822  		 * we get -EIOCBQUEUED, then we'll get a notification when the
823  		 * desired page gets unlocked. We can also get a partial read
824  		 * here, and if we do, then just retry at the new offset.
825  		 */
826  		ret = io_iter_do_read(rw, &s->iter);
827  		if (ret == -EIOCBQUEUED)
828  			return IOU_ISSUE_SKIP_COMPLETE;
829  		/* we got some bytes, but not all. retry. */
830  		kiocb->ki_flags &= ~IOCB_WAITQ;
831  		iov_iter_restore(&s->iter, &s->iter_state);
832  	} while (ret > 0);
833  done:
834  	/* it's faster to check here than to delegate to kfree */
835  	if (iovec)
836  		kfree(iovec);
837  	return kiocb_done(req, ret, issue_flags);
838  }
839  
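/*
 * Issue a write. Freeze protection for regular files is taken here and
 * released at completion via kiocb_end_write(). Attempts that can't
 * proceed without blocking are set up for an io-wq retry (copy_iov), and
 * partial writes on regular files or block devices are likewise handed
 * off with bytes_done updated.
 */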
840  int io_write(struct io_kiocb *req, unsigned int issue_flags)
841  {
842  	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
843  	struct io_rw_state __s, *s = &__s;
844  	struct iovec *iovec;
845  	struct kiocb *kiocb = &rw->kiocb;
846  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
847  	ssize_t ret, ret2;
848  	loff_t *ppos;
849  
850  	if (!req_has_async_data(req)) {
851  		ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
852  		if (unlikely(ret < 0))
853  			return ret;
854  	} else {
855  		struct io_async_rw *io = req->async_data;
856  
857  		s = &io->s;
858  		iov_iter_restore(&s->iter, &s->iter_state);
859  		iovec = NULL;
860  	}
861  	ret = io_rw_init_file(req, FMODE_WRITE);
862  	if (unlikely(ret)) {
863  		kfree(iovec);
864  		return ret;
865  	}
866  	req->cqe.res = iov_iter_count(&s->iter);
867  
868  	if (force_nonblock) {
869  		/* If the file doesn't support async, just async punt */
870  		if (unlikely(!io_file_supports_nowait(req)))
871  			goto copy_iov;
872  
873  		/* Buffered (non-direct) NOWAIT writes are only supported on block devices or files with FMODE_BUF_WASYNC; punt other regular files. */
874  		if (!(kiocb->ki_flags & IOCB_DIRECT) &&
875  			!(kiocb->ki_filp->f_mode & FMODE_BUF_WASYNC) &&
876  			(req->flags & REQ_F_ISREG))
877  			goto copy_iov;
878  
879  		kiocb->ki_flags |= IOCB_NOWAIT;
880  	} else {
881  		/* Ensure we clear previously set non-block flag */
882  		kiocb->ki_flags &= ~IOCB_NOWAIT;
883  	}
884  
885  	ppos = io_kiocb_update_pos(req);
886  
887  	ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
888  	if (unlikely(ret)) {
889  		kfree(iovec);
890  		return ret;
891  	}
892  
893  	/*
894  	 * Open-code file_start_write here to grab freeze protection,
895  	 * which will be released by another thread in
896  	 * io_complete_rw().  Fool lockdep by telling it the lock got
897  	 * released so that it doesn't complain about the held lock when
898  	 * we return to userspace.
899  	 */
900  	if (req->flags & REQ_F_ISREG) {
901  		sb_start_write(file_inode(req->file)->i_sb);
902  		__sb_writers_release(file_inode(req->file)->i_sb,
903  					SB_FREEZE_WRITE);
904  	}
905  	kiocb->ki_flags |= IOCB_WRITE;
906  
907  	if (likely(req->file->f_op->write_iter))
908  		ret2 = call_write_iter(req->file, kiocb, &s->iter);
909  	else if (req->file->f_op->write)
910  		ret2 = loop_rw_iter(WRITE, rw, &s->iter);
911  	else
912  		ret2 = -EINVAL;
913  
914  	if (req->flags & REQ_F_REISSUE) {
915  		req->flags &= ~REQ_F_REISSUE;
916  		ret2 = -EAGAIN;
917  	}
918  
919  	/*
920  	 * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
921  	 * retry them without IOCB_NOWAIT.
922  	 */
923  	if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
924  		ret2 = -EAGAIN;
925  	/* no retry on NONBLOCK nor RWF_NOWAIT */
926  	if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
927  		goto done;
928  	if (!force_nonblock || ret2 != -EAGAIN) {
929  		/* IOPOLL retry should happen for io-wq threads */
930  		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
931  			goto copy_iov;
932  
933  		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)) {
934  			struct io_async_rw *rw;
935  
936  			trace_io_uring_short_write(req->ctx, kiocb->ki_pos - ret2,
937  						req->cqe.res, ret2);
938  
939  			/* This is a partial write. The file pos has already been
940  			 * updated, set up the async struct to complete the request
941  			 * in the worker. Also update bytes_done to account for
942  			 * the bytes already written.
943  			 */
944  			iov_iter_save_state(&s->iter, &s->iter_state);
945  			ret = io_setup_async_rw(req, iovec, s, true);
946  
947  			rw = req->async_data;
948  			if (rw)
949  				rw->bytes_done += ret2;
950  
951  			if (kiocb->ki_flags & IOCB_WRITE)
952  				kiocb_end_write(req);
953  			return ret ? ret : -EAGAIN;
954  		}
955  done:
956  		ret = kiocb_done(req, ret2, issue_flags);
957  	} else {
958  copy_iov:
959  		iov_iter_restore(&s->iter, &s->iter_state);
960  		ret = io_setup_async_rw(req, iovec, s, false);
961  		if (!ret) {
962  			if (kiocb->ki_flags & IOCB_WRITE)
963  				kiocb_end_write(req);
964  			return -EAGAIN;
965  		}
966  		return ret;
967  	}
968  	/* it's reportedly faster than delegating the null check to kfree() */
969  	if (iovec)
970  		kfree(iovec);
971  	return ret;
972  }
973  
974  static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
975  {
976  	io_commit_cqring_flush(ctx);
977  	if (ctx->flags & IORING_SETUP_SQPOLL)
978  		io_cqring_wake(ctx);
979  }
980  
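/*
 * Reap completions on an IOPOLL ring: walk the iopoll list, polling the
 * underlying files until a request completes, then post CQEs in order for
 * everything that has finished, cut those entries off the list and free
 * them as a batch. Returns the number of events found.
 */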
981  int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
982  {
983  	struct io_wq_work_node *pos, *start, *prev;
984  	unsigned int poll_flags = BLK_POLL_NOSLEEP;
985  	DEFINE_IO_COMP_BATCH(iob);
986  	int nr_events = 0;
987  
988  	/*
989  	 * Only spin for completions if we don't have multiple devices hanging
990  	 * off our complete list.
991  	 */
992  	if (ctx->poll_multi_queue || force_nonspin)
993  		poll_flags |= BLK_POLL_ONESHOT;
994  
995  	wq_list_for_each(pos, start, &ctx->iopoll_list) {
996  		struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
997  		struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
998  		int ret;
999  
1000  		/*
1001  		 * Move completed and retryable entries to our local lists.
1002  		 * If we find a request that requires polling, break out
1003  		 * and complete those lists first, if we have entries there.
1004  		 */
1005  		if (READ_ONCE(req->iopoll_completed))
1006  			break;
1007  
1008  		ret = rw->kiocb.ki_filp->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
1009  		if (unlikely(ret < 0))
1010  			return ret;
1011  		else if (ret)
1012  			poll_flags |= BLK_POLL_ONESHOT;
1013  
1014  		/* iopoll may have completed current req */
1015  		if (!rq_list_empty(iob.req_list) ||
1016  		    READ_ONCE(req->iopoll_completed))
1017  			break;
1018  	}
1019  
1020  	if (!rq_list_empty(iob.req_list))
1021  		iob.complete(&iob);
1022  	else if (!pos)
1023  		return 0;
1024  
1025  	prev = start;
1026  	wq_list_for_each_resume(pos, prev) {
1027  		struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
1028  
1029  		/* order with io_complete_rw_iopoll(), e.g. ->result updates */
1030  		if (!smp_load_acquire(&req->iopoll_completed))
1031  			break;
1032  		nr_events++;
1033  		if (unlikely(req->flags & REQ_F_CQE_SKIP))
1034  			continue;
1035  
1036  		req->cqe.flags = io_put_kbuf(req, 0);
1037  		__io_fill_cqe_req(req->ctx, req);
1038  	}
1039  
1040  	if (unlikely(!nr_events))
1041  		return 0;
1042  
1043  	io_commit_cqring(ctx);
1044  	io_cqring_ev_posted_iopoll(ctx);
1045  	pos = start ? start->next : ctx->iopoll_list.first;
1046  	wq_list_cut(&ctx->iopoll_list, prev, start);
1047  	io_free_batch_list(ctx, pos);
1048  	return nr_events;
1049  }
1050