xref: /openbmc/linux/fs/aio.c (revision bd336e63)
1  /*
2   *	An async IO implementation for Linux
3   *	Written by Benjamin LaHaise <bcrl@kvack.org>
4   *
5   *	Implements an efficient asynchronous io interface.
6   *
7   *	Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
8   *
9   *	See ../COPYING for licensing terms.
10   */
11  #define pr_fmt(fmt) "%s: " fmt, __func__
12  
13  #include <linux/kernel.h>
14  #include <linux/init.h>
15  #include <linux/errno.h>
16  #include <linux/time.h>
17  #include <linux/aio_abi.h>
18  #include <linux/export.h>
19  #include <linux/syscalls.h>
20  #include <linux/backing-dev.h>
21  #include <linux/uio.h>
22  
23  #include <linux/sched.h>
24  #include <linux/fs.h>
25  #include <linux/file.h>
26  #include <linux/mm.h>
27  #include <linux/mman.h>
28  #include <linux/mmu_context.h>
29  #include <linux/percpu.h>
30  #include <linux/slab.h>
31  #include <linux/timer.h>
32  #include <linux/aio.h>
33  #include <linux/highmem.h>
34  #include <linux/workqueue.h>
35  #include <linux/security.h>
36  #include <linux/eventfd.h>
37  #include <linux/blkdev.h>
38  #include <linux/compat.h>
39  #include <linux/migrate.h>
40  #include <linux/ramfs.h>
41  #include <linux/percpu-refcount.h>
42  #include <linux/mount.h>
43  
44  #include <asm/kmap_types.h>
45  #include <asm/uaccess.h>
46  
47  #include "internal.h"
48  
49  #define AIO_RING_MAGIC			0xa10a10a1
50  #define AIO_RING_COMPAT_FEATURES	1
51  #define AIO_RING_INCOMPAT_FEATURES	0
52  struct aio_ring {
53  	unsigned	id;	/* kernel internal index number */
54  	unsigned	nr;	/* number of io_events */
55  	unsigned	head;	/* Written to by userland or under ring_lock
56  				 * mutex by aio_read_events_ring(). */
57  	unsigned	tail;
58  
59  	unsigned	magic;
60  	unsigned	compat_features;
61  	unsigned	incompat_features;
62  	unsigned	header_length;	/* size of aio_ring */
63  
64  
65  	struct io_event		io_events[0];
66  }; /* 128 bytes + ring size */
67  
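/*
 * Editor's note, not part of revision bd336e63: the aio_context_t that
 * io_setup() hands back is the userspace address of this ring (see
 * lookup_ioctx() and ctx->user_id below), so a library can reap completions
 * without entering the kernel by reading head/tail itself.  A hedged,
 * illustrative userspace sketch follows; the mirror struct and function
 * names are invented for the example, and a real implementation also needs
 * a read barrier between loading tail and copying the events (pairing with
 * the smp_wmb() in aio_complete()) plus locking if several threads reap.
 */
#if 0	/* userspace illustration only */
struct user_aio_ring {			/* mirrors struct aio_ring above */
	unsigned id, nr, head, tail;
	unsigned magic, compat_features, incompat_features, header_length;
	struct io_event io_events[];
};

/* Copy one completion out of the mapped ring; returns 0 if none are ready. */
static int user_ring_peek(struct user_aio_ring *ring, struct io_event *ev)
{
	if (ring->magic != AIO_RING_MAGIC || ring->incompat_features)
		return 0;	/* unknown layout: use io_getevents() instead */
	if (ring->head == ring->tail)
		return 0;	/* ring is empty */
	*ev = ring->io_events[ring->head % ring->nr];
	ring->head = (ring->head + 1) % ring->nr;	/* consume the slot */
	return 1;
}
#endif
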
68  #define AIO_RING_PAGES	8
69  
70  struct kioctx_table {
71  	struct rcu_head	rcu;
72  	unsigned	nr;
73  	struct kioctx	*table[];
74  };
75  
76  struct kioctx_cpu {
77  	unsigned		reqs_available;
78  };
79  
80  struct ctx_rq_wait {
81  	struct completion comp;
82  	atomic_t count;
83  };
84  
85  struct kioctx {
86  	struct percpu_ref	users;
87  	atomic_t		dead;
88  
89  	struct percpu_ref	reqs;
90  
91  	unsigned long		user_id;
92  
93  	struct __percpu kioctx_cpu *cpu;
94  
95  	/*
96  	 * For percpu reqs_available, number of slots we move to/from global
97  	 * counter at a time:
98  	 */
99  	unsigned		req_batch;
100  	/*
101  	 * This is what userspace passed to io_setup(); it's not used for
102  	 * anything but counting against the global max_reqs quota.
103  	 *
104  	 * The real limit is nr_events - 1, which will be larger (see
105  	 * aio_setup_ring())
106  	 */
107  	unsigned		max_reqs;
108  
109  	/* Size of ringbuffer, in units of struct io_event */
110  	unsigned		nr_events;
111  
112  	unsigned long		mmap_base;
113  	unsigned long		mmap_size;
114  
115  	struct page		**ring_pages;
116  	long			nr_pages;
117  
118  	struct work_struct	free_work;
119  
120  	/*
121  	 * signals when all in-flight requests are done
122  	 */
123  	struct ctx_rq_wait	*rq_wait;
124  
125  	struct {
126  		/*
127  		 * This counts the number of available slots in the ringbuffer,
128  		 * so we avoid overflowing it: it's decremented (if positive)
129  		 * when allocating a kiocb and incremented when the resulting
130  		 * io_event is pulled off the ringbuffer.
131  		 *
132  		 * We batch accesses to it with a percpu version.
133  		 */
134  		atomic_t	reqs_available;
135  	} ____cacheline_aligned_in_smp;
136  
137  	struct {
138  		spinlock_t	ctx_lock;
139  		struct list_head active_reqs;	/* used for cancellation */
140  	} ____cacheline_aligned_in_smp;
141  
142  	struct {
143  		struct mutex	ring_lock;
144  		wait_queue_head_t wait;
145  	} ____cacheline_aligned_in_smp;
146  
147  	struct {
148  		unsigned	tail;
149  		unsigned	completed_events;
150  		spinlock_t	completion_lock;
151  	} ____cacheline_aligned_in_smp;
152  
153  	struct page		*internal_pages[AIO_RING_PAGES];
154  	struct file		*aio_ring_file;
155  
156  	unsigned		id;
157  };
158  
159  /*
160   * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
161   * cancelled or completed (this makes a certain amount of sense because
162   * successful cancellation - io_cancel() - does deliver the completion to
163   * userspace).
164   *
165   * And since most things don't implement kiocb cancellation and we'd really like
166   * kiocb completion to be lockless when possible, we use ki_cancel to
167   * synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
168   * with cmpxchg(), see kiocb_cancel().
169   */
170  #define KIOCB_CANCELLED		((void *) (~0ULL))
171  
172  struct aio_kiocb {
173  	struct kiocb		common;
174  
175  	struct kioctx		*ki_ctx;
176  	kiocb_cancel_fn		*ki_cancel;
177  
178  	struct iocb __user	*ki_user_iocb;	/* user's aiocb */
179  	__u64			ki_user_data;	/* user's data for completion */
180  
181  	struct list_head	ki_list;	/* the aio core uses this
182  						 * for cancellation */
183  
184  	/*
185  	 * If the aio_resfd field of the userspace iocb is not zero,
186  	 * this is the underlying eventfd context to deliver events to.
187  	 */
188  	struct eventfd_ctx	*ki_eventfd;
189  };
190  
191  /*------ sysctl variables----*/
192  static DEFINE_SPINLOCK(aio_nr_lock);
193  unsigned long aio_nr;		/* current system wide number of aio requests */
194  unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
195  /*----end sysctl variables---*/
196  
197  static struct kmem_cache	*kiocb_cachep;
198  static struct kmem_cache	*kioctx_cachep;
199  
200  static struct vfsmount *aio_mnt;
201  
202  static const struct file_operations aio_ring_fops;
203  static const struct address_space_operations aio_ctx_aops;
204  
205  static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
206  {
207  	struct qstr this = QSTR_INIT("[aio]", 5);
208  	struct file *file;
209  	struct path path;
210  	struct inode *inode = alloc_anon_inode(aio_mnt->mnt_sb);
211  	if (IS_ERR(inode))
212  		return ERR_CAST(inode);
213  
214  	inode->i_mapping->a_ops = &aio_ctx_aops;
215  	inode->i_mapping->private_data = ctx;
216  	inode->i_size = PAGE_SIZE * nr_pages;
217  
218  	path.dentry = d_alloc_pseudo(aio_mnt->mnt_sb, &this);
219  	if (!path.dentry) {
220  		iput(inode);
221  		return ERR_PTR(-ENOMEM);
222  	}
223  	path.mnt = mntget(aio_mnt);
224  
225  	d_instantiate(path.dentry, inode);
226  	file = alloc_file(&path, FMODE_READ | FMODE_WRITE, &aio_ring_fops);
227  	if (IS_ERR(file)) {
228  		path_put(&path);
229  		return file;
230  	}
231  
232  	file->f_flags = O_RDWR;
233  	return file;
234  }
235  
236  static struct dentry *aio_mount(struct file_system_type *fs_type,
237  				int flags, const char *dev_name, void *data)
238  {
239  	static const struct dentry_operations ops = {
240  		.d_dname	= simple_dname,
241  	};
242  	struct dentry *root = mount_pseudo(fs_type, "aio:", NULL, &ops,
243  					   AIO_RING_MAGIC);
244  
245  	if (!IS_ERR(root))
246  		root->d_sb->s_iflags |= SB_I_NOEXEC;
247  	return root;
248  }
249  
250  /* aio_setup
251   *	Creates the slab caches used by the aio routines and panics on
252   *	failure, as this is done early during the boot sequence.
253   */
254  static int __init aio_setup(void)
255  {
256  	static struct file_system_type aio_fs = {
257  		.name		= "aio",
258  		.mount		= aio_mount,
259  		.kill_sb	= kill_anon_super,
260  	};
261  	aio_mnt = kern_mount(&aio_fs);
262  	if (IS_ERR(aio_mnt))
263  		panic("Failed to create aio fs mount.");
264  
265  	kiocb_cachep = KMEM_CACHE(aio_kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
266  	kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
267  
268  	pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
269  
270  	return 0;
271  }
272  __initcall(aio_setup);
273  
274  static void put_aio_ring_file(struct kioctx *ctx)
275  {
276  	struct file *aio_ring_file = ctx->aio_ring_file;
277  	struct address_space *i_mapping;
278  
279  	if (aio_ring_file) {
280  		truncate_setsize(aio_ring_file->f_inode, 0);
281  
282  		/* Prevent further access to the kioctx from migratepages */
283  		i_mapping = aio_ring_file->f_inode->i_mapping;
284  		spin_lock(&i_mapping->private_lock);
285  		i_mapping->private_data = NULL;
286  		ctx->aio_ring_file = NULL;
287  		spin_unlock(&i_mapping->private_lock);
288  
289  		fput(aio_ring_file);
290  	}
291  }
292  
293  static void aio_free_ring(struct kioctx *ctx)
294  {
295  	int i;
296  
297  	/* Disconnect the kioctx from the ring file.  This prevents future
298  	 * accesses to the kioctx from page migration.
299  	 */
300  	put_aio_ring_file(ctx);
301  
302  	for (i = 0; i < ctx->nr_pages; i++) {
303  		struct page *page;
304  		pr_debug("pid(%d) [%d] page->count=%d\n", current->pid, i,
305  				page_count(ctx->ring_pages[i]));
306  		page = ctx->ring_pages[i];
307  		if (!page)
308  			continue;
309  		ctx->ring_pages[i] = NULL;
310  		put_page(page);
311  	}
312  
313  	if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages) {
314  		kfree(ctx->ring_pages);
315  		ctx->ring_pages = NULL;
316  	}
317  }
318  
319  static int aio_ring_mremap(struct vm_area_struct *vma)
320  {
321  	struct file *file = vma->vm_file;
322  	struct mm_struct *mm = vma->vm_mm;
323  	struct kioctx_table *table;
324  	int i, res = -EINVAL;
325  
326  	spin_lock(&mm->ioctx_lock);
327  	rcu_read_lock();
328  	table = rcu_dereference(mm->ioctx_table);
329  	for (i = 0; i < table->nr; i++) {
330  		struct kioctx *ctx;
331  
332  		ctx = table->table[i];
333  		if (ctx && ctx->aio_ring_file == file) {
334  			if (!atomic_read(&ctx->dead)) {
335  				ctx->user_id = ctx->mmap_base = vma->vm_start;
336  				res = 0;
337  			}
338  			break;
339  		}
340  	}
341  
342  	rcu_read_unlock();
343  	spin_unlock(&mm->ioctx_lock);
344  	return res;
345  }
346  
347  static const struct vm_operations_struct aio_ring_vm_ops = {
348  	.mremap		= aio_ring_mremap,
349  #if IS_ENABLED(CONFIG_MMU)
350  	.fault		= filemap_fault,
351  	.map_pages	= filemap_map_pages,
352  	.page_mkwrite	= filemap_page_mkwrite,
353  #endif
354  };
355  
356  static int aio_ring_mmap(struct file *file, struct vm_area_struct *vma)
357  {
358  	vma->vm_flags |= VM_DONTEXPAND;
359  	vma->vm_ops = &aio_ring_vm_ops;
360  	return 0;
361  }
362  
363  static const struct file_operations aio_ring_fops = {
364  	.mmap = aio_ring_mmap,
365  };
366  
367  #if IS_ENABLED(CONFIG_MIGRATION)
368  static int aio_migratepage(struct address_space *mapping, struct page *new,
369  			struct page *old, enum migrate_mode mode)
370  {
371  	struct kioctx *ctx;
372  	unsigned long flags;
373  	pgoff_t idx;
374  	int rc;
375  
376  	rc = 0;
377  
378  	/* mapping->private_lock here protects against the kioctx teardown.  */
379  	spin_lock(&mapping->private_lock);
380  	ctx = mapping->private_data;
381  	if (!ctx) {
382  		rc = -EINVAL;
383  		goto out;
384  	}
385  
386  	/* The ring_lock mutex.  This prevents aio_read_events() from writing
387  	 * to the ring's head, and prevents page migration from mucking in
388  	 * a partially initialized kioctx.
389  	 */
390  	if (!mutex_trylock(&ctx->ring_lock)) {
391  		rc = -EAGAIN;
392  		goto out;
393  	}
394  
395  	idx = old->index;
396  	if (idx < (pgoff_t)ctx->nr_pages) {
397  		/* Make sure the old page hasn't already been changed */
398  		if (ctx->ring_pages[idx] != old)
399  			rc = -EAGAIN;
400  	} else
401  		rc = -EINVAL;
402  
403  	if (rc != 0)
404  		goto out_unlock;
405  
406  	/* Writeback must be complete */
407  	BUG_ON(PageWriteback(old));
408  	get_page(new);
409  
410  	rc = migrate_page_move_mapping(mapping, new, old, NULL, mode, 1);
411  	if (rc != MIGRATEPAGE_SUCCESS) {
412  		put_page(new);
413  		goto out_unlock;
414  	}
415  
416  	/* Take completion_lock to prevent other writes to the ring buffer
417  	 * while the old page is copied to the new.  This prevents new
418  	 * events from being lost.
419  	 */
420  	spin_lock_irqsave(&ctx->completion_lock, flags);
421  	migrate_page_copy(new, old);
422  	BUG_ON(ctx->ring_pages[idx] != old);
423  	ctx->ring_pages[idx] = new;
424  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
425  
426  	/* The old page is no longer accessible. */
427  	put_page(old);
428  
429  out_unlock:
430  	mutex_unlock(&ctx->ring_lock);
431  out:
432  	spin_unlock(&mapping->private_lock);
433  	return rc;
434  }
435  #endif
436  
437  static const struct address_space_operations aio_ctx_aops = {
438  	.set_page_dirty = __set_page_dirty_no_writeback,
439  #if IS_ENABLED(CONFIG_MIGRATION)
440  	.migratepage	= aio_migratepage,
441  #endif
442  };
443  
444  static int aio_setup_ring(struct kioctx *ctx)
445  {
446  	struct aio_ring *ring;
447  	unsigned nr_events = ctx->max_reqs;
448  	struct mm_struct *mm = current->mm;
449  	unsigned long size, unused;
450  	int nr_pages;
451  	int i;
452  	struct file *file;
453  
454  	/* Compensate for the ring buffer's head/tail overlap entry */
455  	nr_events += 2;	/* 1 is required, 2 for good luck */
456  
457  	size = sizeof(struct aio_ring);
458  	size += sizeof(struct io_event) * nr_events;
459  
460  	nr_pages = PFN_UP(size);
461  	if (nr_pages < 0)
462  		return -EINVAL;
463  
464  	file = aio_private_file(ctx, nr_pages);
465  	if (IS_ERR(file)) {
466  		ctx->aio_ring_file = NULL;
467  		return -ENOMEM;
468  	}
469  
470  	ctx->aio_ring_file = file;
471  	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring))
472  			/ sizeof(struct io_event);
473  
474  	ctx->ring_pages = ctx->internal_pages;
475  	if (nr_pages > AIO_RING_PAGES) {
476  		ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
477  					  GFP_KERNEL);
478  		if (!ctx->ring_pages) {
479  			put_aio_ring_file(ctx);
480  			return -ENOMEM;
481  		}
482  	}
483  
484  	for (i = 0; i < nr_pages; i++) {
485  		struct page *page;
486  		page = find_or_create_page(file->f_inode->i_mapping,
487  					   i, GFP_HIGHUSER | __GFP_ZERO);
488  		if (!page)
489  			break;
490  		pr_debug("pid(%d) page[%d]->count=%d\n",
491  			 current->pid, i, page_count(page));
492  		SetPageUptodate(page);
493  		unlock_page(page);
494  
495  		ctx->ring_pages[i] = page;
496  	}
497  	ctx->nr_pages = i;
498  
499  	if (unlikely(i != nr_pages)) {
500  		aio_free_ring(ctx);
501  		return -ENOMEM;
502  	}
503  
504  	ctx->mmap_size = nr_pages * PAGE_SIZE;
505  	pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
506  
507  	if (down_write_killable(&mm->mmap_sem)) {
508  		ctx->mmap_size = 0;
509  		aio_free_ring(ctx);
510  		return -EINTR;
511  	}
512  
513  	ctx->mmap_base = do_mmap_pgoff(ctx->aio_ring_file, 0, ctx->mmap_size,
514  				       PROT_READ | PROT_WRITE,
515  				       MAP_SHARED, 0, &unused);
516  	up_write(&mm->mmap_sem);
517  	if (IS_ERR((void *)ctx->mmap_base)) {
518  		ctx->mmap_size = 0;
519  		aio_free_ring(ctx);
520  		return -ENOMEM;
521  	}
522  
523  	pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
524  
525  	ctx->user_id = ctx->mmap_base;
526  	ctx->nr_events = nr_events; /* trusted copy */
527  
528  	ring = kmap_atomic(ctx->ring_pages[0]);
529  	ring->nr = nr_events;	/* user copy */
530  	ring->id = ~0U;
531  	ring->head = ring->tail = 0;
532  	ring->magic = AIO_RING_MAGIC;
533  	ring->compat_features = AIO_RING_COMPAT_FEATURES;
534  	ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
535  	ring->header_length = sizeof(struct aio_ring);
536  	kunmap_atomic(ring);
537  	flush_dcache_page(ctx->ring_pages[0]);
538  
539  	return 0;
540  }
541  
542  #define AIO_EVENTS_PER_PAGE	(PAGE_SIZE / sizeof(struct io_event))
543  #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
544  #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
545  
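/*
 * Editor's note, not part of revision bd336e63: a minimal sketch of how the
 * three macros above turn a ring index into a page and a slot, mirroring the
 * arithmetic open-coded in aio_complete() and aio_read_events_ring().  With
 * 4K pages and the 32-byte struct io_event, AIO_EVENTS_PER_PAGE is 128,
 * AIO_EVENTS_FIRST_PAGE is 127 and AIO_EVENTS_OFFSET is 1, so event 0 sits
 * in the second io_event-sized slot of ring page 0, just after the header.
 */
static inline struct page *aio_event_page_sketch(struct kioctx *ctx,
						 unsigned index, unsigned *slot)
{
	unsigned pos = index + AIO_EVENTS_OFFSET;

	*slot = pos % AIO_EVENTS_PER_PAGE;	/* offset within the page */
	return ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
}
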
546  void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
547  {
548  	struct aio_kiocb *req = container_of(iocb, struct aio_kiocb, common);
549  	struct kioctx *ctx = req->ki_ctx;
550  	unsigned long flags;
551  
552  	spin_lock_irqsave(&ctx->ctx_lock, flags);
553  
554  	if (!req->ki_list.next)
555  		list_add(&req->ki_list, &ctx->active_reqs);
556  
557  	req->ki_cancel = cancel;
558  
559  	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
560  }
561  EXPORT_SYMBOL(kiocb_set_cancel_fn);
562  
563  static int kiocb_cancel(struct aio_kiocb *kiocb)
564  {
565  	kiocb_cancel_fn *old, *cancel;
566  
567  	/*
568  	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
569  	 * actually has a cancel function, hence the cmpxchg()
570  	 */
571  
572  	cancel = ACCESS_ONCE(kiocb->ki_cancel);
573  	do {
574  		if (!cancel || cancel == KIOCB_CANCELLED)
575  			return -EINVAL;
576  
577  		old = cancel;
578  		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
579  	} while (cancel != old);
580  
581  	return cancel(&kiocb->common);
582  }
583  
584  static void free_ioctx(struct work_struct *work)
585  {
586  	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
587  
588  	pr_debug("freeing %p\n", ctx);
589  
590  	aio_free_ring(ctx);
591  	free_percpu(ctx->cpu);
592  	percpu_ref_exit(&ctx->reqs);
593  	percpu_ref_exit(&ctx->users);
594  	kmem_cache_free(kioctx_cachep, ctx);
595  }
596  
597  static void free_ioctx_reqs(struct percpu_ref *ref)
598  {
599  	struct kioctx *ctx = container_of(ref, struct kioctx, reqs);
600  
601  	/* At this point we know that there are no in-flight requests */
602  	if (ctx->rq_wait && atomic_dec_and_test(&ctx->rq_wait->count))
603  		complete(&ctx->rq_wait->comp);
604  
605  	INIT_WORK(&ctx->free_work, free_ioctx);
606  	schedule_work(&ctx->free_work);
607  }
608  
609  /*
610   * When this function runs, the kioctx has been removed from the "hash table"
611   * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
612   * now it's safe to cancel any that need to be.
613   */
614  static void free_ioctx_users(struct percpu_ref *ref)
615  {
616  	struct kioctx *ctx = container_of(ref, struct kioctx, users);
617  	struct aio_kiocb *req;
618  
619  	spin_lock_irq(&ctx->ctx_lock);
620  
621  	while (!list_empty(&ctx->active_reqs)) {
622  		req = list_first_entry(&ctx->active_reqs,
623  				       struct aio_kiocb, ki_list);
624  
625  		list_del_init(&req->ki_list);
626  		kiocb_cancel(req);
627  	}
628  
629  	spin_unlock_irq(&ctx->ctx_lock);
630  
631  	percpu_ref_kill(&ctx->reqs);
632  	percpu_ref_put(&ctx->reqs);
633  }
634  
635  static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
636  {
637  	unsigned i, new_nr;
638  	struct kioctx_table *table, *old;
639  	struct aio_ring *ring;
640  
641  	spin_lock(&mm->ioctx_lock);
642  	table = rcu_dereference_raw(mm->ioctx_table);
643  
644  	while (1) {
645  		if (table)
646  			for (i = 0; i < table->nr; i++)
647  				if (!table->table[i]) {
648  					ctx->id = i;
649  					table->table[i] = ctx;
650  					spin_unlock(&mm->ioctx_lock);
651  
652  					/* While kioctx setup is in progress,
653  					 * we are protected from page migration
654  					 * changing ring_pages by ->ring_lock.
655  					 */
656  					ring = kmap_atomic(ctx->ring_pages[0]);
657  					ring->id = ctx->id;
658  					kunmap_atomic(ring);
659  					return 0;
660  				}
661  
662  		new_nr = (table ? table->nr : 1) * 4;
663  		spin_unlock(&mm->ioctx_lock);
664  
665  		table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) *
666  				new_nr, GFP_KERNEL);
667  		if (!table)
668  			return -ENOMEM;
669  
670  		table->nr = new_nr;
671  
672  		spin_lock(&mm->ioctx_lock);
673  		old = rcu_dereference_raw(mm->ioctx_table);
674  
675  		if (!old) {
676  			rcu_assign_pointer(mm->ioctx_table, table);
677  		} else if (table->nr > old->nr) {
678  			memcpy(table->table, old->table,
679  			       old->nr * sizeof(struct kioctx *));
680  
681  			rcu_assign_pointer(mm->ioctx_table, table);
682  			kfree_rcu(old, rcu);
683  		} else {
684  			kfree(table);
685  			table = old;
686  		}
687  	}
688  }
689  
690  static void aio_nr_sub(unsigned nr)
691  {
692  	spin_lock(&aio_nr_lock);
693  	if (WARN_ON(aio_nr - nr > aio_nr))
694  		aio_nr = 0;
695  	else
696  		aio_nr -= nr;
697  	spin_unlock(&aio_nr_lock);
698  }
699  
700  /* ioctx_alloc
701   *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
702   */
703  static struct kioctx *ioctx_alloc(unsigned nr_events)
704  {
705  	struct mm_struct *mm = current->mm;
706  	struct kioctx *ctx;
707  	int err = -ENOMEM;
708  
709  	/*
710  	 * We keep track of the number of available ringbuffer slots, to prevent
711  	 * overflow (reqs_available), and we also use percpu counters for this.
712  	 *
713  	 * So since up to half the slots might be on other CPUs' percpu counters
714  	 * and unavailable, double nr_events so userspace sees what it
715  	 * expected: additionally, we move req_batch slots to/from percpu
716  	 * counters at a time, so make sure that isn't 0:
717  	 */
718  	nr_events = max(nr_events, num_possible_cpus() * 4);
719  	nr_events *= 2;
720  
721  	/* Prevent overflows */
722  	if (nr_events > (0x10000000U / sizeof(struct io_event))) {
723  		pr_debug("ENOMEM: nr_events too high\n");
724  		return ERR_PTR(-EINVAL);
725  	}
726  
727  	if (!nr_events || (unsigned long)nr_events > (aio_max_nr * 2UL))
728  		return ERR_PTR(-EAGAIN);
729  
730  	ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
731  	if (!ctx)
732  		return ERR_PTR(-ENOMEM);
733  
734  	ctx->max_reqs = nr_events;
735  
736  	spin_lock_init(&ctx->ctx_lock);
737  	spin_lock_init(&ctx->completion_lock);
738  	mutex_init(&ctx->ring_lock);
739  	/* Protect against page migration throughout kioctx setup by keeping
740  	 * the ring_lock mutex held until setup is complete. */
741  	mutex_lock(&ctx->ring_lock);
742  	init_waitqueue_head(&ctx->wait);
743  
744  	INIT_LIST_HEAD(&ctx->active_reqs);
745  
746  	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
747  		goto err;
748  
749  	if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs, 0, GFP_KERNEL))
750  		goto err;
751  
752  	ctx->cpu = alloc_percpu(struct kioctx_cpu);
753  	if (!ctx->cpu)
754  		goto err;
755  
756  	err = aio_setup_ring(ctx);
757  	if (err < 0)
758  		goto err;
759  
760  	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
761  	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
762  	if (ctx->req_batch < 1)
763  		ctx->req_batch = 1;
764  
765  	/* limit the number of system wide aios */
766  	spin_lock(&aio_nr_lock);
767  	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
768  	    aio_nr + nr_events < aio_nr) {
769  		spin_unlock(&aio_nr_lock);
770  		err = -EAGAIN;
771  		goto err_ctx;
772  	}
773  	aio_nr += ctx->max_reqs;
774  	spin_unlock(&aio_nr_lock);
775  
776  	percpu_ref_get(&ctx->users);	/* io_setup() will drop this ref */
777  	percpu_ref_get(&ctx->reqs);	/* free_ioctx_users() will drop this */
778  
779  	err = ioctx_add_table(ctx, mm);
780  	if (err)
781  		goto err_cleanup;
782  
783  	/* Release the ring_lock mutex now that all setup is complete. */
784  	mutex_unlock(&ctx->ring_lock);
785  
786  	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
787  		 ctx, ctx->user_id, mm, ctx->nr_events);
788  	return ctx;
789  
790  err_cleanup:
791  	aio_nr_sub(ctx->max_reqs);
792  err_ctx:
793  	atomic_set(&ctx->dead, 1);
794  	if (ctx->mmap_size)
795  		vm_munmap(ctx->mmap_base, ctx->mmap_size);
796  	aio_free_ring(ctx);
797  err:
798  	mutex_unlock(&ctx->ring_lock);
799  	free_percpu(ctx->cpu);
800  	percpu_ref_exit(&ctx->reqs);
801  	percpu_ref_exit(&ctx->users);
802  	kmem_cache_free(kioctx_cachep, ctx);
803  	pr_debug("error allocating ioctx %d\n", err);
804  	return ERR_PTR(err);
805  }
806  
807  /* kill_ioctx
808   *	Cancels all outstanding aio requests on an aio context.  Used
809   *	when the processes owning a context have all exited to encourage
810   *	the rapid destruction of the kioctx.
811   */
812  static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
813  		      struct ctx_rq_wait *wait)
814  {
815  	struct kioctx_table *table;
816  
817  	spin_lock(&mm->ioctx_lock);
818  	if (atomic_xchg(&ctx->dead, 1)) {
819  		spin_unlock(&mm->ioctx_lock);
820  		return -EINVAL;
821  	}
822  
823  	table = rcu_dereference_raw(mm->ioctx_table);
824  	WARN_ON(ctx != table->table[ctx->id]);
825  	table->table[ctx->id] = NULL;
826  	spin_unlock(&mm->ioctx_lock);
827  
828  	/* percpu_ref_kill() will do the necessary call_rcu() */
829  	wake_up_all(&ctx->wait);
830  
831  	/*
832  	 * It'd be more correct to do this in free_ioctx(), after all
833  	 * the outstanding kiocbs have finished - but by then io_destroy
834  	 * has already returned, so io_setup() could potentially return
835  	 * -EAGAIN with no ioctxs actually in use (as far as userspace
836  	 *  could tell).
837  	 */
838  	aio_nr_sub(ctx->max_reqs);
839  
840  	if (ctx->mmap_size)
841  		vm_munmap(ctx->mmap_base, ctx->mmap_size);
842  
843  	ctx->rq_wait = wait;
844  	percpu_ref_kill(&ctx->users);
845  	return 0;
846  }
847  
848  /*
849   * exit_aio: called when the last user of mm goes away.  At this point, there is
850   * no way for any new requests to be submitted or any of the io_* syscalls to be
851   * called on the context.
852   *
853   * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
854   * them.
855   */
856  void exit_aio(struct mm_struct *mm)
857  {
858  	struct kioctx_table *table = rcu_dereference_raw(mm->ioctx_table);
859  	struct ctx_rq_wait wait;
860  	int i, skipped;
861  
862  	if (!table)
863  		return;
864  
865  	atomic_set(&wait.count, table->nr);
866  	init_completion(&wait.comp);
867  
868  	skipped = 0;
869  	for (i = 0; i < table->nr; ++i) {
870  		struct kioctx *ctx = table->table[i];
871  
872  		if (!ctx) {
873  			skipped++;
874  			continue;
875  		}
876  
877  		/*
878  		 * We don't need to bother with munmap() here - exit_mmap(mm)
879  		 * is coming and it'll unmap everything. And we simply can't:
880  		 * this is not necessarily our ->mm.
881  		 * Since kill_ioctx() uses a non-zero ->mmap_size as an indicator
882  		 * that it needs to unmap the area, just set it to 0.
883  		 */
884  		ctx->mmap_size = 0;
885  		kill_ioctx(mm, ctx, &wait);
886  	}
887  
888  	if (!atomic_sub_and_test(skipped, &wait.count)) {
889  		/* Wait until all IO for the context are done. */
890  		wait_for_completion(&wait.comp);
891  	}
892  
893  	RCU_INIT_POINTER(mm->ioctx_table, NULL);
894  	kfree(table);
895  }
896  
897  static void put_reqs_available(struct kioctx *ctx, unsigned nr)
898  {
899  	struct kioctx_cpu *kcpu;
900  	unsigned long flags;
901  
902  	local_irq_save(flags);
903  	kcpu = this_cpu_ptr(ctx->cpu);
904  	kcpu->reqs_available += nr;
905  
906  	while (kcpu->reqs_available >= ctx->req_batch * 2) {
907  		kcpu->reqs_available -= ctx->req_batch;
908  		atomic_add(ctx->req_batch, &ctx->reqs_available);
909  	}
910  
911  	local_irq_restore(flags);
912  }
913  
914  static bool get_reqs_available(struct kioctx *ctx)
915  {
916  	struct kioctx_cpu *kcpu;
917  	bool ret = false;
918  	unsigned long flags;
919  
920  	local_irq_save(flags);
921  	kcpu = this_cpu_ptr(ctx->cpu);
922  	if (!kcpu->reqs_available) {
923  		int old, avail = atomic_read(&ctx->reqs_available);
924  
925  		do {
926  			if (avail < ctx->req_batch)
927  				goto out;
928  
929  			old = avail;
930  			avail = atomic_cmpxchg(&ctx->reqs_available,
931  					       avail, avail - ctx->req_batch);
932  		} while (avail != old);
933  
934  		kcpu->reqs_available += ctx->req_batch;
935  	}
936  
937  	ret = true;
938  	kcpu->reqs_available--;
939  out:
940  	local_irq_restore(flags);
941  	return ret;
942  }
943  
944  /* refill_reqs_available
945   *	Updates the reqs_available reference counts used for tracking the
946   *	number of free slots in the completion ring.  This can be called
947   *	from aio_complete() (to optimistically update reqs_available) or
948   *	from aio_get_req() (the "we're out of events" case).  It must be
949   *	called holding ctx->completion_lock.
950   */
951  static void refill_reqs_available(struct kioctx *ctx, unsigned head,
952                                    unsigned tail)
953  {
954  	unsigned events_in_ring, completed;
955  
956  	/* Clamp head since userland can write to it. */
957  	head %= ctx->nr_events;
958  	if (head <= tail)
959  		events_in_ring = tail - head;
960  	else
961  		events_in_ring = ctx->nr_events - (head - tail);
962  
963  	completed = ctx->completed_events;
964  	if (events_in_ring < completed)
965  		completed -= events_in_ring;
966  	else
967  		completed = 0;
968  
969  	if (!completed)
970  		return;
971  
972  	ctx->completed_events -= completed;
973  	put_reqs_available(ctx, completed);
974  }
975  
976  /* user_refill_reqs_available
977   *	Called to refill reqs_available when aio_get_req() runs out of
978   *	space in the completion ring.
979   */
980  static void user_refill_reqs_available(struct kioctx *ctx)
981  {
982  	spin_lock_irq(&ctx->completion_lock);
983  	if (ctx->completed_events) {
984  		struct aio_ring *ring;
985  		unsigned head;
986  
987  		/* Access of ring->head may race with aio_read_events_ring()
988  		 * here, but that's okay: whether we read the old version
989  		 * or the new version, either will be valid.  The important
990  		 * part is that head cannot pass tail since we prevent
991  		 * aio_complete() from updating tail by holding
992  		 * ctx->completion_lock.  Even if head is invalid, the check
993  		 * against ctx->completed_events below will make sure we do the
994  		 * safe/right thing.
995  		 */
996  		ring = kmap_atomic(ctx->ring_pages[0]);
997  		head = ring->head;
998  		kunmap_atomic(ring);
999  
1000  		refill_reqs_available(ctx, head, ctx->tail);
1001  	}
1002  
1003  	spin_unlock_irq(&ctx->completion_lock);
1004  }
1005  
1006  /* aio_get_req
1007   *	Allocate a slot for an aio request.
1008   * Returns NULL if no requests are free.
1009   */
1010  static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
1011  {
1012  	struct aio_kiocb *req;
1013  
1014  	if (!get_reqs_available(ctx)) {
1015  		user_refill_reqs_available(ctx);
1016  		if (!get_reqs_available(ctx))
1017  			return NULL;
1018  	}
1019  
1020  	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
1021  	if (unlikely(!req))
1022  		goto out_put;
1023  
1024  	percpu_ref_get(&ctx->reqs);
1025  
1026  	req->ki_ctx = ctx;
1027  	return req;
1028  out_put:
1029  	put_reqs_available(ctx, 1);
1030  	return NULL;
1031  }
1032  
1033  static void kiocb_free(struct aio_kiocb *req)
1034  {
1035  	if (req->common.ki_filp)
1036  		fput(req->common.ki_filp);
1037  	if (req->ki_eventfd != NULL)
1038  		eventfd_ctx_put(req->ki_eventfd);
1039  	kmem_cache_free(kiocb_cachep, req);
1040  }
1041  
1042  static struct kioctx *lookup_ioctx(unsigned long ctx_id)
1043  {
1044  	struct aio_ring __user *ring  = (void __user *)ctx_id;
1045  	struct mm_struct *mm = current->mm;
1046  	struct kioctx *ctx, *ret = NULL;
1047  	struct kioctx_table *table;
1048  	unsigned id;
1049  
1050  	if (get_user(id, &ring->id))
1051  		return NULL;
1052  
1053  	rcu_read_lock();
1054  	table = rcu_dereference(mm->ioctx_table);
1055  
1056  	if (!table || id >= table->nr)
1057  		goto out;
1058  
1059  	ctx = table->table[id];
1060  	if (ctx && ctx->user_id == ctx_id) {
1061  		percpu_ref_get(&ctx->users);
1062  		ret = ctx;
1063  	}
1064  out:
1065  	rcu_read_unlock();
1066  	return ret;
1067  }
1068  
1069  /* aio_complete
1070   *	Called when the io request on the given iocb is complete.
1071   */
1072  static void aio_complete(struct kiocb *kiocb, long res, long res2)
1073  {
1074  	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, common);
1075  	struct kioctx	*ctx = iocb->ki_ctx;
1076  	struct aio_ring	*ring;
1077  	struct io_event	*ev_page, *event;
1078  	unsigned tail, pos, head;
1079  	unsigned long	flags;
1080  
1081  	if (kiocb->ki_flags & IOCB_WRITE) {
1082  		struct file *file = kiocb->ki_filp;
1083  
1084  		/*
1085  		 * Tell lockdep we inherited freeze protection from submission
1086  		 * thread.
1087  		 */
1088  		__sb_writers_acquired(file_inode(file)->i_sb, SB_FREEZE_WRITE);
1089  		file_end_write(file);
1090  	}
1091  
1092  	/*
1093  	 * Special case handling for sync iocbs:
1094  	 *  - events go directly into the iocb for fast handling
1095  	 *  - the sync task with the iocb in its stack holds the single iocb
1096  	 *    ref, no other paths have a way to get another ref
1097  	 *  - the sync task helpfully left a reference to itself in the iocb
1098  	 */
1099  	BUG_ON(is_sync_kiocb(kiocb));
1100  
1101  	if (iocb->ki_list.next) {
1102  		unsigned long flags;
1103  
1104  		spin_lock_irqsave(&ctx->ctx_lock, flags);
1105  		list_del(&iocb->ki_list);
1106  		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
1107  	}
1108  
1109  	/*
1110  	 * Add a completion event to the ring buffer. Must be done holding
1111  	 * ctx->completion_lock to prevent other code from messing with the tail
1112  	 * pointer since we might be called from irq context.
1113  	 */
1114  	spin_lock_irqsave(&ctx->completion_lock, flags);
1115  
1116  	tail = ctx->tail;
1117  	pos = tail + AIO_EVENTS_OFFSET;
1118  
1119  	if (++tail >= ctx->nr_events)
1120  		tail = 0;
1121  
1122  	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
1123  	event = ev_page + pos % AIO_EVENTS_PER_PAGE;
1124  
1125  	event->obj = (u64)(unsigned long)iocb->ki_user_iocb;
1126  	event->data = iocb->ki_user_data;
1127  	event->res = res;
1128  	event->res2 = res2;
1129  
1130  	kunmap_atomic(ev_page);
1131  	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
1132  
1133  	pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
1134  		 ctx, tail, iocb, iocb->ki_user_iocb, iocb->ki_user_data,
1135  		 res, res2);
1136  
1137  	/* after flagging the request as done, we
1138  	 * must never even look at it again
1139  	 */
1140  	smp_wmb();	/* make event visible before updating tail */
1141  
1142  	ctx->tail = tail;
1143  
1144  	ring = kmap_atomic(ctx->ring_pages[0]);
1145  	head = ring->head;
1146  	ring->tail = tail;
1147  	kunmap_atomic(ring);
1148  	flush_dcache_page(ctx->ring_pages[0]);
1149  
1150  	ctx->completed_events++;
1151  	if (ctx->completed_events > 1)
1152  		refill_reqs_available(ctx, head, tail);
1153  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
1154  
1155  	pr_debug("added to ring %p at [%u]\n", iocb, tail);
1156  
1157  	/*
1158  	 * Check if the user asked us to deliver the result through an
1159  	 * eventfd. The eventfd_signal() function is safe to call
1160  	 * from IRQ context.
1161  	 */
1162  	if (iocb->ki_eventfd != NULL)
1163  		eventfd_signal(iocb->ki_eventfd, 1);
1164  
1165  	/* everything turned out well, dispose of the aiocb. */
1166  	kiocb_free(iocb);
1167  
1168  	/*
1169  	 * We have to order our ring_info tail store above and test
1170  	 * of the wait list below outside the wait lock.  This is
1171  	 * like in wake_up_bit() where clearing a bit has to be
1172  	 * ordered with the unlocked test.
1173  	 */
1174  	smp_mb();
1175  
1176  	if (waitqueue_active(&ctx->wait))
1177  		wake_up(&ctx->wait);
1178  
1179  	percpu_ref_put(&ctx->reqs);
1180  }
1181  
1182  /* aio_read_events_ring
1183   *	Pull an event off of the ioctx's event ring.  Returns the number of
1184   *	events fetched
1185   */
1186  static long aio_read_events_ring(struct kioctx *ctx,
1187  				 struct io_event __user *event, long nr)
1188  {
1189  	struct aio_ring *ring;
1190  	unsigned head, tail, pos;
1191  	long ret = 0;
1192  	int copy_ret;
1193  
1194  	/*
1195  	 * The mutex can block and wake us up and that will cause
1196  	 * wait_event_interruptible_hrtimeout() to schedule without sleeping
1197  	 * and repeat. This should be rare enough that it doesn't cause
1198  	 * performance issues. See the comment in read_events() for more detail.
1199  	 */
1200  	sched_annotate_sleep();
1201  	mutex_lock(&ctx->ring_lock);
1202  
1203  	/* Access to ->ring_pages here is protected by ctx->ring_lock. */
1204  	ring = kmap_atomic(ctx->ring_pages[0]);
1205  	head = ring->head;
1206  	tail = ring->tail;
1207  	kunmap_atomic(ring);
1208  
1209  	/*
1210  	 * Ensure that once we've read the current tail pointer,
1211  	 * we also see the events that were stored up to the tail.
1212  	 */
1213  	smp_rmb();
1214  
1215  	pr_debug("h%u t%u m%u\n", head, tail, ctx->nr_events);
1216  
1217  	if (head == tail)
1218  		goto out;
1219  
1220  	head %= ctx->nr_events;
1221  	tail %= ctx->nr_events;
1222  
1223  	while (ret < nr) {
1224  		long avail;
1225  		struct io_event *ev;
1226  		struct page *page;
1227  
1228  		avail = (head <= tail ?  tail : ctx->nr_events) - head;
1229  		if (head == tail)
1230  			break;
1231  
1232  		avail = min(avail, nr - ret);
1233  		avail = min_t(long, avail, AIO_EVENTS_PER_PAGE -
1234  			    ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
1235  
1236  		pos = head + AIO_EVENTS_OFFSET;
1237  		page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
1238  		pos %= AIO_EVENTS_PER_PAGE;
1239  
1240  		ev = kmap(page);
1241  		copy_ret = copy_to_user(event + ret, ev + pos,
1242  					sizeof(*ev) * avail);
1243  		kunmap(page);
1244  
1245  		if (unlikely(copy_ret)) {
1246  			ret = -EFAULT;
1247  			goto out;
1248  		}
1249  
1250  		ret += avail;
1251  		head += avail;
1252  		head %= ctx->nr_events;
1253  	}
1254  
1255  	ring = kmap_atomic(ctx->ring_pages[0]);
1256  	ring->head = head;
1257  	kunmap_atomic(ring);
1258  	flush_dcache_page(ctx->ring_pages[0]);
1259  
1260  	pr_debug("%li  h%u t%u\n", ret, head, tail);
1261  out:
1262  	mutex_unlock(&ctx->ring_lock);
1263  
1264  	return ret;
1265  }
1266  
1267  static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
1268  			    struct io_event __user *event, long *i)
1269  {
1270  	long ret = aio_read_events_ring(ctx, event + *i, nr - *i);
1271  
1272  	if (ret > 0)
1273  		*i += ret;
1274  
1275  	if (unlikely(atomic_read(&ctx->dead)))
1276  		ret = -EINVAL;
1277  
1278  	if (!*i)
1279  		*i = ret;
1280  
1281  	return ret < 0 || *i >= min_nr;
1282  }
1283  
1284  static long read_events(struct kioctx *ctx, long min_nr, long nr,
1285  			struct io_event __user *event,
1286  			struct timespec __user *timeout)
1287  {
1288  	ktime_t until = { .tv64 = KTIME_MAX };
1289  	long ret = 0;
1290  
1291  	if (timeout) {
1292  		struct timespec	ts;
1293  
1294  		if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
1295  			return -EFAULT;
1296  
1297  		until = timespec_to_ktime(ts);
1298  	}
1299  
1300  	/*
1301  	 * Note that aio_read_events() is being called as the conditional - i.e.
1302  	 * we're calling it after prepare_to_wait() has set task state to
1303  	 * TASK_INTERRUPTIBLE.
1304  	 *
1305  	 * But aio_read_events() can block, and if it blocks it's going to flip
1306  	 * the task state back to TASK_RUNNING.
1307  	 *
1308  	 * This should be ok, provided it doesn't flip the state back to
1309  	 * TASK_RUNNING and return 0 too much - that causes us to spin. That
1310  	 * will only happen if the mutex_lock() call blocks, and we then find
1311  	 * the ringbuffer empty. So in practice we should be ok, but it's
1312  	 * something to be aware of when touching this code.
1313  	 */
1314  	if (until.tv64 == 0)
1315  		aio_read_events(ctx, min_nr, nr, event, &ret);
1316  	else
1317  		wait_event_interruptible_hrtimeout(ctx->wait,
1318  				aio_read_events(ctx, min_nr, nr, event, &ret),
1319  				until);
1320  
1321  	if (!ret && signal_pending(current))
1322  		ret = -EINTR;
1323  
1324  	return ret;
1325  }
1326  
1327  /* sys_io_setup:
1328   *	Create an aio_context capable of receiving at least nr_events.
1329   *	ctxp must not point to an aio_context that already exists, and
1330   *	must be initialized to 0 prior to the call.  On successful
1331   *	creation of the aio_context, *ctxp is filled in with the resulting
1332   *	handle.  May fail with -EINVAL if *ctxp is not initialized,
1333   *	if the specified nr_events exceeds internal limits.  May fail
1334   *	with -EAGAIN if the specified nr_events exceeds the user's limit
1335   *	of available events.  May fail with -ENOMEM if insufficient kernel
1336   *	resources are available.  May fail with -EFAULT if an invalid
1337   *	pointer is passed for ctxp.  Will fail with -ENOSYS if not
1338   *	implemented.
1339   */
1340  SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
1341  {
1342  	struct kioctx *ioctx = NULL;
1343  	unsigned long ctx;
1344  	long ret;
1345  
1346  	ret = get_user(ctx, ctxp);
1347  	if (unlikely(ret))
1348  		goto out;
1349  
1350  	ret = -EINVAL;
1351  	if (unlikely(ctx || nr_events == 0)) {
1352  		pr_debug("EINVAL: ctx %lu nr_events %u\n",
1353  		         ctx, nr_events);
1354  		goto out;
1355  	}
1356  
1357  	ioctx = ioctx_alloc(nr_events);
1358  	ret = PTR_ERR(ioctx);
1359  	if (!IS_ERR(ioctx)) {
1360  		ret = put_user(ioctx->user_id, ctxp);
1361  		if (ret)
1362  			kill_ioctx(current->mm, ioctx, NULL);
1363  		percpu_ref_put(&ioctx->users);
1364  	}
1365  
1366  out:
1367  	return ret;
1368  }
1369  
1370  /* sys_io_destroy:
1371   *	Destroy the aio_context specified.  May cancel any outstanding
1372   *	AIOs and block on completion.  Will fail with -ENOSYS if not
1373   *	implemented.  May fail with -EINVAL if the context pointed to
1374   *	is invalid.
1375   */
1376  SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
1377  {
1378  	struct kioctx *ioctx = lookup_ioctx(ctx);
1379  	if (likely(NULL != ioctx)) {
1380  		struct ctx_rq_wait wait;
1381  		int ret;
1382  
1383  		init_completion(&wait.comp);
1384  		atomic_set(&wait.count, 1);
1385  
1386  		/* Pass the wait completion to kill_ioctx() where ctx->rq_wait can
1387  		 * be set in a thread-safe way. If we tried to set it here we would
1388  		 * have a race condition if two io_destroy() calls ran simultaneously.
1389  		 */
1390  		ret = kill_ioctx(current->mm, ioctx, &wait);
1391  		percpu_ref_put(&ioctx->users);
1392  
1393  		/* Wait until all IO for the context are done. Otherwise the kernel
1394  		 * keeps using user-space buffers even if the user thinks the context
1395  		 * is destroyed.
1396  		 */
1397  		if (!ret)
1398  			wait_for_completion(&wait.comp);
1399  
1400  		return ret;
1401  	}
1402  	pr_debug("EINVAL: invalid context id\n");
1403  	return -EINVAL;
1404  }
1405  
1406  static int aio_setup_rw(int rw, struct iocb *iocb, struct iovec **iovec,
1407  		bool vectored, bool compat, struct iov_iter *iter)
1408  {
1409  	void __user *buf = (void __user *)(uintptr_t)iocb->aio_buf;
1410  	size_t len = iocb->aio_nbytes;
1411  
1412  	if (!vectored) {
1413  		ssize_t ret = import_single_range(rw, buf, len, *iovec, iter);
1414  		*iovec = NULL;
1415  		return ret;
1416  	}
1417  #ifdef CONFIG_COMPAT
1418  	if (compat)
1419  		return compat_import_iovec(rw, buf, len, UIO_FASTIOV, iovec,
1420  				iter);
1421  #endif
1422  	return import_iovec(rw, buf, len, UIO_FASTIOV, iovec, iter);
1423  }
1424  
1425  static inline ssize_t aio_ret(struct kiocb *req, ssize_t ret)
1426  {
1427  	switch (ret) {
1428  	case -EIOCBQUEUED:
1429  		return ret;
1430  	case -ERESTARTSYS:
1431  	case -ERESTARTNOINTR:
1432  	case -ERESTARTNOHAND:
1433  	case -ERESTART_RESTARTBLOCK:
1434  		/*
1435  		 * There's no easy way to restart the syscall since other AIOs
1436  		 * may already be running. Just fail this IO with EINTR.
1437  		 */
1438  		ret = -EINTR;
1439  		/*FALLTHRU*/
1440  	default:
1441  		aio_complete(req, ret, 0);
1442  		return 0;
1443  	}
1444  }
1445  
1446  static ssize_t aio_read(struct kiocb *req, struct iocb *iocb, bool vectored,
1447  		bool compat)
1448  {
1449  	struct file *file = req->ki_filp;
1450  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1451  	struct iov_iter iter;
1452  	ssize_t ret;
1453  
1454  	if (unlikely(!(file->f_mode & FMODE_READ)))
1455  		return -EBADF;
1456  	if (unlikely(!file->f_op->read_iter))
1457  		return -EINVAL;
1458  
1459  	ret = aio_setup_rw(READ, iocb, &iovec, vectored, compat, &iter);
1460  	if (ret)
1461  		return ret;
1462  	ret = rw_verify_area(READ, file, &req->ki_pos, iov_iter_count(&iter));
1463  	if (!ret)
1464  		ret = aio_ret(req, file->f_op->read_iter(req, &iter));
1465  	kfree(iovec);
1466  	return ret;
1467  }
1468  
1469  static ssize_t aio_write(struct kiocb *req, struct iocb *iocb, bool vectored,
1470  		bool compat)
1471  {
1472  	struct file *file = req->ki_filp;
1473  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1474  	struct iov_iter iter;
1475  	ssize_t ret;
1476  
1477  	if (unlikely(!(file->f_mode & FMODE_WRITE)))
1478  		return -EBADF;
1479  	if (unlikely(!file->f_op->write_iter))
1480  		return -EINVAL;
1481  
1482  	ret = aio_setup_rw(WRITE, iocb, &iovec, vectored, compat, &iter);
1483  	if (ret)
1484  		return ret;
1485  	ret = rw_verify_area(WRITE, file, &req->ki_pos, iov_iter_count(&iter));
1486  	if (!ret) {
1487  		req->ki_flags |= IOCB_WRITE;
1488  		file_start_write(file);
1489  		ret = aio_ret(req, file->f_op->write_iter(req, &iter));
1490  		/*
1491  		 * We release freeze protection in aio_complete().  Fool lockdep
1492  		 * by telling it the lock got released so that it doesn't
1493  		 * complain about held lock when we return to userspace.
1494  		 */
1495  		__sb_writers_release(file_inode(file)->i_sb, SB_FREEZE_WRITE);
1496  	}
1497  	kfree(iovec);
1498  	return ret;
1499  }
1500  
1501  static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
1502  			 struct iocb *iocb, bool compat)
1503  {
1504  	struct aio_kiocb *req;
1505  	struct file *file;
1506  	ssize_t ret;
1507  
1508  	/* enforce forwards compatibility on users */
1509  	if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
1510  		pr_debug("EINVAL: reserve field set\n");
1511  		return -EINVAL;
1512  	}
1513  
1514  	/* prevent overflows */
1515  	if (unlikely(
1516  	    (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
1517  	    (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
1518  	    ((ssize_t)iocb->aio_nbytes < 0)
1519  	   )) {
1520  		pr_debug("EINVAL: overflow check\n");
1521  		return -EINVAL;
1522  	}
1523  
1524  	req = aio_get_req(ctx);
1525  	if (unlikely(!req))
1526  		return -EAGAIN;
1527  
1528  	req->common.ki_filp = file = fget(iocb->aio_fildes);
1529  	if (unlikely(!req->common.ki_filp)) {
1530  		ret = -EBADF;
1531  		goto out_put_req;
1532  	}
1533  	req->common.ki_pos = iocb->aio_offset;
1534  	req->common.ki_complete = aio_complete;
1535  	req->common.ki_flags = iocb_flags(req->common.ki_filp);
1536  
1537  	if (iocb->aio_flags & IOCB_FLAG_RESFD) {
1538  		/*
1539  		 * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
1540  		 * instance of the file* now. The file descriptor must be
1541  		 * an eventfd() fd, and will be signaled for each completed
1542  		 * event using the eventfd_signal() function.
1543  		 */
1544  		req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
1545  		if (IS_ERR(req->ki_eventfd)) {
1546  			ret = PTR_ERR(req->ki_eventfd);
1547  			req->ki_eventfd = NULL;
1548  			goto out_put_req;
1549  		}
1550  
1551  		req->common.ki_flags |= IOCB_EVENTFD;
1552  	}
1553  
1554  	ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
1555  	if (unlikely(ret)) {
1556  		pr_debug("EFAULT: aio_key\n");
1557  		goto out_put_req;
1558  	}
1559  
1560  	req->ki_user_iocb = user_iocb;
1561  	req->ki_user_data = iocb->aio_data;
1562  
1563  	get_file(file);
1564  	switch (iocb->aio_lio_opcode) {
1565  	case IOCB_CMD_PREAD:
1566  		ret = aio_read(&req->common, iocb, false, compat);
1567  		break;
1568  	case IOCB_CMD_PWRITE:
1569  		ret = aio_write(&req->common, iocb, false, compat);
1570  		break;
1571  	case IOCB_CMD_PREADV:
1572  		ret = aio_read(&req->common, iocb, true, compat);
1573  		break;
1574  	case IOCB_CMD_PWRITEV:
1575  		ret = aio_write(&req->common, iocb, true, compat);
1576  		break;
1577  	default:
1578  		pr_debug("invalid aio operation %d\n", iocb->aio_lio_opcode);
1579  		ret = -EINVAL;
1580  		break;
1581  	}
1582  	fput(file);
1583  
1584  	if (ret && ret != -EIOCBQUEUED)
1585  		goto out_put_req;
1586  	return 0;
1587  out_put_req:
1588  	put_reqs_available(ctx, 1);
1589  	percpu_ref_put(&ctx->reqs);
1590  	kiocb_free(req);
1591  	return ret;
1592  }
1593  
1594  long do_io_submit(aio_context_t ctx_id, long nr,
1595  		  struct iocb __user *__user *iocbpp, bool compat)
1596  {
1597  	struct kioctx *ctx;
1598  	long ret = 0;
1599  	int i = 0;
1600  	struct blk_plug plug;
1601  
1602  	if (unlikely(nr < 0))
1603  		return -EINVAL;
1604  
1605  	if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
1606  		nr = LONG_MAX/sizeof(*iocbpp);
1607  
1608  	if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
1609  		return -EFAULT;
1610  
1611  	ctx = lookup_ioctx(ctx_id);
1612  	if (unlikely(!ctx)) {
1613  		pr_debug("EINVAL: invalid context id\n");
1614  		return -EINVAL;
1615  	}
1616  
1617  	blk_start_plug(&plug);
1618  
1619  	/*
1620  	 * AKPM: should this return a partial result if some of the IOs were
1621  	 * successfully submitted?
1622  	 */
1623  	for (i=0; i<nr; i++) {
1624  		struct iocb __user *user_iocb;
1625  		struct iocb tmp;
1626  
1627  		if (unlikely(__get_user(user_iocb, iocbpp + i))) {
1628  			ret = -EFAULT;
1629  			break;
1630  		}
1631  
1632  		if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
1633  			ret = -EFAULT;
1634  			break;
1635  		}
1636  
1637  		ret = io_submit_one(ctx, user_iocb, &tmp, compat);
1638  		if (ret)
1639  			break;
1640  	}
1641  	blk_finish_plug(&plug);
1642  
1643  	percpu_ref_put(&ctx->users);
1644  	return i ? i : ret;
1645  }
1646  
1647  /* sys_io_submit:
1648   *	Queue the nr iocbs pointed to by iocbpp for processing.  Returns
1649   *	the number of iocbs queued.  May return -EINVAL if the aio_context
1650   *	specified by ctx_id is invalid, if nr is < 0, if the iocb at
1651   *	*iocbpp[0] is not properly initialized, if the operation specified
1652   *	is invalid for the file descriptor in the iocb.  May fail with
1653   *	-EFAULT if any of the data structures point to invalid data.  May
1654   *	fail with -EBADF if the file descriptor specified in the first
1655   *	iocb is invalid.  May fail with -EAGAIN if insufficient resources
1656   *	are available to queue any iocbs.  Will return 0 if nr is 0.  Will
1657   *	fail with -ENOSYS if not implemented.
1658   */
1659  SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
1660  		struct iocb __user * __user *, iocbpp)
1661  {
1662  	return do_io_submit(ctx_id, nr, iocbpp, 0);
1663  }
1664  
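#if 0	/* Editor's illustration, not kernel code: submitting a single
	 * IOCB_CMD_PREAD with io_submit(2).  ctx is assumed to come from
	 * io_setup() as shown earlier; fd and buf are caller-provided. */
#include <linux/aio_abi.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

long example_submit_pread(aio_context_t ctx, int fd, void *buf, size_t len)
{
	struct iocb cb;
	struct iocb *cbs[1] = { &cb };

	memset(&cb, 0, sizeof(cb));		/* reserved fields must be 0 */
	cb.aio_lio_opcode = IOCB_CMD_PREAD;
	cb.aio_fildes = fd;
	cb.aio_buf = (__u64)(unsigned long)buf;
	cb.aio_nbytes = len;
	cb.aio_offset = 0;

	return syscall(__NR_io_submit, ctx, 1, cbs);	/* 1 when queued */
}
#endif
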
1665  /* lookup_kiocb
1666   *	Finds a given iocb for cancellation.
1667   */
1668  static struct aio_kiocb *
1669  lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb, u32 key)
1670  {
1671  	struct aio_kiocb *kiocb;
1672  
1673  	assert_spin_locked(&ctx->ctx_lock);
1674  
1675  	if (key != KIOCB_KEY)
1676  		return NULL;
1677  
1678  	/* TODO: use a hash or array, this sucks. */
1679  	list_for_each_entry(kiocb, &ctx->active_reqs, ki_list) {
1680  		if (kiocb->ki_user_iocb == iocb)
1681  			return kiocb;
1682  	}
1683  	return NULL;
1684  }
1685  
1686  /* sys_io_cancel:
1687   *	Attempts to cancel an iocb previously passed to io_submit.  If
1688   *	the operation is successfully cancelled, the resulting event is
1689   *	copied into the memory pointed to by result without being placed
1690   *	into the completion queue and 0 is returned.  May fail with
1691   *	-EFAULT if any of the data structures pointed to are invalid.
1692   *	May fail with -EINVAL if aio_context specified by ctx_id is
1693   *	invalid.  May fail with -EAGAIN if the iocb specified was not
1694   *	cancelled.  Will fail with -ENOSYS if not implemented.
1695   */
1696  SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
1697  		struct io_event __user *, result)
1698  {
1699  	struct kioctx *ctx;
1700  	struct aio_kiocb *kiocb;
1701  	u32 key;
1702  	int ret;
1703  
1704  	ret = get_user(key, &iocb->aio_key);
1705  	if (unlikely(ret))
1706  		return -EFAULT;
1707  
1708  	ctx = lookup_ioctx(ctx_id);
1709  	if (unlikely(!ctx))
1710  		return -EINVAL;
1711  
1712  	spin_lock_irq(&ctx->ctx_lock);
1713  
1714  	kiocb = lookup_kiocb(ctx, iocb, key);
1715  	if (kiocb)
1716  		ret = kiocb_cancel(kiocb);
1717  	else
1718  		ret = -EINVAL;
1719  
1720  	spin_unlock_irq(&ctx->ctx_lock);
1721  
1722  	if (!ret) {
1723  		/*
1724  		 * The result argument is no longer used - the io_event is
1725  		 * always delivered via the ring buffer. -EINPROGRESS indicates
1726  		 * cancellation is in progress:
1727  		 */
1728  		ret = -EINPROGRESS;
1729  	}
1730  
1731  	percpu_ref_put(&ctx->users);
1732  
1733  	return ret;
1734  }
1735  
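#if 0	/* Editor's illustration, not kernel code: asking the kernel to cancel
	 * a previously submitted iocb.  As kiocb_cancel() above shows, most
	 * file types register no cancel callback, so expect -1 with errno set
	 * to EINVAL (or EAGAIN) and reap the completion through
	 * io_getevents() as usual. */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>

long example_cancel(aio_context_t ctx, struct iocb *cb)
{
	struct io_event result;	/* unused by this kernel, see the comment above */

	return syscall(__NR_io_cancel, ctx, cb, &result);
}
#endif
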
1736  /* io_getevents:
1737   *	Attempts to read at least min_nr events and up to nr events from
1738   *	the completion queue for the aio_context specified by ctx_id. If
1739   *	it succeeds, the number of read events is returned. May fail with
1740   *	-EINVAL if ctx_id is invalid, if min_nr is out of range, if nr is
1741   *	out of range, if timeout is out of range.  May fail with -EFAULT
1742   *	if any of the memory specified is invalid.  May return 0 or
1743   *	< min_nr if the timeout specified by timeout has elapsed
1744   *	before sufficient events are available, where timeout == NULL
1745   *	specifies an infinite timeout. Note that the timeout pointed to by
1746   *	timeout is relative.  Will fail with -ENOSYS if not implemented.
1747   */
1748  SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
1749  		long, min_nr,
1750  		long, nr,
1751  		struct io_event __user *, events,
1752  		struct timespec __user *, timeout)
1753  {
1754  	struct kioctx *ioctx = lookup_ioctx(ctx_id);
1755  	long ret = -EINVAL;
1756  
1757  	if (likely(ioctx)) {
1758  		if (likely(min_nr <= nr && min_nr >= 0))
1759  			ret = read_events(ioctx, min_nr, nr, events, timeout);
1760  		percpu_ref_put(&ioctx->users);
1761  	}
1762  	return ret;
1763  }
1764
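#if 0	/* Editor's illustration, not kernel code: reaping completions with
	 * io_getevents(2).  Waits up to one second for at least one event;
	 * the batch size of 64 is arbitrary. */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

long example_reap(aio_context_t ctx)
{
	struct io_event events[64];
	struct timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
	long i, n;

	n = syscall(__NR_io_getevents, ctx, 1, 64, events, &ts);
	for (i = 0; i < n; i++) {
		/* events[i].obj is the submitted iocb pointer (see aio_complete()),
		 * events[i].res is the byte count or a negative error. */
	}
	return n;	/* 0 on timeout, -1 with errno on error */
}
#endif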