// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

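/*
 * Illustrative only, not part of the kernel build: a minimal userspace
 * sketch of how an application typically requests the uncached I/O
 * described above.  The path and buffer size are arbitrary examples.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	void *buf;
 *	int fd = open("/mnt/nfs/datafile", O_RDWR | O_DIRECT);
 *
 *	// O_DIRECT callers usually align their buffers themselves;
 *	// the NFS client does not fix up unaligned requests for them.
 *	posix_memalign(&buf, 4096, 4096);
 *	pread(fd, buf, 4096, 0);	// READ goes straight to the server
 *	pwrite(fd, buf, 4096, 0);	// returns only once the data is on
 *					// permanent storage
 */
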
#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/module.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <linux/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"
#include "pnfs.h"
#include "fscache.h"
#include "nfstrace.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
static void nfs_direct_write_schedule_work(struct work_struct *work);

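/*
 * dreq->io_count tracks outstanding I/O on a direct request.  The
 * scheduling path holds one reference while it builds requests and
 * each pgio header takes another in nfs_direct_pgio_init(); whoever
 * drops the last reference via put_dreq() completes the request.
 */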
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

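/*
 * The two helpers below reconcile the byte count of a completed pgio
 * header with the direct request as a whole: on an error or EOF the
 * request is truncated to the bytes that actually completed, and a
 * previously recorded error is cleared when the header merely hit EOF.
 */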
static void
nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
			    const struct nfs_pgio_header *hdr,
			    ssize_t dreq_len)
{
	if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
	      test_bit(NFS_IOHDR_EOF, &hdr->flags)))
		return;
	if (dreq->max_count >= dreq_len) {
		dreq->max_count = dreq_len;
		if (dreq->count > dreq_len)
			dreq->count = dreq_len;

		if (test_bit(NFS_IOHDR_ERROR, &hdr->flags))
			dreq->error = hdr->error;
		else /* Clear outstanding error if this is EOF */
			dreq->error = 0;
	}
}

static void
nfs_direct_count_bytes(struct nfs_direct_req *dreq,
		       const struct nfs_pgio_header *hdr)
{
	loff_t hdr_end = hdr->io_start + hdr->good_bytes;
	ssize_t dreq_len = 0;

	if (hdr_end > dreq->io_start)
		dreq_len = hdr_end - dreq->io_start;

	nfs_direct_handle_truncated(dreq, hdr, dreq_len);

	if (dreq_len > dreq->max_count)
		dreq_len = dreq->max_count;

	if (dreq->count < dreq_len)
		dreq->count = dreq_len;
}

/**
 * nfs_swap_rw - NFS address space operation for swap I/O
 * @iocb: target I/O control block
 * @iter: I/O buffer
 *
 * Perform IO to the swap-file.  This is much like direct IO.
 */
int nfs_swap_rw(struct kiocb *iocb, struct iov_iter *iter)
{
	ssize_t ret;

	VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);

	if (iov_iter_rw(iter) == READ)
		ret = nfs_file_direct_read(iocb, iter, true);
	else
		ret = nfs_file_direct_write(iocb, iter, true);
	if (ret < 0)
		return ret;
	return 0;
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		put_page(pages[i]);
}

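/*
 * Direct I/O keeps its own commit lists in the nfs_direct_req rather
 * than using the inode-wide lists, so commit processing stays scoped
 * to this request.
 */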
void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
			      struct nfs_direct_req *dreq)
{
	cinfo->inode = dreq->inode;
	cinfo->mds = &dreq->mds_cinfo;
	cinfo->ds = &dreq->ds_cinfo;
	cinfo->dreq = dreq;
	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
}

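/*
 * A freshly allocated dreq carries two kref references: one for the
 * I/O path, dropped in nfs_direct_complete(), and one for the caller,
 * dropped in nfs_file_direct_read()/nfs_file_direct_write().
 */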
static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
	pnfs_init_ds_commit_info(&dreq->ds_cinfo);
	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
	spin_lock_init(&dreq->lock);

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

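/**
 * nfs_dreq_bytes_left - report how many bytes of @dreq remain to be scheduled
 * @dreq: direct request to query
 */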
ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
{
	return dreq->bytes_left;
}
EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result) {
		result = dreq->count;
		WARN_ON_ONCE(dreq->count < 0);
	}
	if (!result)
		result = dreq->error;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;

	inode_dio_end(inode);

	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (dreq->count != 0) {
			res = (long) dreq->count;
			WARN_ON_ONCE(dreq->count < 0);
		}
		dreq->iocb->ki_complete(dreq->iocb, res);
	}

	complete(&dreq->completion);

	nfs_direct_req_release(dreq);
}

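/*
 * Per-header completion for direct reads: account for the bytes that
 * arrived, dirty the user's pages when the iterator is user-backed,
 * then release the page requests.  The last completion to drop the
 * dreq reference finishes the whole request.
 */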
static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
{
	unsigned long bytes = 0;
	struct nfs_direct_req *dreq = hdr->dreq;

	spin_lock(&dreq->lock);
	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		spin_unlock(&dreq->lock);
		goto out_put;
	}

	nfs_direct_count_bytes(dreq, hdr);
	spin_unlock(&dreq->lock);

	while (!list_empty(&hdr->pages)) {
		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
		struct page *page = req->wb_page;

		if (!PageCompound(page) && bytes < hdr->good_bytes &&
		    (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
			set_page_dirty(page);
		bytes += req->wb_bytes;
		nfs_list_remove_request(req);
		nfs_release_request(req);
	}
out_put:
	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	hdr->release(hdr);
}

static void nfs_read_sync_pgio_error(struct list_head *head, int error)
{
	struct nfs_page *req;

	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_release_request(req);
	}
}

static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
{
	get_dreq(hdr->dreq);
}

static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
	.error_cleanup = nfs_read_sync_pgio_error,
	.init_hdr = nfs_direct_pgio_init,
	.completion = nfs_direct_read_completion,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If iov_iter_get_pages_alloc2() or nfs_create_request()
 * fails, bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_completion().  Otherwise,
 * if no requests have been sent, just return an error.
 */

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      struct iov_iter *iter,
					      loff_t pos)
{
	struct nfs_pageio_descriptor desc;
	struct inode *inode = dreq->inode;
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);

	nfs_pageio_init_read(&desc, dreq->inode, false,
			     &nfs_direct_read_completion_ops);
	get_dreq(dreq);
	desc.pg_dreq = dreq;
	inode_dio_begin(inode);

	while (iov_iter_count(iter)) {
		struct page **pagevec;
		size_t bytes;
		size_t pgbase;
		unsigned npages, i;

		result = iov_iter_get_pages_alloc2(iter, &pagevec,
						  rsize, &pgbase);
		if (result < 0)
			break;

		bytes = result;
		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
		for (i = 0; i < npages; i++) {
			struct nfs_page *req;
			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
			/* XXX do we need to do the eof zeroing found in async_filler? */
			req = nfs_create_request(dreq->ctx, pagevec[i],
						 pgbase, req_len);
			if (IS_ERR(req)) {
				result = PTR_ERR(req);
				break;
			}
			req->wb_index = pos >> PAGE_SHIFT;
			req->wb_offset = pos & ~PAGE_MASK;
			if (!nfs_pageio_add_request(&desc, req)) {
				result = desc.pg_error;
				nfs_release_request(req);
				break;
			}
			pgbase = 0;
			bytes -= req_len;
			requested_bytes += req_len;
			pos += req_len;
			dreq->bytes_left -= req_len;
		}
		nfs_direct_release_pages(pagevec, npages);
		kvfree(pagevec);
		if (result < 0)
			break;
	}

	nfs_pageio_complete(&desc);

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		inode_dio_end(inode);
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return requested_bytes;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iter: vector of user buffers into which to read data
 * @swap: flag indicating this is swap IO, not O_DIRECT IO
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
			     bool swap)
{
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	struct inode *inode = mapping->host;
	struct nfs_direct_req *dreq;
	struct nfs_lock_context *l_ctx;
	ssize_t result, requested;
	size_t count = iov_iter_count(iter);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
		file, count, (long long) iocb->ki_pos);

	result = 0;
	if (!count)
		goto out;

	task_io_account_read(count);

	result = -ENOMEM;
	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->bytes_left = dreq->max_count = count;
	dreq->io_start = iocb->ki_pos;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	l_ctx = nfs_get_lock_context(dreq->ctx);
	if (IS_ERR(l_ctx)) {
		result = PTR_ERR(l_ctx);
		nfs_direct_req_release(dreq);
		goto out_release;
	}
	dreq->l_ctx = l_ctx;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	if (user_backed_iter(iter))
		dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;

	if (!swap)
		nfs_start_io_direct(inode);

	NFS_I(inode)->read_io += count;
	requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);

	if (!swap)
		nfs_end_io_direct(inode);

	if (requested > 0) {
		result = nfs_direct_wait(dreq);
		if (result > 0) {
			requested -= result;
			iocb->ki_pos += result;
		}
		iov_iter_revert(iter, requested);
	} else {
		result = requested;
	}

out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

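/*
 * Before a direct write is resent, fold any sub-requests created for
 * page-group handling back into their head request so that each page
 * is transmitted exactly once.
 */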
static void
nfs_direct_join_group(struct list_head *list, struct inode *inode)
{
	struct nfs_page *req, *next;

	list_for_each_entry(req, list, wb_list) {
		if (req->wb_head != req || req->wb_this_page == req)
			continue;
		for (next = req->wb_this_page;
				next != req->wb_head;
				next = next->wb_this_page) {
			nfs_list_remove_request(next);
			nfs_release_request(next);
		}
		nfs_join_page_group(req, inode);
	}
}

static void
nfs_direct_write_scan_commit_list(struct inode *inode,
				  struct list_head *list,
				  struct nfs_commit_info *cinfo)
{
	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
	pnfs_recover_commit_reqs(list, cinfo);
	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
}

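/*
 * Resend path: reclaim every request still sitting on the commit
 * lists, rebuild the byte accounting, and push the requests back
 * through a FLUSH_STABLE pageio descriptor.  Requests that cannot be
 * re-queued are failed and the error is recorded in the dreq.
 */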
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct nfs_pageio_descriptor desc;
	struct nfs_page *req, *tmp;
	LIST_HEAD(reqs);
	struct nfs_commit_info cinfo;
	LIST_HEAD(failed);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);

	nfs_direct_join_group(&reqs, dreq->inode);

	dreq->count = 0;
	dreq->max_count = 0;
	list_for_each_entry(req, &reqs, wb_list)
		dreq->max_count += req->wb_bytes;
	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
	get_dreq(dreq);

	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
			      &nfs_direct_write_completion_ops);
	desc.pg_dreq = dreq;

	list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
		/* Bump the transmission count */
		req->wb_nio++;
		if (!nfs_pageio_add_request(&desc, req)) {
			nfs_list_move_request(req, &failed);
			spin_lock(&cinfo.inode->i_lock);
			dreq->flags = 0;
			if (desc.pg_error < 0)
				dreq->error = desc.pg_error;
			else
				dreq->error = -EIO;
			spin_unlock(&cinfo.inode->i_lock);
		}
		nfs_release_request(req);
	}
	nfs_pageio_complete(&desc);

	while (!list_empty(&failed)) {
		req = nfs_list_entry(failed.next);
		nfs_list_remove_request(req);
		nfs_unlock_and_release_request(req);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
}

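/*
 * COMMIT completion: a commit error fails the whole direct request.
 * Otherwise each request's write verifier is compared with the commit
 * verifier; a mismatch means the server rebooted between the WRITE and
 * the COMMIT, so the data is queued to be resent.
 */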
static void nfs_direct_commit_complete(struct nfs_commit_data *data)
{
	const struct nfs_writeverf *verf = data->res.verf;
	struct nfs_direct_req *dreq = data->dreq;
	struct nfs_commit_info cinfo;
	struct nfs_page *req;
	int status = data->task.tk_status;

	trace_nfs_direct_commit_complete(dreq);

	if (status < 0) {
		/* Errors in commit are fatal */
		dreq->error = status;
		dreq->max_count = 0;
		dreq->count = 0;
		dreq->flags = NFS_ODIRECT_DONE;
	} else {
		status = dreq->error;
	}

	nfs_init_cinfo_from_dreq(&cinfo, dreq);

	while (!list_empty(&data->pages)) {
		req = nfs_list_entry(data->pages.next);
		nfs_list_remove_request(req);
		if (status >= 0 && !nfs_write_match_verf(verf, req)) {
			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			/*
			 * Despite the reboot, the write was successful,
			 * so reset wb_nio.
			 */
			req->wb_nio = 0;
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
		} else /* Error or match */
			nfs_release_request(req);
		nfs_unlock_and_release_request(req);
	}

	if (nfs_commit_end(cinfo.mds))
		nfs_direct_write_complete(dreq);
}

static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
		struct nfs_page *req)
{
	struct nfs_direct_req *dreq = cinfo->dreq;

	trace_nfs_direct_resched_write(dreq);

	spin_lock(&dreq->lock);
	if (dreq->flags != NFS_ODIRECT_DONE)
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	spin_unlock(&dreq->lock);
	nfs_mark_request_commit(req, NULL, cinfo, 0);
}

static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
	.completion = nfs_direct_commit_complete,
	.resched_write = nfs_direct_resched_write,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	int res;
	struct nfs_commit_info cinfo;
	LIST_HEAD(mds_list);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
	if (res < 0) /* res == -ENOMEM */
		nfs_direct_write_reschedule(dreq);
}

static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
{
	struct nfs_commit_info cinfo;
	struct nfs_page *req;
	LIST_HEAD(reqs);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);
	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);

	while (!list_empty(&reqs)) {
		req = nfs_list_entry(reqs.next);
		nfs_list_remove_request(req);
		nfs_release_request(req);
		nfs_unlock_and_release_request(req);
	}
}

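/*
 * Deferred completion work for direct writes, driven by dreq->flags:
 * send a COMMIT, reschedule the writes, or clean up, invalidate the
 * mapping and complete the request.
 */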
static void nfs_direct_write_schedule_work(struct work_struct *work)
{
	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			nfs_direct_write_clear_reqs(dreq);
			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
{
	trace_nfs_direct_write_complete(dreq);
	queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
}

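/*
 * Per-header completion for direct writes.  Unstable writes are queued
 * for a later COMMIT (saving the server's write verifier); if a resend
 * was requested, the requests are marked for commit so that the
 * reschedule path can find them.
 */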
static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
{
	struct nfs_direct_req *dreq = hdr->dreq;
	struct nfs_commit_info cinfo;
	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
	int flags = NFS_ODIRECT_DONE;

	trace_nfs_direct_write_completion(dreq);

	nfs_init_cinfo_from_dreq(&cinfo, dreq);

	spin_lock(&dreq->lock);
	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		spin_unlock(&dreq->lock);
		goto out_put;
	}

	nfs_direct_count_bytes(dreq, hdr);
	if (test_bit(NFS_IOHDR_UNSTABLE_WRITES, &hdr->flags)) {
		if (!dreq->flags)
			dreq->flags = NFS_ODIRECT_DO_COMMIT;
		flags = dreq->flags;
	}
	spin_unlock(&dreq->lock);

	while (!list_empty(&hdr->pages)) {

		req = nfs_list_entry(hdr->pages.next);
		nfs_list_remove_request(req);
		if (flags == NFS_ODIRECT_DO_COMMIT) {
			kref_get(&req->wb_kref);
			memcpy(&req->wb_verf, &hdr->verf.verifier,
			       sizeof(req->wb_verf));
			nfs_mark_request_commit(req, hdr->lseg, &cinfo,
				hdr->ds_commit_idx);
		} else if (flags == NFS_ODIRECT_RESCHED_WRITES) {
			kref_get(&req->wb_kref);
			nfs_mark_request_commit(req, NULL, &cinfo, 0);
		}
		nfs_unlock_and_release_request(req);
	}

out_put:
	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
	hdr->release(hdr);
}

static void nfs_write_sync_pgio_error(struct list_head *head, int error)
{
	struct nfs_page *req;

	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_unlock_and_release_request(req);
	}
}

static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
{
	struct nfs_direct_req *dreq = hdr->dreq;

	trace_nfs_direct_write_reschedule_io(dreq);

	spin_lock(&dreq->lock);
	if (dreq->error == 0) {
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
		/* fake unstable write to let common nfs resend pages */
		hdr->verf.committed = NFS_UNSTABLE;
		hdr->good_bytes = hdr->args.offset + hdr->args.count -
			hdr->io_start;
	}
	spin_unlock(&dreq->lock);
}

static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
	.error_cleanup = nfs_write_sync_pgio_error,
	.init_hdr = nfs_direct_pgio_init,
	.completion = nfs_direct_write_completion,
	.reschedule_io = nfs_direct_write_reschedule_io,
};


/*
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 */
/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If iov_iter_get_pages_alloc2() or nfs_create_request()
 * fails, bail and stop sending more writes.  Write length accounting
 * is handled automatically by nfs_direct_write_completion().
 * Otherwise, if no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       struct iov_iter *iter,
					       loff_t pos, int ioflags)
{
	struct nfs_pageio_descriptor desc;
	struct inode *inode = dreq->inode;
	ssize_t result = 0;
	size_t requested_bytes = 0;
	size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);

	trace_nfs_direct_write_schedule_iovec(dreq);

	nfs_pageio_init_write(&desc, inode, ioflags, false,
			      &nfs_direct_write_completion_ops);
	desc.pg_dreq = dreq;
	get_dreq(dreq);
	inode_dio_begin(inode);

	NFS_I(inode)->write_io += iov_iter_count(iter);
	while (iov_iter_count(iter)) {
		struct page **pagevec;
		size_t bytes;
		size_t pgbase;
		unsigned npages, i;

		result = iov_iter_get_pages_alloc2(iter, &pagevec,
						  wsize, &pgbase);
		if (result < 0)
			break;

		bytes = result;
		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
		for (i = 0; i < npages; i++) {
			struct nfs_page *req;
			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);

			req = nfs_create_request(dreq->ctx, pagevec[i],
						 pgbase, req_len);
			if (IS_ERR(req)) {
				result = PTR_ERR(req);
				break;
			}

			if (desc.pg_error < 0) {
				nfs_free_request(req);
				result = desc.pg_error;
				break;
			}

			nfs_lock_request(req);
			req->wb_index = pos >> PAGE_SHIFT;
			req->wb_offset = pos & ~PAGE_MASK;
			if (!nfs_pageio_add_request(&desc, req)) {
				result = desc.pg_error;
				nfs_unlock_and_release_request(req);
				break;
			}
			pgbase = 0;
			bytes -= req_len;
			requested_bytes += req_len;
			pos += req_len;
			dreq->bytes_left -= req_len;
		}
		nfs_direct_release_pages(pagevec, npages);
		kvfree(pagevec);
		if (result < 0)
			break;
	}
	nfs_pageio_complete(&desc);

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		inode_dio_end(inode);
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq);
	return requested_bytes;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iter: vector of user buffers from which to write data
 * @swap: flag indicating this is swap IO, not O_DIRECT IO
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter,
			      bool swap)
{
	ssize_t result, requested;
	size_t count;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	struct inode *inode = mapping->host;
	struct nfs_direct_req *dreq;
	struct nfs_lock_context *l_ctx;
	loff_t pos, end;

	dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
		file, iov_iter_count(iter), (long long) iocb->ki_pos);

	if (swap)
		/* bypass generic checks */
		result =  iov_iter_count(iter);
	else
		result = generic_write_checks(iocb, iter);
	if (result <= 0)
		return result;
	count = result;
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	pos = iocb->ki_pos;
	end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;

	task_io_account_write(count);

	result = -ENOMEM;
	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;

	dreq->inode = inode;
	dreq->bytes_left = dreq->max_count = count;
	dreq->io_start = pos;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	l_ctx = nfs_get_lock_context(dreq->ctx);
	if (IS_ERR(l_ctx)) {
		result = PTR_ERR(l_ctx);
		nfs_direct_req_release(dreq);
		goto out_release;
	}
	dreq->l_ctx = l_ctx;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;
	pnfs_init_ds_commit_info_ops(&dreq->ds_cinfo, inode);

	if (swap) {
		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
							    FLUSH_STABLE);
	} else {
		nfs_start_io_direct(inode);

		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
							    FLUSH_COND_STABLE);

		if (mapping->nrpages) {
			invalidate_inode_pages2_range(mapping,
						      pos >> PAGE_SHIFT, end);
		}

		nfs_end_io_direct(inode);
	}

	if (requested > 0) {
		result = nfs_direct_wait(dreq);
		if (result > 0) {
			requested -= result;
			iocb->ki_pos = pos + result;
			/* XXX: should check the generic_write_sync retval */
			generic_write_sync(iocb, result);
		}
		iov_iter_revert(iter, requested);
	} else {
		result = requested;
	}
	nfs_fscache_invalidate(inode, FSCACHE_INVAL_DIO_WRITE);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}