xref: /openbmc/linux/fs/nfs/direct.c (revision fb5f7f20cdb91f8ef985aef09fa2217c49c38396)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * linux/fs/nfs/direct.c
4  *
5  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
6  *
7  * High-performance uncached I/O for the Linux NFS client
8  *
9  * There are important applications whose performance or correctness
10  * depends on uncached access to file data.  Database clusters
11  * (multiple copies of the same instance running on separate hosts)
12  * implement their own cache coherency protocol that subsumes file
13  * system cache protocols.  Applications that process datasets
14  * considerably larger than the client's memory do not always benefit
15  * from a local cache.  A streaming video server, for instance, has no
16  * need to cache the contents of a file.
17  *
18  * When an application requests uncached I/O, all read and write requests
19  * are made directly to the server; data stored or fetched via these
20  * requests is not cached in the Linux page cache.  The client does not
21  * correct unaligned requests from applications.  All requested bytes are
22  * held on permanent storage before a direct write system call returns to
23  * an application.
24  *
25  * Solaris implements an uncached I/O facility called directio() that
26  * is used for backups and sequential I/O to very large files.  Solaris
27  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
28  * an undocumented mount option.
29  *
30  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
31  * help from Andrew Morton.
32  *
33  * 18 Dec 2001	Initial implementation for 2.4  --cel
34  * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
35  * 08 Jun 2003	Port to 2.5 APIs  --cel
36  * 31 Mar 2004	Handle direct I/O without VFS support  --cel
37  * 15 Sep 2004	Parallel async reads  --cel
38  * 04 May 2005	support O_DIRECT with aio  --cel
39  *
40  */
41 
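/*
 * A minimal userspace sketch of the uncached read described above; it is
 * illustration only and not part of this file.  The 4096-byte alignment and
 * the /mnt/nfs path are assumptions, not requirements imposed by this code.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	int read_direct(void)
 *	{
 *		void *buf;
 *		ssize_t n;
 *		int fd;
 *
 *		if (posix_memalign(&buf, 4096, 4096))	// aligned buffer
 *			return -1;
 *		fd = open("/mnt/nfs/data.bin", O_RDONLY | O_DIRECT);
 *		if (fd < 0) {
 *			free(buf);
 *			return -1;
 *		}
 *		n = pread(fd, buf, 4096, 0);		// bypasses the page cache
 *		close(fd);
 *		free(buf);
 *		return n < 0 ? -1 : 0;
 *	}
 */
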
42 #include <linux/errno.h>
43 #include <linux/sched.h>
44 #include <linux/kernel.h>
45 #include <linux/file.h>
46 #include <linux/pagemap.h>
47 #include <linux/kref.h>
48 #include <linux/slab.h>
49 #include <linux/task_io_accounting_ops.h>
50 #include <linux/module.h>
51 
52 #include <linux/nfs_fs.h>
53 #include <linux/nfs_page.h>
54 #include <linux/sunrpc/clnt.h>
55 
56 #include <linux/uaccess.h>
57 #include <linux/atomic.h>
58 
59 #include "internal.h"
60 #include "iostat.h"
61 #include "pnfs.h"
62 
63 #define NFSDBG_FACILITY		NFSDBG_VFS
64 
65 static struct kmem_cache *nfs_direct_cachep;
66 
67 struct nfs_direct_req {
68 	struct kref		kref;		/* release manager */
69 
70 	/* I/O parameters */
71 	struct nfs_open_context	*ctx;		/* file open context info */
72 	struct nfs_lock_context *l_ctx;		/* Lock context info */
73 	struct kiocb *		iocb;		/* controlling i/o request */
74 	struct inode *		inode;		/* target file of i/o */
75 
76 	/* completion state */
77 	atomic_t		io_count;	/* i/os we're waiting for */
78 	spinlock_t		lock;		/* protect completion state */
79 
80 	loff_t			io_start;	/* Start offset for I/O */
81 	ssize_t			count,		/* bytes actually processed */
82 				max_count,	/* max expected count */
83 				bytes_left,	/* bytes left to be sent */
84 				error;		/* any reported error */
85 	struct completion	completion;	/* wait for i/o completion */
86 
87 	/* commit state */
88 	struct nfs_mds_commit_info mds_cinfo;	/* Storage for cinfo */
89 	struct pnfs_ds_commit_info ds_cinfo;	/* Storage for cinfo */
90 	struct work_struct	work;
91 	int			flags;
92 	/* for write */
93 #define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
94 #define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
95 	/* for read */
96 #define NFS_ODIRECT_SHOULD_DIRTY	(3)	/* dirty user-space page after read */
97 #define NFS_ODIRECT_DONE		INT_MAX	/* write request finished, stop commits/resends */
98 	struct nfs_writeverf	verf;		/* unstable write verifier */
99 };
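/*
 * Write-side state transitions, as implemented below: flags starts at 0;
 * an unstable WRITE reply moves the request to NFS_ODIRECT_DO_COMMIT; a
 * verifier mismatch or a failed/rescheduled pgio request moves it to
 * NFS_ODIRECT_RESCHED_WRITES so the data is resent; a fatal COMMIT error
 * moves it to NFS_ODIRECT_DONE, after which nothing more is rescheduled.
 */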
100 
101 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
102 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
103 static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
104 static void nfs_direct_write_schedule_work(struct work_struct *work);
105 
106 static inline void get_dreq(struct nfs_direct_req *dreq)
107 {
108 	atomic_inc(&dreq->io_count);
109 }
110 
111 static inline int put_dreq(struct nfs_direct_req *dreq)
112 {
113 	return atomic_dec_and_test(&dreq->io_count);
114 }
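/*
 * io_count reference pattern used throughout this file: the scheduling
 * function takes one reference with get_dreq() before dispatching, each
 * pgio header takes another via nfs_direct_pgio_init(), and each
 * completion drops one with put_dreq().  Whoever drops the last reference
 * finishes the request via nfs_direct_complete() or
 * nfs_direct_write_complete().
 */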
115 
116 static void
117 nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
118 			    const struct nfs_pgio_header *hdr,
119 			    ssize_t dreq_len)
120 {
121 	if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
122 	      test_bit(NFS_IOHDR_EOF, &hdr->flags)))
123 		return;
124 	if (dreq->max_count >= dreq_len) {
125 		dreq->max_count = dreq_len;
126 		if (dreq->count > dreq_len)
127 			dreq->count = dreq_len;
128 
129 		if (test_bit(NFS_IOHDR_ERROR, &hdr->flags))
130 			dreq->error = hdr->error;
131 		else /* Clear outstanding error if this is EOF */
132 			dreq->error = 0;
133 	}
134 }
135 
136 static void
137 nfs_direct_count_bytes(struct nfs_direct_req *dreq,
138 		       const struct nfs_pgio_header *hdr)
139 {
140 	loff_t hdr_end = hdr->io_start + hdr->good_bytes;
141 	ssize_t dreq_len = 0;
142 
143 	if (hdr_end > dreq->io_start)
144 		dreq_len = hdr_end - dreq->io_start;
145 
146 	nfs_direct_handle_truncated(dreq, hdr, dreq_len);
147 
148 	if (dreq_len > dreq->max_count)
149 		dreq_len = dreq->max_count;
150 
151 	if (dreq->count < dreq_len)
152 		dreq->count = dreq_len;
153 }
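/*
 * Worked example with hypothetical numbers: with dreq->io_start == 0 and
 * dreq->max_count == 131072, a header reporting io_start == 65536,
 * good_bytes == 16384 and NFS_IOHDR_ERROR yields dreq_len == 81920.
 * nfs_direct_handle_truncated() shrinks max_count to 81920 and records
 * hdr->error, and dreq->count ends up at 81920, i.e. only the bytes that
 * completed before the failure are reported to the caller.
 */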
154 
155 /*
156  * nfs_direct_select_verf - select the right verifier
157  * @dreq - direct request possibly spanning multiple servers
158  * @ds_clp - nfs_client of data server or NULL if MDS / non-pnfs
159  * @commit_idx - commit bucket index for the DS
160  *
161  * returns the correct verifier to use given the role of the server
162  */
163 static struct nfs_writeverf *
164 nfs_direct_select_verf(struct nfs_direct_req *dreq,
165 		       struct nfs_client *ds_clp,
166 		       int commit_idx)
167 {
168 	struct nfs_writeverf *verfp = &dreq->verf;
169 
170 #ifdef CONFIG_NFS_V4_1
171 	/*
172 	 * If pNFS is in use, use the DS verf, except when commit_through_mds
173 	 * is set for the layout segment (nbuckets == 0), where the MDS verf applies.
174 	 */
175 	if (ds_clp && dreq->ds_cinfo.nbuckets > 0) {
176 		if (commit_idx >= 0 && commit_idx < dreq->ds_cinfo.nbuckets)
177 			verfp = &dreq->ds_cinfo.buckets[commit_idx].direct_verf;
178 		else
179 			WARN_ON_ONCE(1);
180 	}
181 #endif
182 	return verfp;
183 }
184 
185 
186 /*
187  * nfs_direct_set_hdr_verf - set the write/commit verifier
188  * @dreq - direct request possibly spanning multiple servers
189  * @hdr - pageio header to validate against previously seen verfs
190  *
191  * Set the server's (MDS or DS) "seen" verifier
192  */
193 static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
194 				    struct nfs_pgio_header *hdr)
195 {
196 	struct nfs_writeverf *verfp;
197 
198 	verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
199 	WARN_ON_ONCE(verfp->committed >= 0);
200 	memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
201 	WARN_ON_ONCE(verfp->committed < 0);
202 }
203 
204 static int nfs_direct_cmp_verf(const struct nfs_writeverf *v1,
205 		const struct nfs_writeverf *v2)
206 {
207 	return nfs_write_verifier_cmp(&v1->verifier, &v2->verifier);
208 }
209 
210 /*
211  * nfs_direct_set_or_cmp_hdr_verf - compare verifier for pgio header
212  * @dreq - direct request possibly spanning multiple servers
213  * @hdr - pageio header to validate against previously seen verf
214  *
215  * Sets the server's "seen" verf if it has not been initialized yet.
216  * returns result of comparison between @hdr->verf and the "seen"
217  * verf of the server used by @hdr (DS or MDS)
218  */
219 static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
220 					  struct nfs_pgio_header *hdr)
221 {
222 	struct nfs_writeverf *verfp;
223 
224 	verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
225 	if (verfp->committed < 0) {
226 		nfs_direct_set_hdr_verf(dreq, hdr);
227 		return 0;
228 	}
229 	return nfs_direct_cmp_verf(verfp, &hdr->verf);
230 }
231 
232 /*
233  * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
234  * @dreq - direct request possibly spanning multiple servers
235  * @data - commit data to validate against previously seen verf
236  *
237  * returns result of comparison between @data->verf and the verf of
238  * the server used by @data (DS or MDS)
239  */
240 static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
241 					   struct nfs_commit_data *data)
242 {
243 	struct nfs_writeverf *verfp;
244 
245 	verfp = nfs_direct_select_verf(dreq, data->ds_clp,
246 					 data->ds_commit_index);
247 
248 	/* verifier not set so always fail */
249 	if (verfp->committed < 0 || data->res.verf->committed <= NFS_UNSTABLE)
250 		return 1;
251 
252 	return nfs_direct_cmp_verf(verfp, data->res.verf);
253 }
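/*
 * The verifier helpers above implement the usual NFS unstable-write rule:
 * the verifier from the first unstable WRITE reply is remembered, and if a
 * later WRITE or COMMIT reply carries a different verifier the server has
 * rebooted and may have lost the uncommitted data, so the request is
 * flagged NFS_ODIRECT_RESCHED_WRITES and the pages are resent.
 */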
254 
255 /**
256  * nfs_direct_IO - NFS address space operation for direct I/O
257  * @iocb: target I/O control block
258  * @iter: I/O buffer
259  *
260  * The presence of this routine in the address space ops vector means
261  * the NFS client supports direct I/O. However, for most direct IO, we
262  * shunt off direct read and write requests before the VFS gets them,
263  * so this method is only ever called for swap.
264  */
265 ssize_t nfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
266 {
267 	struct inode *inode = iocb->ki_filp->f_mapping->host;
268 
269 	/* we only support swap file calling nfs_direct_IO */
270 	if (!IS_SWAPFILE(inode))
271 		return 0;
272 
273 	VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);
274 
275 	if (iov_iter_rw(iter) == READ)
276 		return nfs_file_direct_read(iocb, iter);
277 	return nfs_file_direct_write(iocb, iter);
278 }
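/*
 * For reference, this hook is reached through the ->direct_IO member of the
 * NFS address_space operations (nfs_file_aops in fs/nfs/file.c in this
 * tree), roughly:
 *
 *	const struct address_space_operations nfs_file_aops = {
 *		...
 *		.direct_IO	= nfs_direct_IO,
 *		...
 *	};
 *
 * Ordinary O_DIRECT reads and writes are shunted to nfs_file_direct_read()
 * and nfs_file_direct_write() before the VFS gets them, as noted above.
 */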
279 
280 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
281 {
282 	unsigned int i;
283 	for (i = 0; i < npages; i++)
284 		put_page(pages[i]);
285 }
286 
287 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
288 			      struct nfs_direct_req *dreq)
289 {
290 	cinfo->inode = dreq->inode;
291 	cinfo->mds = &dreq->mds_cinfo;
292 	cinfo->ds = &dreq->ds_cinfo;
293 	cinfo->dreq = dreq;
294 	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
295 }
296 
297 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
298 {
299 	struct nfs_direct_req *dreq;
300 
301 	dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
302 	if (!dreq)
303 		return NULL;
304 
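	/*
	 * Two references: one is dropped by nfs_direct_complete() when the
	 * I/O finishes, the other by the issuing path via
	 * nfs_direct_req_release().
	 */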
305 	kref_init(&dreq->kref);
306 	kref_get(&dreq->kref);
307 	init_completion(&dreq->completion);
308 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
309 	pnfs_init_ds_commit_info(&dreq->ds_cinfo);
310 	dreq->verf.committed = NFS_INVALID_STABLE_HOW;	/* not set yet */
311 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
312 	spin_lock_init(&dreq->lock);
313 
314 	return dreq;
315 }
316 
317 static void nfs_direct_req_free(struct kref *kref)
318 {
319 	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
320 
321 	pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
322 	nfs_free_pnfs_ds_cinfo(&dreq->ds_cinfo);
323 	if (dreq->l_ctx != NULL)
324 		nfs_put_lock_context(dreq->l_ctx);
325 	if (dreq->ctx != NULL)
326 		put_nfs_open_context(dreq->ctx);
327 	kmem_cache_free(nfs_direct_cachep, dreq);
328 }
329 
330 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
331 {
332 	kref_put(&dreq->kref, nfs_direct_req_free);
333 }
334 
335 ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
336 {
337 	return dreq->bytes_left;
338 }
339 EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
340 
341 /*
342  * Collects and returns the final error value/byte-count.
343  */
344 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
345 {
346 	ssize_t result = -EIOCBQUEUED;
347 
348 	/* Async requests don't wait here */
349 	if (dreq->iocb)
350 		goto out;
351 
352 	result = wait_for_completion_killable(&dreq->completion);
353 
354 	if (!result) {
355 		result = dreq->count;
356 		WARN_ON_ONCE(dreq->count < 0);
357 	}
358 	if (!result)
359 		result = dreq->error;
360 
361 out:
362 	return (ssize_t) result;
363 }
364 
365 /*
366  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
367  * the iocb is still valid here if this is a synchronous request.
368  */
369 static void nfs_direct_complete(struct nfs_direct_req *dreq)
370 {
371 	struct inode *inode = dreq->inode;
372 
373 	inode_dio_end(inode);
374 
375 	if (dreq->iocb) {
376 		long res = (long) dreq->error;
377 		if (dreq->count != 0) {
378 			res = (long) dreq->count;
379 			WARN_ON_ONCE(dreq->count < 0);
380 		}
381 		dreq->iocb->ki_complete(dreq->iocb, res, 0);
382 	}
383 
384 	complete(&dreq->completion);
385 
386 	nfs_direct_req_release(dreq);
387 }
388 
389 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
390 {
391 	unsigned long bytes = 0;
392 	struct nfs_direct_req *dreq = hdr->dreq;
393 
394 	spin_lock(&dreq->lock);
395 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
396 		spin_unlock(&dreq->lock);
397 		goto out_put;
398 	}
399 
400 	nfs_direct_count_bytes(dreq, hdr);
401 	spin_unlock(&dreq->lock);
402 
403 	while (!list_empty(&hdr->pages)) {
404 		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
405 		struct page *page = req->wb_page;
406 
407 		if (!PageCompound(page) && bytes < hdr->good_bytes &&
408 		    (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
409 			set_page_dirty(page);
410 		bytes += req->wb_bytes;
411 		nfs_list_remove_request(req);
412 		nfs_release_request(req);
413 	}
414 out_put:
415 	if (put_dreq(dreq))
416 		nfs_direct_complete(dreq);
417 	hdr->release(hdr);
418 }
419 
420 static void nfs_read_sync_pgio_error(struct list_head *head, int error)
421 {
422 	struct nfs_page *req;
423 
424 	while (!list_empty(head)) {
425 		req = nfs_list_entry(head->next);
426 		nfs_list_remove_request(req);
427 		nfs_release_request(req);
428 	}
429 }
430 
431 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
432 {
433 	get_dreq(hdr->dreq);
434 }
435 
436 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
437 	.error_cleanup = nfs_read_sync_pgio_error,
438 	.init_hdr = nfs_direct_pgio_init,
439 	.completion = nfs_direct_read_completion,
440 };
441 
442 /*
443  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
444  * operation.  If nfs_create_request() or iov_iter_get_pages_alloc() fails,
445  * bail and stop sending more reads.  Read length accounting is
446  * handled automatically by nfs_direct_read_completion().  Otherwise, if
447  * no requests have been sent, just return an error.
448  */
449 
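/*
 * Example of the arithmetic below, assuming a 4096-byte PAGE_SIZE and
 * hypothetical numbers: if iov_iter_get_pages_alloc() returns 8192 bytes
 * with pgbase == 512, then npages == (8192 + 512 + 4095) / 4096 == 3, and
 * the loop creates requests of 3584, 4096 and 512 bytes (pgbase is zeroed
 * after the first page), which again add up to 8192.
 */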
450 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
451 					      struct iov_iter *iter,
452 					      loff_t pos)
453 {
454 	struct nfs_pageio_descriptor desc;
455 	struct inode *inode = dreq->inode;
456 	ssize_t result = -EINVAL;
457 	size_t requested_bytes = 0;
458 	size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);
459 
460 	nfs_pageio_init_read(&desc, dreq->inode, false,
461 			     &nfs_direct_read_completion_ops);
462 	get_dreq(dreq);
463 	desc.pg_dreq = dreq;
464 	inode_dio_begin(inode);
465 
466 	while (iov_iter_count(iter)) {
467 		struct page **pagevec;
468 		size_t bytes;
469 		size_t pgbase;
470 		unsigned npages, i;
471 
472 		result = iov_iter_get_pages_alloc(iter, &pagevec,
473 						  rsize, &pgbase);
474 		if (result < 0)
475 			break;
476 
477 		bytes = result;
478 		iov_iter_advance(iter, bytes);
479 		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
480 		for (i = 0; i < npages; i++) {
481 			struct nfs_page *req;
482 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
483 			/* XXX do we need to do the eof zeroing found in async_filler? */
484 			req = nfs_create_request(dreq->ctx, pagevec[i],
485 						 pgbase, req_len);
486 			if (IS_ERR(req)) {
487 				result = PTR_ERR(req);
488 				break;
489 			}
490 			req->wb_index = pos >> PAGE_SHIFT;
491 			req->wb_offset = pos & ~PAGE_MASK;
492 			if (!nfs_pageio_add_request(&desc, req)) {
493 				result = desc.pg_error;
494 				nfs_release_request(req);
495 				break;
496 			}
497 			pgbase = 0;
498 			bytes -= req_len;
499 			requested_bytes += req_len;
500 			pos += req_len;
501 			dreq->bytes_left -= req_len;
502 		}
503 		nfs_direct_release_pages(pagevec, npages);
504 		kvfree(pagevec);
505 		if (result < 0)
506 			break;
507 	}
508 
509 	nfs_pageio_complete(&desc);
510 
511 	/*
512 	 * If no bytes were started, return the error, and let the
513 	 * generic layer handle the completion.
514 	 */
515 	if (requested_bytes == 0) {
516 		inode_dio_end(inode);
517 		nfs_direct_req_release(dreq);
518 		return result < 0 ? result : -EIO;
519 	}
520 
521 	if (put_dreq(dreq))
522 		nfs_direct_complete(dreq);
523 	return requested_bytes;
524 }
525 
526 /**
527  * nfs_file_direct_read - file direct read operation for NFS files
528  * @iocb: target I/O control block
529  * @iter: vector of user buffers into which to read data
530  *
531  * We use this function for direct reads instead of calling
532  * generic_file_aio_read() in order to avoid gfar's check to see if
533  * the request starts before the end of the file.  For that check
534  * to work, we must generate a GETATTR before each direct read, and
535  * even then there is a window between the GETATTR and the subsequent
536  * READ where the file size could change.  Our preference is simply
537  * to do all reads the application wants, and the server will take
538  * care of managing the end of file boundary.
539  *
540  * This function also eliminates unnecessarily updating the file's
541  * atime locally, as the NFS server sets the file's atime, and this
542  * client must read the updated atime from the server back into its
543  * cache.
544  */
545 ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter)
546 {
547 	struct file *file = iocb->ki_filp;
548 	struct address_space *mapping = file->f_mapping;
549 	struct inode *inode = mapping->host;
550 	struct nfs_direct_req *dreq;
551 	struct nfs_lock_context *l_ctx;
552 	ssize_t result = -EINVAL, requested;
553 	size_t count = iov_iter_count(iter);
554 	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
555 
556 	dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
557 		file, count, (long long) iocb->ki_pos);
558 
559 	result = 0;
560 	if (!count)
561 		goto out;
562 
563 	task_io_account_read(count);
564 
565 	result = -ENOMEM;
566 	dreq = nfs_direct_req_alloc();
567 	if (dreq == NULL)
568 		goto out;
569 
570 	dreq->inode = inode;
571 	dreq->bytes_left = dreq->max_count = count;
572 	dreq->io_start = iocb->ki_pos;
573 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
574 	l_ctx = nfs_get_lock_context(dreq->ctx);
575 	if (IS_ERR(l_ctx)) {
576 		result = PTR_ERR(l_ctx);
577 		nfs_direct_req_release(dreq);
578 		goto out_release;
579 	}
580 	dreq->l_ctx = l_ctx;
581 	if (!is_sync_kiocb(iocb))
582 		dreq->iocb = iocb;
583 
584 	if (iter_is_iovec(iter))
585 		dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;
586 
587 	nfs_start_io_direct(inode);
588 
589 	NFS_I(inode)->read_io += count;
590 	requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
591 
592 	nfs_end_io_direct(inode);
593 
594 	if (requested > 0) {
595 		result = nfs_direct_wait(dreq);
596 		if (result > 0) {
597 			requested -= result;
598 			iocb->ki_pos += result;
599 		}
600 		iov_iter_revert(iter, requested);
601 	} else {
602 		result = requested;
603 	}
604 
605 out_release:
606 	nfs_direct_req_release(dreq);
607 out:
608 	return result;
609 }
610 
611 static void
612 nfs_direct_write_scan_commit_list(struct inode *inode,
613 				  struct list_head *list,
614 				  struct nfs_commit_info *cinfo)
615 {
616 	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
617 #ifdef CONFIG_NFS_V4_1
618 	if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
619 		NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
620 #endif
621 	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
622 	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
623 }
624 
625 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
626 {
627 	struct nfs_pageio_descriptor desc;
628 	struct nfs_page *req, *tmp;
629 	LIST_HEAD(reqs);
630 	struct nfs_commit_info cinfo;
631 	LIST_HEAD(failed);
632 
633 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
634 	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
635 
636 	dreq->count = 0;
637 	dreq->max_count = 0;
638 	list_for_each_entry(req, &reqs, wb_list)
639 		dreq->max_count += req->wb_bytes;
640 	dreq->verf.committed = NFS_INVALID_STABLE_HOW;
641 	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
642 	get_dreq(dreq);
643 
644 	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
645 			      &nfs_direct_write_completion_ops);
646 	desc.pg_dreq = dreq;
647 
648 	list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
649 		/* Bump the transmission count */
650 		req->wb_nio++;
651 		if (!nfs_pageio_add_request(&desc, req)) {
652 			nfs_list_move_request(req, &failed);
653 			spin_lock(&cinfo.inode->i_lock);
654 			dreq->flags = 0;
655 			if (desc.pg_error < 0)
656 				dreq->error = desc.pg_error;
657 			else
658 				dreq->error = -EIO;
659 			spin_unlock(&cinfo.inode->i_lock);
660 		}
661 		nfs_release_request(req);
662 	}
663 	nfs_pageio_complete(&desc);
664 
665 	while (!list_empty(&failed)) {
666 		req = nfs_list_entry(failed.next);
667 		nfs_list_remove_request(req);
668 		nfs_unlock_and_release_request(req);
669 	}
670 
671 	if (put_dreq(dreq))
672 		nfs_direct_write_complete(dreq);
673 }
674 
675 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
676 {
677 	struct nfs_direct_req *dreq = data->dreq;
678 	struct nfs_commit_info cinfo;
679 	struct nfs_page *req;
680 	int status = data->task.tk_status;
681 
682 	if (status < 0) {
683 		/* Errors in commit are fatal */
684 		dreq->error = status;
685 		dreq->max_count = 0;
686 		dreq->count = 0;
687 		dreq->flags = NFS_ODIRECT_DONE;
688 	} else if (dreq->flags == NFS_ODIRECT_DONE)
689 		status = dreq->error;
690 
691 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
692 	if (nfs_direct_cmp_commit_data_verf(dreq, data))
693 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
694 
695 	while (!list_empty(&data->pages)) {
696 		req = nfs_list_entry(data->pages.next);
697 		nfs_list_remove_request(req);
698 		if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
699 			/*
700 			 * Despite the reboot, the write was successful,
701 			 * so reset wb_nio.
702 			 */
703 			req->wb_nio = 0;
704 			/* Note the rewrite will go through mds */
705 			nfs_mark_request_commit(req, NULL, &cinfo, 0);
706 		} else
707 			nfs_release_request(req);
708 		nfs_unlock_and_release_request(req);
709 	}
710 
711 	if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
712 		nfs_direct_write_complete(dreq);
713 }
714 
715 static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
716 		struct nfs_page *req)
717 {
718 	struct nfs_direct_req *dreq = cinfo->dreq;
719 
720 	spin_lock(&dreq->lock);
721 	if (dreq->flags != NFS_ODIRECT_DONE)
722 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
723 	spin_unlock(&dreq->lock);
724 	nfs_mark_request_commit(req, NULL, cinfo, 0);
725 }
726 
727 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
728 	.completion = nfs_direct_commit_complete,
729 	.resched_write = nfs_direct_resched_write,
730 };
731 
732 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
733 {
734 	int res;
735 	struct nfs_commit_info cinfo;
736 	LIST_HEAD(mds_list);
737 
738 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
739 	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
740 	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
741 	if (res < 0) /* res == -ENOMEM */
742 		nfs_direct_write_reschedule(dreq);
743 }
744 
745 static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
746 {
747 	struct nfs_commit_info cinfo;
748 	struct nfs_page *req;
749 	LIST_HEAD(reqs);
750 
751 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
752 	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
753 
754 	while (!list_empty(&reqs)) {
755 		req = nfs_list_entry(reqs.next);
756 		nfs_list_remove_request(req);
757 		nfs_unlock_and_release_request(req);
758 	}
759 }
760 
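/*
 * Runs from nfsiod to finish off a direct write: NFS_ODIRECT_DO_COMMIT
 * sends the COMMITs, NFS_ODIRECT_RESCHED_WRITES resends the data through
 * the MDS, and the default case (0 or NFS_ODIRECT_DONE) drops any remaining
 * requests, invalidates cached pages over the written range and completes
 * the request.
 */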
761 static void nfs_direct_write_schedule_work(struct work_struct *work)
762 {
763 	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
764 	int flags = dreq->flags;
765 
766 	dreq->flags = 0;
767 	switch (flags) {
768 		case NFS_ODIRECT_DO_COMMIT:
769 			nfs_direct_commit_schedule(dreq);
770 			break;
771 		case NFS_ODIRECT_RESCHED_WRITES:
772 			nfs_direct_write_reschedule(dreq);
773 			break;
774 		default:
775 			nfs_direct_write_clear_reqs(dreq);
776 			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
777 			nfs_direct_complete(dreq);
778 	}
779 }
780 
781 static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
782 {
783 	queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
784 }
785 
786 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
787 {
788 	struct nfs_direct_req *dreq = hdr->dreq;
789 	struct nfs_commit_info cinfo;
790 	bool request_commit = false;
791 	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
792 
793 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
794 
795 	spin_lock(&dreq->lock);
796 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
797 		spin_unlock(&dreq->lock);
798 		goto out_put;
799 	}
800 
801 	nfs_direct_count_bytes(dreq, hdr);
802 	if (hdr->good_bytes != 0) {
803 		if (nfs_write_need_commit(hdr)) {
804 			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
805 				request_commit = true;
806 			else if (dreq->flags == 0) {
807 				nfs_direct_set_hdr_verf(dreq, hdr);
808 				request_commit = true;
809 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
810 			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
811 				request_commit = true;
812 				if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr))
813 					dreq->flags =
814 						NFS_ODIRECT_RESCHED_WRITES;
815 			}
816 		}
817 	}
818 	spin_unlock(&dreq->lock);
819 
820 	while (!list_empty(&hdr->pages)) {
821 
822 		req = nfs_list_entry(hdr->pages.next);
823 		nfs_list_remove_request(req);
824 		if (request_commit) {
825 			kref_get(&req->wb_kref);
826 			nfs_mark_request_commit(req, hdr->lseg, &cinfo,
827 				hdr->ds_commit_idx);
828 		}
829 		nfs_unlock_and_release_request(req);
830 	}
831 
832 out_put:
833 	if (put_dreq(dreq))
834 		nfs_direct_write_complete(dreq);
835 	hdr->release(hdr);
836 }
837 
838 static void nfs_write_sync_pgio_error(struct list_head *head, int error)
839 {
840 	struct nfs_page *req;
841 
842 	while (!list_empty(head)) {
843 		req = nfs_list_entry(head->next);
844 		nfs_list_remove_request(req);
845 		nfs_unlock_and_release_request(req);
846 	}
847 }
848 
849 static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
850 {
851 	struct nfs_direct_req *dreq = hdr->dreq;
852 
853 	spin_lock(&dreq->lock);
854 	if (dreq->error == 0) {
855 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
856 		/* fake unstable write to let common nfs resend pages */
857 		hdr->verf.committed = NFS_UNSTABLE;
858 		hdr->good_bytes = hdr->args.offset + hdr->args.count -
859 			hdr->io_start;
860 	}
861 	spin_unlock(&dreq->lock);
862 }
863 
864 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
865 	.error_cleanup = nfs_write_sync_pgio_error,
866 	.init_hdr = nfs_direct_pgio_init,
867 	.completion = nfs_direct_write_completion,
868 	.reschedule_io = nfs_direct_write_reschedule_io,
869 };
870 
871 
872 /*
873  * NB: Return the value of the first error return code.  Subsequent
874  *     errors after the first one are ignored.
875  */
876 /*
877  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
878  * operation.  If nfs_create_request() or iov_iter_get_pages_alloc() fails,
879  * bail and stop sending more writes.  Write length accounting is
880  * handled automatically by nfs_direct_write_completion().  Otherwise, if
881  * no requests have been sent, just return an error.
882  */
883 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
884 					       struct iov_iter *iter,
885 					       loff_t pos)
886 {
887 	struct nfs_pageio_descriptor desc;
888 	struct inode *inode = dreq->inode;
889 	ssize_t result = 0;
890 	size_t requested_bytes = 0;
891 	size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
892 
893 	nfs_pageio_init_write(&desc, inode, FLUSH_COND_STABLE, false,
894 			      &nfs_direct_write_completion_ops);
895 	desc.pg_dreq = dreq;
896 	get_dreq(dreq);
897 	inode_dio_begin(inode);
898 
899 	NFS_I(inode)->write_io += iov_iter_count(iter);
900 	while (iov_iter_count(iter)) {
901 		struct page **pagevec;
902 		size_t bytes;
903 		size_t pgbase;
904 		unsigned npages, i;
905 
906 		result = iov_iter_get_pages_alloc(iter, &pagevec,
907 						  wsize, &pgbase);
908 		if (result < 0)
909 			break;
910 
911 		bytes = result;
912 		iov_iter_advance(iter, bytes);
913 		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
914 		for (i = 0; i < npages; i++) {
915 			struct nfs_page *req;
916 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
917 
918 			req = nfs_create_request(dreq->ctx, pagevec[i],
919 						 pgbase, req_len);
920 			if (IS_ERR(req)) {
921 				result = PTR_ERR(req);
922 				break;
923 			}
924 
925 			if (desc.pg_error < 0) {
926 				nfs_free_request(req);
927 				result = desc.pg_error;
928 				break;
929 			}
930 
931 			nfs_lock_request(req);
932 			req->wb_index = pos >> PAGE_SHIFT;
933 			req->wb_offset = pos & ~PAGE_MASK;
934 			if (!nfs_pageio_add_request(&desc, req)) {
935 				result = desc.pg_error;
936 				nfs_unlock_and_release_request(req);
937 				break;
938 			}
939 			pgbase = 0;
940 			bytes -= req_len;
941 			requested_bytes += req_len;
942 			pos += req_len;
943 			dreq->bytes_left -= req_len;
944 		}
945 		nfs_direct_release_pages(pagevec, npages);
946 		kvfree(pagevec);
947 		if (result < 0)
948 			break;
949 	}
950 	nfs_pageio_complete(&desc);
951 
952 	/*
953 	 * If no bytes were started, return the error, and let the
954 	 * generic layer handle the completion.
955 	 */
956 	if (requested_bytes == 0) {
957 		inode_dio_end(inode);
958 		nfs_direct_req_release(dreq);
959 		return result < 0 ? result : -EIO;
960 	}
961 
962 	if (put_dreq(dreq))
963 		nfs_direct_write_complete(dreq);
964 	return requested_bytes;
965 }
966 
967 /**
968  * nfs_file_direct_write - file direct write operation for NFS files
969  * @iocb: target I/O control block
970  * @iter: vector of user buffers from which to write data
971  *
972  * We use this function for direct writes instead of calling
973  * generic_file_aio_write() in order to avoid taking the inode
974  * semaphore and updating the i_size.  The NFS server will set
975  * the new i_size and this client must read the updated size
976  * back into its cache.  We let the server do generic write
977  * parameter checking and report problems.
978  *
979  * We eliminate local atime updates, see direct read above.
980  *
981  * We avoid unnecessary page cache invalidations for normal cached
982  * readers of this file.
983  *
984  * Note that O_APPEND is not supported for NFS direct writes, as there
985  * is no atomic O_APPEND write facility in the NFS protocol.
986  */
987 ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter)
988 {
989 	ssize_t result = -EINVAL, requested;
990 	size_t count;
991 	struct file *file = iocb->ki_filp;
992 	struct address_space *mapping = file->f_mapping;
993 	struct inode *inode = mapping->host;
994 	struct nfs_direct_req *dreq;
995 	struct nfs_lock_context *l_ctx;
996 	loff_t pos, end;
997 
998 	dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
999 		file, iov_iter_count(iter), (long long) iocb->ki_pos);
1000 
1001 	result = generic_write_checks(iocb, iter);
1002 	if (result <= 0)
1003 		return result;
1004 	count = result;
1005 	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
1006 
1007 	pos = iocb->ki_pos;
1008 	end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;
1009 
1010 	task_io_account_write(count);
1011 
1012 	result = -ENOMEM;
1013 	dreq = nfs_direct_req_alloc();
1014 	if (!dreq)
1015 		goto out;
1016 
1017 	dreq->inode = inode;
1018 	dreq->bytes_left = dreq->max_count = count;
1019 	dreq->io_start = pos;
1020 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
1021 	l_ctx = nfs_get_lock_context(dreq->ctx);
1022 	if (IS_ERR(l_ctx)) {
1023 		result = PTR_ERR(l_ctx);
1024 		nfs_direct_req_release(dreq);
1025 		goto out_release;
1026 	}
1027 	dreq->l_ctx = l_ctx;
1028 	if (!is_sync_kiocb(iocb))
1029 		dreq->iocb = iocb;
1030 
1031 	nfs_start_io_direct(inode);
1032 
1033 	requested = nfs_direct_write_schedule_iovec(dreq, iter, pos);
1034 
1035 	if (mapping->nrpages) {
1036 		invalidate_inode_pages2_range(mapping,
1037 					      pos >> PAGE_SHIFT, end);
1038 	}
1039 
1040 	nfs_end_io_direct(inode);
1041 
1042 	if (requested > 0) {
1043 		result = nfs_direct_wait(dreq);
1044 		if (result > 0) {
1045 			requested -= result;
1046 			iocb->ki_pos = pos + result;
1047 			/* XXX: should check the generic_write_sync retval */
1048 			generic_write_sync(iocb, result);
1049 		}
1050 		iov_iter_revert(iter, requested);
1051 	} else {
1052 		result = requested;
1053 	}
1054 out_release:
1055 	nfs_direct_req_release(dreq);
1056 out:
1057 	return result;
1058 }
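/*
 * A matching userspace sketch of a direct write (illustration only; the
 * alignment and the /mnt/nfs path are assumptions).  Per the header comment,
 * the requested bytes are on stable storage by the time pwrite() returns.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int write_direct(void)
 *	{
 *		void *buf;
 *		ssize_t n;
 *		int fd;
 *
 *		if (posix_memalign(&buf, 4096, 4096))
 *			return -1;
 *		memset(buf, 0xab, 4096);
 *		fd = open("/mnt/nfs/data.bin",
 *			  O_WRONLY | O_CREAT | O_DIRECT, 0644);
 *		if (fd < 0) {
 *			free(buf);
 *			return -1;
 *		}
 *		n = pwrite(fd, buf, 4096, 0);	// durable on successful return
 *		close(fd);
 *		free(buf);
 *		return n == 4096 ? 0 : -1;
 *	}
 */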
1059 
1060 /**
1061  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1062  *
1063  */
1064 int __init nfs_init_directcache(void)
1065 {
1066 	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1067 						sizeof(struct nfs_direct_req),
1068 						0, (SLAB_RECLAIM_ACCOUNT|
1069 							SLAB_MEM_SPREAD),
1070 						NULL);
1071 	if (nfs_direct_cachep == NULL)
1072 		return -ENOMEM;
1073 
1074 	return 0;
1075 }
1076 
1077 /**
1078  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1079  *
1080  */
1081 void nfs_destroy_directcache(void)
1082 {
1083 	kmem_cache_destroy(nfs_direct_cachep);
1084 }
1085