xref: /openbmc/linux/fs/nfs/direct.c (revision 35754bc00e94e598c432ad02f7a3d3063c4402e3)
1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001	Initial implementation for 2.4  --cel
33  * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003	Port to 2.5 APIs  --cel
35  * 31 Mar 2004	Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004	Parallel async reads  --cel
37  * 04 May 2005	support O_DIRECT with aio  --cel
38  *
39  */
40 
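/*
 * Example: a minimal user-space sketch (not part of this file) of how an
 * application asks for uncached I/O.  The path, sizes and alignment below
 * are illustrative only, and error handling is elided.
 *
 *	#define _GNU_SOURCE			// for O_DIRECT
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	void *buf;
 *	int fd = open("/mnt/nfs/data.bin", O_RDONLY | O_DIRECT);
 *	posix_memalign(&buf, 4096, 1048576);	// page-aligned buffer
 *	ssize_t n = pread(fd, buf, 1048576, 0);	// bypasses the page cache
 *	free(buf);
 *	close(fd);
 *
 * Every byte returned by the pread() above came from the server rather
 * than from the local page cache, and the matching pwrite() would not
 * return until the data had reached the server's stable storage.
 */
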
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/file.h>
45 #include <linux/pagemap.h>
46 #include <linux/kref.h>
47 #include <linux/slab.h>
48 #include <linux/task_io_accounting_ops.h>
49 
50 #include <linux/nfs_fs.h>
51 #include <linux/nfs_page.h>
52 #include <linux/sunrpc/clnt.h>
53 
54 #include <asm/uaccess.h>
55 #include <linux/atomic.h>
56 
57 #include "internal.h"
58 #include "iostat.h"
59 #include "pnfs.h"
60 
61 #define NFSDBG_FACILITY		NFSDBG_VFS
62 
63 static struct kmem_cache *nfs_direct_cachep;
64 
65 /*
66  * This represents a set of asynchronous requests that we're waiting on
67  */
68 struct nfs_direct_req {
69 	struct kref		kref;		/* release manager */
70 
71 	/* I/O parameters */
72 	struct nfs_open_context	*ctx;		/* file open context info */
73 	struct nfs_lock_context *l_ctx;		/* Lock context info */
74 	struct kiocb *		iocb;		/* controlling i/o request */
75 	struct inode *		inode;		/* target file of i/o */
76 
77 	/* completion state */
78 	atomic_t		io_count;	/* i/os we're waiting for */
79 	spinlock_t		lock;		/* protect completion state */
80 	ssize_t			count,		/* bytes actually processed */
81 				bytes_left,	/* bytes left to be sent */
82 				error;		/* any reported error */
83 	struct completion	completion;	/* wait for i/o completion */
84 
85 	/* commit state */
86 	struct nfs_mds_commit_info mds_cinfo;	/* Storage for cinfo */
87 	struct pnfs_ds_commit_info ds_cinfo;	/* Storage for cinfo */
88 	struct work_struct	work;
89 	int			flags;
90 #define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
91 #define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
92 	struct nfs_writeverf	verf;		/* unstable write verifier */
93 };
94 
95 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
96 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
97 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
98 static void nfs_direct_write_schedule_work(struct work_struct *work);
99 
100 static inline void get_dreq(struct nfs_direct_req *dreq)
101 {
102 	atomic_inc(&dreq->io_count);
103 }
104 
105 static inline int put_dreq(struct nfs_direct_req *dreq)
106 {
107 	return atomic_dec_and_test(&dreq->io_count);
108 }
109 
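/*
 * get_dreq()/put_dreq() implement a simple outstanding-I/O count: the
 * scheduling code holds one reference while it queues requests, each
 * pageio header takes another via the init_hdr callback, and whoever
 * drops the count to zero completes the whole direct request.  A hedged
 * sketch of the pattern (the helper names below are illustrative, not
 * real functions in this file):
 *
 *	get_dreq(dreq);				// scheduler's reference
 *	while (more_chunks_to_send(dreq))
 *		dispatch_one_chunk(dreq);	// completion path put_dreq()s
 *	if (put_dreq(dreq))			// last one out...
 *		nfs_direct_complete(dreq);	// ...finishes the request
 */
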
110 /**
111  * nfs_direct_IO - NFS address space operation for direct I/O
112  * @rw: direction (read or write)
113  * @iocb: target I/O control block
114  * @iov: array of vectors that define I/O buffer
115  * @pos: offset in file to begin the operation
116  * @nr_segs: size of iovec array
117  *
118  * The presence of this routine in the address space ops vector means
119  * the NFS client supports direct I/O. However, for most direct I/O, we
120  * shunt off direct read and write requests before the VFS gets them,
121  * so this method is only ever called for swap.
122  */
123 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
124 {
125 #ifndef CONFIG_NFS_SWAP
126 	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
127 			iocb->ki_filp->f_path.dentry->d_name.name,
128 			(long long) pos, nr_segs);
129 
130 	return -EINVAL;
131 #else
132 	VM_BUG_ON(iocb->ki_left != PAGE_SIZE);
133 	VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
134 
135 	if (rw == READ || rw == KERNEL_READ)
136 		return nfs_file_direct_read(iocb, iov, nr_segs, pos,
137 				rw == READ ? true : false);
138 	return nfs_file_direct_write(iocb, iov, nr_segs, pos,
139 				rw == WRITE ? true : false);
140 #endif /* CONFIG_NFS_SWAP */
141 }
142 
143 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
144 {
145 	unsigned int i;
146 	for (i = 0; i < npages; i++)
147 		page_cache_release(pages[i]);
148 }
149 
150 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
151 			      struct nfs_direct_req *dreq)
152 {
153 	cinfo->lock = &dreq->lock;
154 	cinfo->mds = &dreq->mds_cinfo;
155 	cinfo->ds = &dreq->ds_cinfo;
156 	cinfo->dreq = dreq;
157 	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
158 }
159 
160 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
161 {
162 	struct nfs_direct_req *dreq;
163 
164 	dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
165 	if (!dreq)
166 		return NULL;
167 
168 	kref_init(&dreq->kref);
169 	kref_get(&dreq->kref);
170 	init_completion(&dreq->completion);
171 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
172 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
173 	spin_lock_init(&dreq->lock);
174 
175 	return dreq;
176 }
177 
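/*
 * Note that kref_init() plus kref_get() above leaves a new dreq with two
 * references: one is dropped by nfs_direct_complete() when the I/O
 * finishes, the other by the nfs_direct_read()/nfs_direct_write() caller
 * in its out_release path.
 */
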
178 static void nfs_direct_req_free(struct kref *kref)
179 {
180 	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
181 
182 	if (dreq->l_ctx != NULL)
183 		nfs_put_lock_context(dreq->l_ctx);
184 	if (dreq->ctx != NULL)
185 		put_nfs_open_context(dreq->ctx);
186 	kmem_cache_free(nfs_direct_cachep, dreq);
187 }
188 
189 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
190 {
191 	kref_put(&dreq->kref, nfs_direct_req_free);
192 }
193 
194 /*
195  * Collects and returns the final error value/byte-count.
196  */
197 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
198 {
199 	ssize_t result = -EIOCBQUEUED;
200 
201 	/* Async requests don't wait here */
202 	if (dreq->iocb)
203 		goto out;
204 
205 	result = wait_for_completion_killable(&dreq->completion);
206 
207 	if (!result)
208 		result = dreq->error;
209 	if (!result)
210 		result = dreq->count;
211 
212 out:
213 	return (ssize_t) result;
214 }
215 
216 /*
217  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust that
218  * the iocb is still valid here if this is a synchronous request.
219  */
220 static void nfs_direct_complete(struct nfs_direct_req *dreq)
221 {
222 	if (dreq->iocb) {
223 		long res = (long) dreq->error;
224 		if (!res)
225 			res = (long) dreq->count;
226 		aio_complete(dreq->iocb, res, 0);
227 	}
228 	complete_all(&dreq->completion);
229 
230 	nfs_direct_req_release(dreq);
231 }
232 
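/*
 * Completion therefore has two consumers.  A synchronous caller sleeps in
 * nfs_direct_wait() and reads dreq->error/dreq->count itself, so only the
 * complete_all() matters; an aio caller has already had -EIOCBQUEUED
 * returned to it, so the result is delivered through aio_complete() on
 * the iocb instead.  From the submission side (this is how
 * nfs_direct_read() below drives it):
 *
 *	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
 *	if (!result)
 *		result = nfs_direct_wait(dreq);	// -EIOCBQUEUED if async
 */
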
233 static void nfs_direct_readpage_release(struct nfs_page *req)
234 {
235 	dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
236 		req->wb_context->dentry->d_inode->i_sb->s_id,
237 		(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
238 		req->wb_bytes,
239 		(long long)req_offset(req));
240 	nfs_release_request(req);
241 }
242 
243 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
244 {
245 	unsigned long bytes = 0;
246 	struct nfs_direct_req *dreq = hdr->dreq;
247 
248 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
249 		goto out_put;
250 
251 	spin_lock(&dreq->lock);
252 	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
253 		dreq->error = hdr->error;
254 	else
255 		dreq->count += hdr->good_bytes;
256 	spin_unlock(&dreq->lock);
257 
258 	while (!list_empty(&hdr->pages)) {
259 		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
260 		struct page *page = req->wb_page;
261 
262 		if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
263 			if (bytes > hdr->good_bytes)
264 				zero_user(page, 0, PAGE_SIZE);
265 			else if (hdr->good_bytes - bytes < PAGE_SIZE)
266 				zero_user_segment(page,
267 					hdr->good_bytes & ~PAGE_MASK,
268 					PAGE_SIZE);
269 		}
270 		if (!PageCompound(page)) {
271 			if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
272 				if (bytes < hdr->good_bytes)
273 					set_page_dirty(page);
274 			} else
275 				set_page_dirty(page);
276 		}
277 		bytes += req->wb_bytes;
278 		nfs_list_remove_request(req);
279 		nfs_direct_readpage_release(req);
280 	}
281 out_put:
282 	if (put_dreq(dreq))
283 		nfs_direct_complete(dreq);
284 	hdr->release(hdr);
285 }
286 
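/*
 * Worked example of the EOF zeroing above, assuming PAGE_SIZE == 4096 and
 * purely illustrative numbers: an 8192-byte direct read that hits end of
 * file after good_bytes == 5000 walks two requests.  For page 0, bytes is
 * 0 and good_bytes - bytes == 5000 >= PAGE_SIZE, so nothing is zeroed.
 * For page 1, bytes is 4096 and good_bytes - bytes == 904 < PAGE_SIZE, so
 * zero_user_segment() clears the page from offset
 * good_bytes & ~PAGE_MASK == 904 up to PAGE_SIZE, ensuring the
 * application never sees stale data beyond end of file.  Had bytes
 * exceeded good_bytes, the whole page would have been zeroed instead.
 */
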
287 static void nfs_read_sync_pgio_error(struct list_head *head)
288 {
289 	struct nfs_page *req;
290 
291 	while (!list_empty(head)) {
292 		req = nfs_list_entry(head->next);
293 		nfs_list_remove_request(req);
294 		nfs_release_request(req);
295 	}
296 }
297 
298 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
299 {
300 	get_dreq(hdr->dreq);
301 }
302 
303 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
304 	.error_cleanup = nfs_read_sync_pgio_error,
305 	.init_hdr = nfs_direct_pgio_init,
306 	.completion = nfs_direct_read_completion,
307 };
308 
309 /*
310  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
311  * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
312  * bail and stop sending more reads.  Read length accounting is
313  * handled automatically by nfs_direct_read_result().  Otherwise, if
314  * no requests have been sent, just return an error.
315  */
316 static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
317 						const struct iovec *iov,
318 						loff_t pos, bool uio)
319 {
320 	struct nfs_direct_req *dreq = desc->pg_dreq;
321 	struct nfs_open_context *ctx = dreq->ctx;
322 	struct inode *inode = ctx->dentry->d_inode;
323 	unsigned long user_addr = (unsigned long)iov->iov_base;
324 	size_t count = iov->iov_len;
325 	size_t rsize = NFS_SERVER(inode)->rsize;
326 	unsigned int pgbase;
327 	int result;
328 	ssize_t started = 0;
329 	struct page **pagevec = NULL;
330 	unsigned int npages;
331 
332 	do {
333 		size_t bytes;
334 		int i;
335 
336 		pgbase = user_addr & ~PAGE_MASK;
337 		bytes = min(max_t(size_t, rsize, PAGE_SIZE), count);
338 
339 		result = -ENOMEM;
340 		npages = nfs_page_array_len(pgbase, bytes);
341 		if (!pagevec)
342 			pagevec = kmalloc(npages * sizeof(struct page *),
343 					  GFP_KERNEL);
344 		if (!pagevec)
345 			break;
346 		if (uio) {
347 			down_read(&current->mm->mmap_sem);
348 			result = get_user_pages(current, current->mm, user_addr,
349 					npages, 1, 0, pagevec, NULL);
350 			up_read(&current->mm->mmap_sem);
351 			if (result < 0)
352 				break;
353 		} else {
354 			WARN_ON(npages != 1);
355 			result = get_kernel_page(user_addr, 1, pagevec);
356 			if (WARN_ON(result != 1))
357 				break;
358 		}
359 
360 		if ((unsigned)result < npages) {
361 			bytes = result * PAGE_SIZE;
362 			if (bytes <= pgbase) {
363 				nfs_direct_release_pages(pagevec, result);
364 				break;
365 			}
366 			bytes -= pgbase;
367 			npages = result;
368 		}
369 
370 		for (i = 0; i < npages; i++) {
371 			struct nfs_page *req;
372 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
373 			/* XXX do we need to do the eof zeroing found in async_filler? */
374 			req = nfs_create_request(dreq->ctx, dreq->inode,
375 						 pagevec[i],
376 						 pgbase, req_len);
377 			if (IS_ERR(req)) {
378 				result = PTR_ERR(req);
379 				break;
380 			}
381 			req->wb_index = pos >> PAGE_SHIFT;
382 			req->wb_offset = pos & ~PAGE_MASK;
383 			if (!nfs_pageio_add_request(desc, req)) {
384 				result = desc->pg_error;
385 				nfs_release_request(req);
386 				break;
387 			}
388 			pgbase = 0;
389 			bytes -= req_len;
390 			started += req_len;
391 			user_addr += req_len;
392 			pos += req_len;
393 			count -= req_len;
394 			dreq->bytes_left -= req_len;
395 		}
396 		/* The nfs_page requests now hold references to these pages */
397 		nfs_direct_release_pages(pagevec, npages);
398 	} while (count != 0 && result >= 0);
399 
400 	kfree(pagevec);
401 
402 	if (started)
403 		return started;
404 	return result < 0 ? (ssize_t) result : -EFAULT;
405 }
406 
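/*
 * Worked example of the chunking arithmetic above, assuming
 * PAGE_SIZE == 4096, rsize == 32768 and illustrative numbers: a
 * 40000-byte iovec whose base address ends in 0x200 gives pgbase == 512
 * and a first chunk of bytes == min(32768, 40000) == 32768.
 * nfs_page_array_len(512, 32768) rounds 512 + 32768 up to whole pages,
 * i.e. npages == 9, and the inner loop emits one nfs_page per page:
 * 4096 - 512 == 3584 bytes from the first page, seven full 4096-byte
 * requests, and 512 bytes from the ninth, advancing pos, user_addr,
 * count and dreq->bytes_left by req_len each time.  The remaining
 * 7232 bytes are picked up by the next pass of the do/while loop.
 */
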
407 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
408 					      const struct iovec *iov,
409 					      unsigned long nr_segs,
410 					      loff_t pos, bool uio)
411 {
412 	struct nfs_pageio_descriptor desc;
413 	ssize_t result = -EINVAL;
414 	size_t requested_bytes = 0;
415 	unsigned long seg;
416 
417 	NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
418 			     &nfs_direct_read_completion_ops);
419 	get_dreq(dreq);
420 	desc.pg_dreq = dreq;
421 
422 	for (seg = 0; seg < nr_segs; seg++) {
423 		const struct iovec *vec = &iov[seg];
424 		result = nfs_direct_read_schedule_segment(&desc, vec, pos, uio);
425 		if (result < 0)
426 			break;
427 		requested_bytes += result;
428 		if ((size_t)result < vec->iov_len)
429 			break;
430 		pos += vec->iov_len;
431 	}
432 
433 	nfs_pageio_complete(&desc);
434 
435 	/*
436 	 * If no bytes were started, return the error, and let the
437 	 * generic layer handle the completion.
438 	 */
439 	if (requested_bytes == 0) {
440 		nfs_direct_req_release(dreq);
441 		return result < 0 ? result : -EIO;
442 	}
443 
444 	if (put_dreq(dreq))
445 		nfs_direct_complete(dreq);
446 	return 0;
447 }
448 
449 static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
450 			       unsigned long nr_segs, loff_t pos, bool uio)
451 {
452 	ssize_t result = -ENOMEM;
453 	struct inode *inode = iocb->ki_filp->f_mapping->host;
454 	struct nfs_direct_req *dreq;
455 	struct nfs_lock_context *l_ctx;
456 
457 	dreq = nfs_direct_req_alloc();
458 	if (dreq == NULL)
459 		goto out;
460 
461 	dreq->inode = inode;
462 	dreq->bytes_left = iov_length(iov, nr_segs);
463 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
464 	l_ctx = nfs_get_lock_context(dreq->ctx);
465 	if (IS_ERR(l_ctx)) {
466 		result = PTR_ERR(l_ctx);
467 		goto out_release;
468 	}
469 	dreq->l_ctx = l_ctx;
470 	if (!is_sync_kiocb(iocb))
471 		dreq->iocb = iocb;
472 
473 	NFS_I(inode)->read_io += iov_length(iov, nr_segs);
474 	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
475 	if (!result)
476 		result = nfs_direct_wait(dreq);
477 out_release:
478 	nfs_direct_req_release(dreq);
479 out:
480 	return result;
481 }
482 
483 static void nfs_inode_dio_write_done(struct inode *inode)
484 {
485 	nfs_zap_mapping(inode, inode->i_mapping);
486 	inode_dio_done(inode);
487 }
488 
489 #if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
490 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
491 {
492 	struct nfs_pageio_descriptor desc;
493 	struct nfs_page *req, *tmp;
494 	LIST_HEAD(reqs);
495 	struct nfs_commit_info cinfo;
496 	LIST_HEAD(failed);
497 
498 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
499 	pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
500 	spin_lock(cinfo.lock);
501 	nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
502 	spin_unlock(cinfo.lock);
503 
504 	dreq->count = 0;
505 	get_dreq(dreq);
506 
507 	NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
508 			      &nfs_direct_write_completion_ops);
509 	desc.pg_dreq = dreq;
510 
511 	list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
512 		if (!nfs_pageio_add_request(&desc, req)) {
513 			nfs_list_remove_request(req);
514 			nfs_list_add_request(req, &failed);
515 			spin_lock(cinfo.lock);
516 			dreq->flags = 0;
517 			dreq->error = -EIO;
518 			spin_unlock(cinfo.lock);
519 		}
520 		nfs_release_request(req);
521 	}
522 	nfs_pageio_complete(&desc);
523 
524 	while (!list_empty(&failed)) {
525 		req = nfs_list_entry(failed.next);
526 		nfs_list_remove_request(req);
527 		nfs_unlock_and_release_request(req);
528 	}
529 
530 	if (put_dreq(dreq))
531 		nfs_direct_write_complete(dreq, dreq->inode);
532 }
533 
534 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
535 {
536 	struct nfs_direct_req *dreq = data->dreq;
537 	struct nfs_commit_info cinfo;
538 	struct nfs_page *req;
539 	int status = data->task.tk_status;
540 
541 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
542 	if (status < 0) {
543 		dprintk("NFS: %5u commit failed with error %d.\n",
544 			data->task.tk_pid, status);
545 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
546 	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
547 		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
548 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
549 	}
550 
551 	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
552 	while (!list_empty(&data->pages)) {
553 		req = nfs_list_entry(data->pages.next);
554 		nfs_list_remove_request(req);
555 		if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
556 			/* Note the rewrite will go through mds */
557 			nfs_mark_request_commit(req, NULL, &cinfo);
558 		} else
559 			nfs_release_request(req);
560 		nfs_unlock_and_release_request(req);
561 	}
562 
563 	if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
564 		nfs_direct_write_complete(dreq, data->inode);
565 }
566 
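/*
 * The verifier check above is the standard NFS unstable-write recovery.
 * An illustrative (hypothetical) timeline:
 *
 *	WRITE replies UNSTABLE with verifier V1	-> dreq->verf = V1,
 *						   NFS_ODIRECT_DO_COMMIT
 *	server reboots, losing uncommitted data
 *	COMMIT replies with verifier V2 != V1	-> NFS_ODIRECT_RESCHED_WRITES
 *
 * Because a changed verifier means the server may have discarded the
 * uncommitted data, the only safe recovery is to resend every write,
 * which nfs_direct_write_reschedule() then does with stable writes.
 */
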
567 static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
568 {
569 	/* There is no lock to clear */
570 }
571 
572 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
573 	.completion = nfs_direct_commit_complete,
574 	.error_cleanup = nfs_direct_error_cleanup,
575 };
576 
577 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
578 {
579 	int res;
580 	struct nfs_commit_info cinfo;
581 	LIST_HEAD(mds_list);
582 
583 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
584 	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
585 	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
586 	if (res < 0) /* res == -ENOMEM */
587 		nfs_direct_write_reschedule(dreq);
588 }
589 
590 static void nfs_direct_write_schedule_work(struct work_struct *work)
591 {
592 	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
593 	int flags = dreq->flags;
594 
595 	dreq->flags = 0;
596 	switch (flags) {
597 		case NFS_ODIRECT_DO_COMMIT:
598 			nfs_direct_commit_schedule(dreq);
599 			break;
600 		case NFS_ODIRECT_RESCHED_WRITES:
601 			nfs_direct_write_reschedule(dreq);
602 			break;
603 		default:
604 			nfs_inode_dio_write_done(dreq->inode);
605 			nfs_direct_complete(dreq);
606 	}
607 }
608 
609 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
610 {
611 	schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
612 }
613 
614 #else
615 static void nfs_direct_write_schedule_work(struct work_struct *work)
616 {
617 }
618 
619 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
620 {
621 	nfs_inode_dio_write_done(inode);
622 	nfs_direct_complete(dreq);
623 }
624 #endif
625 
626 /*
627  * NB: Return the value of the first error return code.  Subsequent
628  *     errors after the first one are ignored.
629  *
630  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
631  * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
632  * bail and stop sending more writes.  Write length accounting is
633  * handled automatically by nfs_direct_write_result().  Otherwise, if
634  * no requests have been sent, just return an error.
635  */
637 static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
638 						 const struct iovec *iov,
639 						 loff_t pos, bool uio)
640 {
641 	struct nfs_direct_req *dreq = desc->pg_dreq;
642 	struct nfs_open_context *ctx = dreq->ctx;
643 	struct inode *inode = ctx->dentry->d_inode;
644 	unsigned long user_addr = (unsigned long)iov->iov_base;
645 	size_t count = iov->iov_len;
646 	size_t wsize = NFS_SERVER(inode)->wsize;
647 	unsigned int pgbase;
648 	int result;
649 	ssize_t started = 0;
650 	struct page **pagevec = NULL;
651 	unsigned int npages;
652 
653 	do {
654 		size_t bytes;
655 		int i;
656 
657 		pgbase = user_addr & ~PAGE_MASK;
658 		bytes = min(max_t(size_t, wsize, PAGE_SIZE), count);
659 
660 		result = -ENOMEM;
661 		npages = nfs_page_array_len(pgbase, bytes);
662 		if (!pagevec)
663 			pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
664 		if (!pagevec)
665 			break;
666 
667 		if (uio) {
668 			down_read(&current->mm->mmap_sem);
669 			result = get_user_pages(current, current->mm, user_addr,
670 						npages, 0, 0, pagevec, NULL);
671 			up_read(&current->mm->mmap_sem);
672 			if (result < 0)
673 				break;
674 		} else {
675 			WARN_ON(npages != 1);
676 			result = get_kernel_page(user_addr, 0, pagevec);
677 			if (WARN_ON(result != 1))
678 				break;
679 		}
680 
681 		if ((unsigned)result < npages) {
682 			bytes = result * PAGE_SIZE;
683 			if (bytes <= pgbase) {
684 				nfs_direct_release_pages(pagevec, result);
685 				break;
686 			}
687 			bytes -= pgbase;
688 			npages = result;
689 		}
690 
691 		for (i = 0; i < npages; i++) {
692 			struct nfs_page *req;
693 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
694 
695 			req = nfs_create_request(dreq->ctx, dreq->inode,
696 						 pagevec[i],
697 						 pgbase, req_len);
698 			if (IS_ERR(req)) {
699 				result = PTR_ERR(req);
700 				break;
701 			}
702 			nfs_lock_request(req);
703 			req->wb_index = pos >> PAGE_SHIFT;
704 			req->wb_offset = pos & ~PAGE_MASK;
705 			if (!nfs_pageio_add_request(desc, req)) {
706 				result = desc->pg_error;
707 				nfs_unlock_and_release_request(req);
708 				break;
709 			}
710 			pgbase = 0;
711 			bytes -= req_len;
712 			started += req_len;
713 			user_addr += req_len;
714 			pos += req_len;
715 			count -= req_len;
716 			dreq->bytes_left -= req_len;
717 		}
718 		/* The nfs_page requests now hold references to these pages */
719 		nfs_direct_release_pages(pagevec, npages);
720 	} while (count != 0 && result >= 0);
721 
722 	kfree(pagevec);
723 
724 	if (started)
725 		return started;
726 	return result < 0 ? (ssize_t) result : -EFAULT;
727 }
728 
729 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
730 {
731 	struct nfs_direct_req *dreq = hdr->dreq;
732 	struct nfs_commit_info cinfo;
733 	int bit = -1;
734 	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
735 
736 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
737 		goto out_put;
738 
739 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
740 
741 	spin_lock(&dreq->lock);
742 
743 	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
744 		dreq->flags = 0;
745 		dreq->error = hdr->error;
746 	}
747 	if (dreq->error != 0)
748 		bit = NFS_IOHDR_ERROR;
749 	else {
750 		dreq->count += hdr->good_bytes;
751 		if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
752 			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
753 			bit = NFS_IOHDR_NEED_RESCHED;
754 		} else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
755 			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
756 				bit = NFS_IOHDR_NEED_RESCHED;
757 			else if (dreq->flags == 0) {
758 				memcpy(&dreq->verf, hdr->verf,
759 				       sizeof(dreq->verf));
760 				bit = NFS_IOHDR_NEED_COMMIT;
761 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
762 			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
763 				if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
764 					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
765 					bit = NFS_IOHDR_NEED_RESCHED;
766 				} else
767 					bit = NFS_IOHDR_NEED_COMMIT;
768 			}
769 		}
770 	}
771 	spin_unlock(&dreq->lock);
772 
773 	while (!list_empty(&hdr->pages)) {
774 		req = nfs_list_entry(hdr->pages.next);
775 		nfs_list_remove_request(req);
776 		switch (bit) {
777 		case NFS_IOHDR_NEED_RESCHED:
778 		case NFS_IOHDR_NEED_COMMIT:
779 			kref_get(&req->wb_kref);
780 			nfs_mark_request_commit(req, hdr->lseg, &cinfo);
781 		}
782 		nfs_unlock_and_release_request(req);
783 	}
784 
785 out_put:
786 	if (put_dreq(dreq))
787 		nfs_direct_write_complete(dreq, hdr->inode);
788 	hdr->release(hdr);
789 }
790 
791 static void nfs_write_sync_pgio_error(struct list_head *head)
792 {
793 	struct nfs_page *req;
794 
795 	while (!list_empty(head)) {
796 		req = nfs_list_entry(head->next);
797 		nfs_list_remove_request(req);
798 		nfs_unlock_and_release_request(req);
799 	}
800 }
801 
802 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
803 	.error_cleanup = nfs_write_sync_pgio_error,
804 	.init_hdr = nfs_direct_pgio_init,
805 	.completion = nfs_direct_write_completion,
806 };
807 
808 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
809 					       const struct iovec *iov,
810 					       unsigned long nr_segs,
811 					       loff_t pos, bool uio)
812 {
813 	struct nfs_pageio_descriptor desc;
814 	struct inode *inode = dreq->inode;
815 	ssize_t result = 0;
816 	size_t requested_bytes = 0;
817 	unsigned long seg;
818 
819 	NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
820 			      &nfs_direct_write_completion_ops);
821 	desc.pg_dreq = dreq;
822 	get_dreq(dreq);
823 	atomic_inc(&inode->i_dio_count);
824 
825 	NFS_I(dreq->inode)->write_io += iov_length(iov, nr_segs);
826 	for (seg = 0; seg < nr_segs; seg++) {
827 		const struct iovec *vec = &iov[seg];
828 		result = nfs_direct_write_schedule_segment(&desc, vec, pos, uio);
829 		if (result < 0)
830 			break;
831 		requested_bytes += result;
832 		if ((size_t)result < vec->iov_len)
833 			break;
834 		pos += vec->iov_len;
835 	}
836 	nfs_pageio_complete(&desc);
837 
838 	/*
839 	 * If no bytes were started, return the error, and let the
840 	 * generic layer handle the completion.
841 	 */
842 	if (requested_bytes == 0) {
843 		inode_dio_done(inode);
844 		nfs_direct_req_release(dreq);
845 		return result < 0 ? result : -EIO;
846 	}
847 
848 	if (put_dreq(dreq))
849 		nfs_direct_write_complete(dreq, dreq->inode);
850 	return 0;
851 }
852 
853 static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
854 				unsigned long nr_segs, loff_t pos,
855 				size_t count, bool uio)
856 {
857 	ssize_t result = -ENOMEM;
858 	struct inode *inode = iocb->ki_filp->f_mapping->host;
859 	struct nfs_direct_req *dreq;
860 	struct nfs_lock_context *l_ctx;
861 
862 	dreq = nfs_direct_req_alloc();
863 	if (!dreq)
864 		goto out;
865 
866 	dreq->inode = inode;
867 	dreq->bytes_left = count;
868 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
869 	l_ctx = nfs_get_lock_context(dreq->ctx);
870 	if (IS_ERR(l_ctx)) {
871 		result = PTR_ERR(l_ctx);
872 		goto out_release;
873 	}
874 	dreq->l_ctx = l_ctx;
875 	if (!is_sync_kiocb(iocb))
876 		dreq->iocb = iocb;
877 
878 	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, uio);
879 	if (!result)
880 		result = nfs_direct_wait(dreq);
881 out_release:
882 	nfs_direct_req_release(dreq);
883 out:
884 	return result;
885 }
886 
887 /**
888  * nfs_file_direct_read - file direct read operation for NFS files
889  * @iocb: target I/O control block
890  * @iov: vector of user buffers into which to read data
891  * @nr_segs: size of iov vector
892  * @pos: byte offset in file where reading starts
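 * @uio: true if @iov describes user-space buffers; false for kernel pages (swap)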
893  *
894  * We use this function for direct reads instead of calling
895  * generic_file_aio_read() in order to avoid gfar's check to see if
896  * the request starts before the end of the file.  For that check
897  * to work, we must generate a GETATTR before each direct read, and
898  * even then there is a window between the GETATTR and the subsequent
899  * READ where the file size could change.  Our preference is simply
900  * to do all reads the application wants, and the server will take
901  * care of managing the end of file boundary.
902  *
903  * This function also eliminates unnecessarily updating the file's
904  * atime locally, as the NFS server sets the file's atime, and this
905  * client must read the updated atime from the server back into its
906  * cache.
907  */
908 ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
909 				unsigned long nr_segs, loff_t pos, bool uio)
910 {
911 	ssize_t retval = -EINVAL;
912 	struct file *file = iocb->ki_filp;
913 	struct address_space *mapping = file->f_mapping;
914 	size_t count;
915 
916 	count = iov_length(iov, nr_segs);
917 	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
918 
919 	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
920 		file->f_path.dentry->d_parent->d_name.name,
921 		file->f_path.dentry->d_name.name,
922 		count, (long long) pos);
923 
924 	retval = 0;
925 	if (!count)
926 		goto out;
927 
928 	retval = nfs_sync_mapping(mapping);
929 	if (retval)
930 		goto out;
931 
932 	task_io_account_read(count);
933 
934 	retval = nfs_direct_read(iocb, iov, nr_segs, pos, uio);
935 	if (retval > 0)
936 		iocb->ki_pos = pos + retval;
937 
938 out:
939 	return retval;
940 }
941 
942 /**
943  * nfs_file_direct_write - file direct write operation for NFS files
944  * @iocb: target I/O control block
945  * @iov: vector of user buffers from which to write data
946  * @nr_segs: size of iov vector
947  * @pos: byte offset in file where writing starts
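 * @uio: true if @iov describes user-space buffers; false for kernel pages (swap)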
948  *
949  * We use this function for direct writes instead of calling
950  * generic_file_aio_write() in order to avoid taking the inode
951  * semaphore and updating the i_size.  The NFS server will set
952  * the new i_size and this client must read the updated size
953  * back into its cache.  We let the server do generic write
954  * parameter checking and report problems.
955  *
956  * We eliminate local atime updates, see direct read above.
957  *
958  * We avoid unnecessary page cache invalidations for normal cached
959  * readers of this file.
960  *
961  * Note that O_APPEND is not supported for NFS direct writes, as there
962  * is no atomic O_APPEND write facility in the NFS protocol.
963  */
964 ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
965 				unsigned long nr_segs, loff_t pos, bool uio)
966 {
967 	ssize_t retval = -EINVAL;
968 	struct file *file = iocb->ki_filp;
969 	struct address_space *mapping = file->f_mapping;
970 	size_t count;
971 
972 	count = iov_length(iov, nr_segs);
973 	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
974 
975 	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
976 		file->f_path.dentry->d_parent->d_name.name,
977 		file->f_path.dentry->d_name.name,
978 		count, (long long) pos);
979 
980 	retval = generic_write_checks(file, &pos, &count, 0);
981 	if (retval)
982 		goto out;
983 
984 	retval = -EINVAL;
985 	if ((ssize_t) count < 0)
986 		goto out;
987 	retval = 0;
988 	if (!count)
989 		goto out;
990 
991 	retval = nfs_sync_mapping(mapping);
992 	if (retval)
993 		goto out;
994 
995 	task_io_account_write(count);
996 
997 	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count, uio);
998 	if (retval > 0) {
999 		struct inode *inode = mapping->host;
1000 
1001 		iocb->ki_pos = pos + retval;
1002 		spin_lock(&inode->i_lock);
1003 		if (i_size_read(inode) < iocb->ki_pos)
1004 			i_size_write(inode, iocb->ki_pos);
1005 		spin_unlock(&inode->i_lock);
1006 	}
1007 out:
1008 	return retval;
1009 }
1010 
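/*
 * Example: a hedged user-space sketch (not part of this file) of a
 * vectored direct write; the path, sizes and alignment are illustrative
 * and error handling is elided.
 *
 *	#define _GNU_SOURCE			// for O_DIRECT
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <sys/uio.h>
 *	#include <unistd.h>
 *
 *	void *a, *b;
 *	int fd = open("/mnt/nfs/out.bin", O_WRONLY | O_CREAT | O_DIRECT, 0644);
 *	posix_memalign(&a, 4096, 4096);
 *	posix_memalign(&b, 4096, 4096);
 *	struct iovec iov[2] = { { a, 4096 }, { b, 4096 } };
 *	ssize_t n = pwritev(fd, iov, 2, 0);
 *
 * Each iovec element reaches nfs_direct_write_schedule_iovec() as one
 * segment, and when pwritev() returns the bytes it accounted for are on
 * the server's stable storage.  As noted above, O_APPEND is not
 * supported for direct writes.
 */
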
1011 /**
1012  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1013  *
1014  */
1015 int __init nfs_init_directcache(void)
1016 {
1017 	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1018 						sizeof(struct nfs_direct_req),
1019 						0, (SLAB_RECLAIM_ACCOUNT|
1020 							SLAB_MEM_SPREAD),
1021 						NULL);
1022 	if (nfs_direct_cachep == NULL)
1023 		return -ENOMEM;
1024 
1025 	return 0;
1026 }
1027 
1028 /**
1029  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1030  *
1031  */
1032 void nfs_destroy_directcache(void)
1033 {
1034 	kmem_cache_destroy(nfs_direct_cachep);
1035 }
1036