xref: /openbmc/linux/fs/nfs/direct.c (revision 3e9e0ca3f19e911ce13c2e6c9858fcb41a37496c)
1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001	Initial implementation for 2.4  --cel
33  * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003	Port to 2.5 APIs  --cel
35  * 31 Mar 2004	Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004	Parallel async reads  --cel
37  * 04 May 2005	support O_DIRECT with aio  --cel
38  *
39  */
40 
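/*
 * Illustrative example (not part of the original file): an application
 * typically reaches this code by opening a file on an NFS mount with
 * O_DIRECT and issuing reads or writes from a suitably aligned buffer,
 * roughly along these lines (path and sizes are hypothetical):
 *
 *	int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
 *	void *buf;
 *
 *	posix_memalign(&buf, 4096, 4096);
 *	pread(fd, buf, 4096, 0);	<- serviced by nfs_file_direct_read()
 *	pwrite(fd, buf, 4096, 0);	<- serviced by nfs_file_direct_write()
 */
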
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/file.h>
45 #include <linux/pagemap.h>
46 #include <linux/kref.h>
47 #include <linux/slab.h>
48 #include <linux/task_io_accounting_ops.h>
49 
50 #include <linux/nfs_fs.h>
51 #include <linux/nfs_page.h>
52 #include <linux/sunrpc/clnt.h>
53 
54 #include <asm/uaccess.h>
55 #include <linux/atomic.h>
56 
57 #include "internal.h"
58 #include "iostat.h"
59 #include "pnfs.h"
60 
61 #define NFSDBG_FACILITY		NFSDBG_VFS
62 
63 static struct kmem_cache *nfs_direct_cachep;
64 
65 /*
66  * This represents a set of asynchronous requests that we're waiting on
67  */
68 struct nfs_direct_req {
69 	struct kref		kref;		/* release manager */
70 
71 	/* I/O parameters */
72 	struct nfs_open_context	*ctx;		/* file open context info */
73 	struct nfs_lock_context *l_ctx;		/* Lock context info */
74 	struct kiocb *		iocb;		/* controlling i/o request */
75 	struct inode *		inode;		/* target file of i/o */
76 
77 	/* completion state */
78 	atomic_t		io_count;	/* i/os we're waiting for */
79 	spinlock_t		lock;		/* protect completion state */
80 	ssize_t			count,		/* bytes actually processed */
81 				error;		/* any reported error */
82 	struct completion	completion;	/* wait for i/o completion */
83 
84 	/* commit state */
85 	struct nfs_mds_commit_info mds_cinfo;	/* Storage for cinfo */
86 	struct pnfs_ds_commit_info ds_cinfo;	/* Storage for cinfo */
87 	struct work_struct	work;
88 	int			flags;
89 #define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
90 #define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
91 	struct nfs_writeverf	verf;		/* unstable write verifier */
92 };
93 
94 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
95 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
96 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
97 static void nfs_direct_write_schedule_work(struct work_struct *work);
98 
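/*
 * dreq->io_count tracks the number of in-flight requests (reads, writes
 * or commits) issued on behalf of a single direct I/O request.  get_dreq()
 * is called as each request is set up, put_dreq() as each one completes;
 * when the count drops to zero the whole direct request is finished and
 * the final completion (nfs_direct_complete() or
 * nfs_direct_write_complete()) may run.
 */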
99 static inline void get_dreq(struct nfs_direct_req *dreq)
100 {
101 	atomic_inc(&dreq->io_count);
102 }
103 
104 static inline int put_dreq(struct nfs_direct_req *dreq)
105 {
106 	return atomic_dec_and_test(&dreq->io_count);
107 }
108 
109 /**
110  * nfs_direct_IO - NFS address space operation for direct I/O
111  * @rw: direction (read or write)
112  * @iocb: target I/O control block
113  * @iov: array of vectors that define I/O buffer
114  * @pos: offset in file to begin the operation
115  * @nr_segs: size of iovec array
116  *
117  * The presence of this routine in the address space ops vector means
118  * the NFS client supports direct I/O.  However, we shunt off direct
119  * read and write requests before the VFS gets them, so this method
120  * should never be called.
121  */
122 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
123 {
124 	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
125 			iocb->ki_filp->f_path.dentry->d_name.name,
126 			(long long) pos, nr_segs);
127 
128 	return -EINVAL;
129 }
130 
131 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
132 {
133 	unsigned int i;
134 	for (i = 0; i < npages; i++)
135 		page_cache_release(pages[i]);
136 }
137 
138 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
139 			      struct nfs_direct_req *dreq)
140 {
141 	cinfo->lock = &dreq->lock;
142 	cinfo->mds = &dreq->mds_cinfo;
143 	cinfo->ds = &dreq->ds_cinfo;
144 	cinfo->dreq = dreq;
145 	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
146 }
147 
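/*
 * A freshly allocated nfs_direct_req carries two kref references: one
 * (from kref_init) is dropped by the issuer in nfs_direct_read/write via
 * nfs_direct_req_release(), the other (from kref_get) is dropped when the
 * I/O completes in nfs_direct_complete().  The structure is freed only
 * once both the issuer and the completion path are done with it.
 */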
148 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
149 {
150 	struct nfs_direct_req *dreq;
151 
152 	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
153 	if (!dreq)
154 		return NULL;
155 
156 	kref_init(&dreq->kref);
157 	kref_get(&dreq->kref);
158 	init_completion(&dreq->completion);
159 	dreq->mds_cinfo.ncommit = 0;
160 	atomic_set(&dreq->mds_cinfo.rpcs_out, 0);
161 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
162 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
163 	memset(&dreq->ds_cinfo, 0, sizeof(dreq->ds_cinfo));
164 	dreq->iocb = NULL;
165 	dreq->ctx = NULL;
166 	dreq->l_ctx = NULL;
167 	spin_lock_init(&dreq->lock);
168 	atomic_set(&dreq->io_count, 0);
169 	dreq->count = 0;
170 	dreq->error = 0;
171 	dreq->flags = 0;
172 
173 	return dreq;
174 }
175 
176 static void nfs_direct_req_free(struct kref *kref)
177 {
178 	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
179 
180 	if (dreq->l_ctx != NULL)
181 		nfs_put_lock_context(dreq->l_ctx);
182 	if (dreq->ctx != NULL)
183 		put_nfs_open_context(dreq->ctx);
184 	kmem_cache_free(nfs_direct_cachep, dreq);
185 }
186 
187 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
188 {
189 	kref_put(&dreq->kref, nfs_direct_req_free);
190 }
191 
192 /*
193  * Collects and returns the final error value/byte-count.
194  */
195 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
196 {
197 	ssize_t result = -EIOCBQUEUED;
198 
199 	/* Async requests don't wait here */
200 	if (dreq->iocb)
201 		goto out;
202 
203 	result = wait_for_completion_killable(&dreq->completion);
204 
205 	if (!result)
206 		result = dreq->error;
207 	if (!result)
208 		result = dreq->count;
209 
210 out:
211 	return (ssize_t) result;
212 }
213 
214 /*
215  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
216  * the iocb is still valid here if this is a synchronous request.
217  */
218 static void nfs_direct_complete(struct nfs_direct_req *dreq)
219 {
220 	if (dreq->iocb) {
221 		long res = (long) dreq->error;
222 		if (!res)
223 			res = (long) dreq->count;
224 		aio_complete(dreq->iocb, res, 0);
225 	}
226 	complete_all(&dreq->completion);
227 
228 	nfs_direct_req_release(dreq);
229 }
230 
static void nfs_direct_readpage_release(struct nfs_page *req)
232 {
233 	dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
234 		req->wb_context->dentry->d_inode->i_sb->s_id,
235 		(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
236 		req->wb_bytes,
237 		(long long)req_offset(req));
238 	nfs_release_request(req);
239 }
240 
241 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
242 {
243 	unsigned long bytes = 0;
244 	struct nfs_direct_req *dreq = hdr->dreq;
245 
246 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
247 		goto out_put;
248 
249 	spin_lock(&dreq->lock);
250 	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
251 		dreq->error = hdr->error;
252 	else
253 		dreq->count += hdr->good_bytes;
254 	spin_unlock(&dreq->lock);
255 
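	/*
	 * On success, hand each page back to the application: zero any part
	 * of a page beyond the data the server actually returned (a short
	 * read at EOF), mark it dirty, and release it.  On error, only the
	 * pages that did receive data are dirtied before the requests are
	 * torn down.
	 */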
256 	if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
257 		while (!list_empty(&hdr->pages)) {
258 			struct nfs_page *req = nfs_list_entry(hdr->pages.next);
259 			struct page *page = req->wb_page;
260 
261 			if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
262 				if (bytes > hdr->good_bytes)
263 					zero_user(page, 0, PAGE_SIZE);
264 				else if (hdr->good_bytes - bytes < PAGE_SIZE)
265 					zero_user_segment(page,
266 						hdr->good_bytes & ~PAGE_MASK,
267 						PAGE_SIZE);
268 			}
269 			bytes += req->wb_bytes;
270 			nfs_list_remove_request(req);
271 			nfs_direct_readpage_release(req);
272 			if (!PageCompound(page))
273 				set_page_dirty(page);
274 			page_cache_release(page);
275 		}
276 	} else {
277 		while (!list_empty(&hdr->pages)) {
278 			struct nfs_page *req = nfs_list_entry(hdr->pages.next);
279 
280 			if (bytes < hdr->good_bytes)
281 				if (!PageCompound(req->wb_page))
282 					set_page_dirty(req->wb_page);
283 			bytes += req->wb_bytes;
284 			page_cache_release(req->wb_page);
285 			nfs_list_remove_request(req);
286 			nfs_direct_readpage_release(req);
287 		}
288 	}
289 out_put:
290 	if (put_dreq(dreq))
291 		nfs_direct_complete(dreq);
292 	hdr->release(hdr);
293 }
294 
295 static void nfs_read_sync_pgio_error(struct list_head *head)
296 {
297 	struct nfs_page *req;
298 
299 	while (!list_empty(head)) {
300 		req = nfs_list_entry(head->next);
301 		nfs_list_remove_request(req);
302 		nfs_release_request(req);
303 	}
304 }
305 
306 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
307 {
308 	get_dreq(hdr->dreq);
309 }
310 
311 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
312 	.error_cleanup = nfs_read_sync_pgio_error,
313 	.init_hdr = nfs_direct_pgio_init,
314 	.completion = nfs_direct_read_completion,
315 };
316 
/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If allocating the page vector, get_user_pages(), or
 * nfs_create_request() fails, bail and stop sending more reads.
 * Read length accounting is handled by nfs_direct_read_completion().
 * Otherwise, if no requests have been sent, just return an error.
 */
324 static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
325 						const struct iovec *iov,
326 						loff_t pos)
327 {
328 	struct nfs_direct_req *dreq = desc->pg_dreq;
329 	struct nfs_open_context *ctx = dreq->ctx;
330 	struct inode *inode = ctx->dentry->d_inode;
331 	unsigned long user_addr = (unsigned long)iov->iov_base;
332 	size_t count = iov->iov_len;
333 	size_t rsize = NFS_SERVER(inode)->rsize;
334 	unsigned int pgbase;
335 	int result;
336 	ssize_t started = 0;
337 	struct page **pagevec = NULL;
338 	unsigned int npages;
339 
340 	do {
341 		size_t bytes;
342 		int i;
343 
344 		pgbase = user_addr & ~PAGE_MASK;
345 		bytes = min(max(rsize, PAGE_SIZE), count);
346 
347 		result = -ENOMEM;
348 		npages = nfs_page_array_len(pgbase, bytes);
349 		if (!pagevec)
350 			pagevec = kmalloc(npages * sizeof(struct page *),
351 					  GFP_KERNEL);
352 		if (!pagevec)
353 			break;
354 		down_read(&current->mm->mmap_sem);
355 		result = get_user_pages(current, current->mm, user_addr,
356 					npages, 1, 0, pagevec, NULL);
357 		up_read(&current->mm->mmap_sem);
358 		if (result < 0)
359 			break;
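		/*
		 * get_user_pages() may pin fewer pages than requested;
		 * trim the byte count to what was actually pinned and
		 * give up if not even the first usable byte is covered.
		 */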
360 		if ((unsigned)result < npages) {
361 			bytes = result * PAGE_SIZE;
362 			if (bytes <= pgbase) {
363 				nfs_direct_release_pages(pagevec, result);
364 				break;
365 			}
366 			bytes -= pgbase;
367 			npages = result;
368 		}
369 
370 		for (i = 0; i < npages; i++) {
371 			struct nfs_page *req;
372 			unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
373 			/* XXX do we need to do the eof zeroing found in async_filler? */
374 			req = nfs_create_request(dreq->ctx, dreq->inode,
375 						 pagevec[i],
376 						 pgbase, req_len);
377 			if (IS_ERR(req)) {
378 				nfs_direct_release_pages(pagevec + i,
379 							 npages - i);
380 				result = PTR_ERR(req);
381 				break;
382 			}
383 			req->wb_index = pos >> PAGE_SHIFT;
384 			req->wb_offset = pos & ~PAGE_MASK;
385 			if (!nfs_pageio_add_request(desc, req)) {
386 				result = desc->pg_error;
387 				nfs_release_request(req);
388 				nfs_direct_release_pages(pagevec + i,
389 							 npages - i);
390 				break;
391 			}
392 			pgbase = 0;
393 			bytes -= req_len;
394 			started += req_len;
395 			user_addr += req_len;
396 			pos += req_len;
397 			count -= req_len;
398 		}
399 	} while (count != 0 && result >= 0);
400 
401 	kfree(pagevec);
402 
403 	if (started)
404 		return started;
405 	return result < 0 ? (ssize_t) result : -EFAULT;
406 }
407 
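/*
 * Schedule READs for every segment of the user's iovec.  An extra
 * reference is taken on the dreq up front so that completion cannot fire
 * until all segments have been dispatched; if nothing at all could be
 * scheduled, the error is returned directly, otherwise the result is
 * delivered later through nfs_direct_wait() or the aio completion.
 */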
408 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
409 					      const struct iovec *iov,
410 					      unsigned long nr_segs,
411 					      loff_t pos)
412 {
413 	struct nfs_pageio_descriptor desc;
414 	ssize_t result = -EINVAL;
415 	size_t requested_bytes = 0;
416 	unsigned long seg;
417 
418 	nfs_pageio_init_read(&desc, dreq->inode,
419 			     &nfs_direct_read_completion_ops);
420 	get_dreq(dreq);
421 	desc.pg_dreq = dreq;
422 
423 	for (seg = 0; seg < nr_segs; seg++) {
424 		const struct iovec *vec = &iov[seg];
425 		result = nfs_direct_read_schedule_segment(&desc, vec, pos);
426 		if (result < 0)
427 			break;
428 		requested_bytes += result;
429 		if ((size_t)result < vec->iov_len)
430 			break;
431 		pos += vec->iov_len;
432 	}
433 
434 	nfs_pageio_complete(&desc);
435 
436 	/*
437 	 * If no bytes were started, return the error, and let the
438 	 * generic layer handle the completion.
439 	 */
440 	if (requested_bytes == 0) {
441 		nfs_direct_req_release(dreq);
442 		return result < 0 ? result : -EIO;
443 	}
444 
445 	if (put_dreq(dreq))
446 		nfs_direct_complete(dreq);
447 	return 0;
448 }
449 
450 static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
451 			       unsigned long nr_segs, loff_t pos)
452 {
453 	ssize_t result = -ENOMEM;
454 	struct inode *inode = iocb->ki_filp->f_mapping->host;
455 	struct nfs_direct_req *dreq;
456 
457 	dreq = nfs_direct_req_alloc();
458 	if (dreq == NULL)
459 		goto out;
460 
461 	dreq->inode = inode;
462 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
463 	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
464 	if (dreq->l_ctx == NULL)
465 		goto out_release;
466 	if (!is_sync_kiocb(iocb))
467 		dreq->iocb = iocb;
468 
469 	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
470 	if (!result)
471 		result = nfs_direct_wait(dreq);
472 out_release:
473 	nfs_direct_req_release(dreq);
474 out:
475 	return result;
476 }
477 
478 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
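/*
 * Re-drive writes whose data may not have reached stable storage: the
 * requests parked on the commit lists are pulled back and resent as
 * stable (FLUSH_STABLE) writes.  This runs when NFS_ODIRECT_RESCHED_WRITES
 * was set, e.g. after a failed COMMIT or a commit verifier mismatch.
 */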
479 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
480 {
481 	struct nfs_pageio_descriptor desc;
482 	struct nfs_page *req, *tmp;
483 	LIST_HEAD(reqs);
484 	struct nfs_commit_info cinfo;
485 	LIST_HEAD(failed);
486 
487 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
488 	pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
489 	spin_lock(cinfo.lock);
490 	nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
491 	spin_unlock(cinfo.lock);
492 
493 	dreq->count = 0;
494 	get_dreq(dreq);
495 
496 	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE,
497 			      &nfs_direct_write_completion_ops);
498 	desc.pg_dreq = dreq;
499 
500 	list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
501 		if (!nfs_pageio_add_request(&desc, req)) {
			nfs_list_remove_request(req);
			nfs_list_add_request(req, &failed);
503 			spin_lock(cinfo.lock);
504 			dreq->flags = 0;
505 			dreq->error = -EIO;
506 			spin_unlock(cinfo.lock);
507 		}
508 	}
509 	nfs_pageio_complete(&desc);
510 
	while (!list_empty(&failed)) {
		req = nfs_list_entry(failed.next);
		nfs_list_remove_request(req);
		page_cache_release(req->wb_page);
		nfs_release_request(req);
		nfs_unlock_request(req);
	}
516 
517 	if (put_dreq(dreq))
518 		nfs_direct_write_complete(dreq, dreq->inode);
519 }
520 
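/*
 * Completion for the COMMIT sent after unstable direct writes.  If the
 * commit failed, or the write verifier returned by the COMMIT differs
 * from the one recorded when the data was written (typically because the
 * server rebooted and lost the unstable data), the writes are flagged to
 * be resent; otherwise the committed requests are released.
 */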
521 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
522 {
523 	struct nfs_direct_req *dreq = data->dreq;
524 	struct nfs_commit_info cinfo;
525 	struct nfs_page *req;
526 	int status = data->task.tk_status;
527 
528 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
529 	if (status < 0) {
530 		dprintk("NFS: %5u commit failed with error %d.\n",
531 			data->task.tk_pid, status);
532 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
533 	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
534 		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
535 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
536 	}
537 
538 	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
539 	while (!list_empty(&data->pages)) {
540 		req = nfs_list_entry(data->pages.next);
541 		nfs_list_remove_request(req);
542 		if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
543 			/* Note the rewrite will go through mds */
544 			nfs_mark_request_commit(req, NULL, &cinfo);
545 		} else {
546 			page_cache_release(req->wb_page);
547 			nfs_release_request(req);
548 		}
549 		nfs_unlock_request(req);
550 	}
551 
552 	if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
553 		nfs_direct_write_complete(dreq, data->inode);
554 }
555 
556 static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
557 {
558 	/* There is no lock to clear */
559 }
560 
561 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
562 	.completion = nfs_direct_commit_complete,
563 	.error_cleanup = nfs_direct_error_cleanup,
564 };
565 
566 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
567 {
568 	int res;
569 	struct nfs_commit_info cinfo;
570 	LIST_HEAD(mds_list);
571 
572 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
573 	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
574 	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
575 	if (res < 0) /* res == -ENOMEM */
576 		nfs_direct_write_reschedule(dreq);
577 }
578 
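/*
 * Deferred completion for a direct write: depending on the state left
 * behind by the WRITE replies this either sends a COMMIT, reschedules
 * the writes entirely, or finishes the request after invalidating the
 * file's cached pages so that ordinary cached readers see the new data.
 */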
579 static void nfs_direct_write_schedule_work(struct work_struct *work)
580 {
581 	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
582 	int flags = dreq->flags;
583 
584 	dreq->flags = 0;
585 	switch (flags) {
586 		case NFS_ODIRECT_DO_COMMIT:
587 			nfs_direct_commit_schedule(dreq);
588 			break;
589 		case NFS_ODIRECT_RESCHED_WRITES:
590 			nfs_direct_write_reschedule(dreq);
591 			break;
592 		default:
593 			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
594 			nfs_direct_complete(dreq);
595 	}
596 }
597 
598 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
599 {
600 	schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
601 }
602 
603 #else
604 static void nfs_direct_write_schedule_work(struct work_struct *work)
605 {
606 }
607 
608 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
609 {
610 	nfs_zap_mapping(inode, inode->i_mapping);
611 	nfs_direct_complete(dreq);
612 }
613 #endif
614 
/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If allocating the page vector, get_user_pages(), or
 * nfs_create_request() fails, bail and stop sending more writes.
 * Write length accounting is handled by nfs_direct_write_completion().
 * Otherwise, if no requests have been sent, just return an error.
 *
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 */
626 static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
627 						 const struct iovec *iov,
628 						 loff_t pos)
629 {
630 	struct nfs_direct_req *dreq = desc->pg_dreq;
631 	struct nfs_open_context *ctx = dreq->ctx;
632 	struct inode *inode = ctx->dentry->d_inode;
633 	unsigned long user_addr = (unsigned long)iov->iov_base;
634 	size_t count = iov->iov_len;
635 	size_t wsize = NFS_SERVER(inode)->wsize;
636 	unsigned int pgbase;
637 	int result;
638 	ssize_t started = 0;
639 	struct page **pagevec = NULL;
640 	unsigned int npages;
641 
642 	do {
643 		size_t bytes;
644 		int i;
645 
646 		pgbase = user_addr & ~PAGE_MASK;
647 		bytes = min(max(wsize, PAGE_SIZE), count);
648 
649 		result = -ENOMEM;
650 		npages = nfs_page_array_len(pgbase, bytes);
651 		if (!pagevec)
652 			pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
653 		if (!pagevec)
654 			break;
655 
656 		down_read(&current->mm->mmap_sem);
657 		result = get_user_pages(current, current->mm, user_addr,
658 					npages, 0, 0, pagevec, NULL);
659 		up_read(&current->mm->mmap_sem);
660 		if (result < 0)
661 			break;
662 
663 		if ((unsigned)result < npages) {
664 			bytes = result * PAGE_SIZE;
665 			if (bytes <= pgbase) {
666 				nfs_direct_release_pages(pagevec, result);
667 				break;
668 			}
669 			bytes -= pgbase;
670 			npages = result;
671 		}
672 
673 		for (i = 0; i < npages; i++) {
674 			struct nfs_page *req;
675 			unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
676 
677 			req = nfs_create_request(dreq->ctx, dreq->inode,
678 						 pagevec[i],
679 						 pgbase, req_len);
680 			if (IS_ERR(req)) {
681 				nfs_direct_release_pages(pagevec + i,
682 							 npages - i);
683 				result = PTR_ERR(req);
684 				break;
685 			}
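			/*
			 * Unlike reads, write requests are locked here
			 * because they may outlive the WRITE RPC on the
			 * commit lists; they are unlocked again once the
			 * data is committed or the request is released.
			 */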
686 			nfs_lock_request(req);
687 			req->wb_index = pos >> PAGE_SHIFT;
688 			req->wb_offset = pos & ~PAGE_MASK;
689 			if (!nfs_pageio_add_request(desc, req)) {
690 				result = desc->pg_error;
691 				nfs_unlock_request(req);
692 				nfs_release_request(req);
693 				nfs_direct_release_pages(pagevec + i,
694 							 npages - i);
695 				break;
696 			}
697 			pgbase = 0;
698 			bytes -= req_len;
699 			started += req_len;
700 			user_addr += req_len;
701 			pos += req_len;
702 			count -= req_len;
703 		}
704 	} while (count != 0 && result >= 0);
705 
706 	kfree(pagevec);
707 
708 	if (started)
709 		return started;
710 	return result < 0 ? (ssize_t) result : -EFAULT;
711 }
712 
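/*
 * Per-header completion for direct writes.  Successful byte counts are
 * accumulated, and each request is then either queued for a later COMMIT
 * (unstable replies with matching verifiers), flagged for a full resend
 * (errors that need a reschedule, or a verifier mismatch), or simply
 * released when the data is already stable.
 */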
713 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
714 {
715 	struct nfs_direct_req *dreq = hdr->dreq;
716 	struct nfs_commit_info cinfo;
717 	int bit = -1;
718 	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
719 
720 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
721 		goto out_put;
722 
723 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
724 
725 	spin_lock(&dreq->lock);
726 
727 	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
728 		dreq->flags = 0;
729 		dreq->error = hdr->error;
730 	}
731 	if (dreq->error != 0)
732 		bit = NFS_IOHDR_ERROR;
733 	else {
734 		dreq->count += hdr->good_bytes;
735 		if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
736 			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
737 			bit = NFS_IOHDR_NEED_RESCHED;
738 		} else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
739 			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
740 				bit = NFS_IOHDR_NEED_RESCHED;
741 			else if (dreq->flags == 0) {
742 				memcpy(&dreq->verf, &req->wb_verf,
743 				       sizeof(dreq->verf));
744 				bit = NFS_IOHDR_NEED_COMMIT;
745 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
746 			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
747 				if (memcmp(&dreq->verf, &req->wb_verf, sizeof(dreq->verf))) {
748 					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
749 					bit = NFS_IOHDR_NEED_RESCHED;
750 				} else
751 					bit = NFS_IOHDR_NEED_COMMIT;
752 			}
753 		}
754 	}
755 	spin_unlock(&dreq->lock);
756 
757 	while (!list_empty(&hdr->pages)) {
758 		req = nfs_list_entry(hdr->pages.next);
759 		nfs_list_remove_request(req);
760 		switch (bit) {
761 		case NFS_IOHDR_NEED_RESCHED:
762 		case NFS_IOHDR_NEED_COMMIT:
763 			nfs_mark_request_commit(req, hdr->lseg, &cinfo);
764 			break;
765 		default:
766 			page_cache_release(req->wb_page);
767 			nfs_release_request(req);
768 		}
769 		nfs_unlock_request(req);
770 	}
771 
772 out_put:
773 	if (put_dreq(dreq))
774 		nfs_direct_write_complete(dreq, hdr->inode);
775 	hdr->release(hdr);
776 }
777 
778 static void nfs_write_sync_pgio_error(struct list_head *head)
779 {
780 	struct nfs_page *req;
781 
782 	while (!list_empty(head)) {
783 		req = nfs_list_entry(head->next);
784 		nfs_list_remove_request(req);
785 		nfs_release_request(req);
786 		nfs_unlock_request(req);
787 	}
788 }
789 
790 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
791 	.error_cleanup = nfs_write_sync_pgio_error,
792 	.init_hdr = nfs_direct_pgio_init,
793 	.completion = nfs_direct_write_completion,
794 };
795 
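/*
 * Note: the write descriptor below uses FLUSH_COND_STABLE, which the
 * generic write path is expected to treat as follows: if the whole direct
 * write fits in a single WRITE RPC and nothing is already queued for
 * commit, issue it as a stable write and avoid a separate COMMIT;
 * otherwise send unstable writes and commit them afterwards.
 */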
796 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
797 					       const struct iovec *iov,
798 					       unsigned long nr_segs,
799 					       loff_t pos)
800 {
801 	struct nfs_pageio_descriptor desc;
802 	ssize_t result = 0;
803 	size_t requested_bytes = 0;
804 	unsigned long seg;
805 
806 	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_COND_STABLE,
807 			      &nfs_direct_write_completion_ops);
808 	desc.pg_dreq = dreq;
809 	get_dreq(dreq);
810 
811 	for (seg = 0; seg < nr_segs; seg++) {
812 		const struct iovec *vec = &iov[seg];
813 		result = nfs_direct_write_schedule_segment(&desc, vec, pos);
814 		if (result < 0)
815 			break;
816 		requested_bytes += result;
817 		if ((size_t)result < vec->iov_len)
818 			break;
819 		pos += vec->iov_len;
820 	}
821 	nfs_pageio_complete(&desc);
822 
823 	/*
824 	 * If no bytes were started, return the error, and let the
825 	 * generic layer handle the completion.
826 	 */
827 	if (requested_bytes == 0) {
828 		nfs_direct_req_release(dreq);
829 		return result < 0 ? result : -EIO;
830 	}
831 
832 	if (put_dreq(dreq))
833 		nfs_direct_write_complete(dreq, dreq->inode);
834 	return 0;
835 }
836 
837 static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
838 				unsigned long nr_segs, loff_t pos,
839 				size_t count)
840 {
841 	ssize_t result = -ENOMEM;
842 	struct inode *inode = iocb->ki_filp->f_mapping->host;
843 	struct nfs_direct_req *dreq;
844 
845 	dreq = nfs_direct_req_alloc();
846 	if (!dreq)
847 		goto out;
848 
849 	dreq->inode = inode;
850 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
851 	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
852 	if (dreq->l_ctx == NULL)
853 		goto out_release;
854 	if (!is_sync_kiocb(iocb))
855 		dreq->iocb = iocb;
856 
857 	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos);
858 	if (!result)
859 		result = nfs_direct_wait(dreq);
860 out_release:
861 	nfs_direct_req_release(dreq);
862 out:
863 	return result;
864 }
865 
866 /**
867  * nfs_file_direct_read - file direct read operation for NFS files
868  * @iocb: target I/O control block
869  * @iov: vector of user buffers into which to read data
870  * @nr_segs: size of iov vector
871  * @pos: byte offset in file where reading starts
872  *
873  * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid its check that the
 * request starts before the end of the file.  For that check
876  * to work, we must generate a GETATTR before each direct read, and
877  * even then there is a window between the GETATTR and the subsequent
878  * READ where the file size could change.  Our preference is simply
879  * to do all reads the application wants, and the server will take
880  * care of managing the end of file boundary.
881  *
 * This function also avoids updating the file's atime locally; the
 * NFS server sets the file's atime, and this client must read the
 * updated atime from the server back into its cache.
886  */
887 ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
888 				unsigned long nr_segs, loff_t pos)
889 {
890 	ssize_t retval = -EINVAL;
891 	struct file *file = iocb->ki_filp;
892 	struct address_space *mapping = file->f_mapping;
893 	size_t count;
894 
895 	count = iov_length(iov, nr_segs);
896 	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
897 
898 	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
899 		file->f_path.dentry->d_parent->d_name.name,
900 		file->f_path.dentry->d_name.name,
901 		count, (long long) pos);
902 
903 	retval = 0;
904 	if (!count)
905 		goto out;
906 
907 	retval = nfs_sync_mapping(mapping);
908 	if (retval)
909 		goto out;
910 
911 	task_io_account_read(count);
912 
913 	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
914 	if (retval > 0)
915 		iocb->ki_pos = pos + retval;
916 
917 out:
918 	return retval;
919 }
920 
921 /**
922  * nfs_file_direct_write - file direct write operation for NFS files
923  * @iocb: target I/O control block
924  * @iov: vector of user buffers from which to write data
925  * @nr_segs: size of iov vector
926  * @pos: byte offset in file where writing starts
927  *
928  * We use this function for direct writes instead of calling
929  * generic_file_aio_write() in order to avoid taking the inode
930  * semaphore and updating the i_size.  The NFS server will set
931  * the new i_size and this client must read the updated size
932  * back into its cache.  We let the server do generic write
933  * parameter checking and report problems.
934  *
935  * We eliminate local atime updates, see direct read above.
936  *
937  * We avoid unnecessary page cache invalidations for normal cached
938  * readers of this file.
939  *
940  * Note that O_APPEND is not supported for NFS direct writes, as there
941  * is no atomic O_APPEND write facility in the NFS protocol.
942  */
943 ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
944 				unsigned long nr_segs, loff_t pos)
945 {
946 	ssize_t retval = -EINVAL;
947 	struct file *file = iocb->ki_filp;
948 	struct address_space *mapping = file->f_mapping;
949 	size_t count;
950 
951 	count = iov_length(iov, nr_segs);
952 	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
953 
954 	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
955 		file->f_path.dentry->d_parent->d_name.name,
956 		file->f_path.dentry->d_name.name,
957 		count, (long long) pos);
958 
959 	retval = generic_write_checks(file, &pos, &count, 0);
960 	if (retval)
961 		goto out;
962 
963 	retval = -EINVAL;
964 	if ((ssize_t) count < 0)
965 		goto out;
966 	retval = 0;
967 	if (!count)
968 		goto out;
969 
970 	retval = nfs_sync_mapping(mapping);
971 	if (retval)
972 		goto out;
973 
974 	task_io_account_write(count);
975 
976 	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);
977 	if (retval > 0) {
978 		struct inode *inode = mapping->host;
979 
980 		iocb->ki_pos = pos + retval;
981 		spin_lock(&inode->i_lock);
982 		if (i_size_read(inode) < iocb->ki_pos)
983 			i_size_write(inode, iocb->ki_pos);
984 		spin_unlock(&inode->i_lock);
985 	}
986 out:
987 	return retval;
988 }
989 
990 /**
991  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
992  *
993  */
994 int __init nfs_init_directcache(void)
995 {
996 	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
997 						sizeof(struct nfs_direct_req),
998 						0, (SLAB_RECLAIM_ACCOUNT|
999 							SLAB_MEM_SPREAD),
1000 						NULL);
1001 	if (nfs_direct_cachep == NULL)
1002 		return -ENOMEM;
1003 
1004 	return 0;
1005 }
1006 
1007 /**
1008  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1009  *
1010  */
1011 void nfs_destroy_directcache(void)
1012 {
1013 	kmem_cache_destroy(nfs_direct_cachep);
1014 }
1015