xref: /openbmc/linux/fs/nfs/read.c (revision 424f0750edd5af866f80f5e65998e0610503cb5c)
/*
 * linux/fs/nfs/read.c
 *
 * Block I/O for NFS
 *
 * Partial copy of Linus' read cache modifications to fs/nfs/file.c
 * modified for async RPC by okir@monad.swb.de
 */

#include <linux/time.h>
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/fcntl.h>
#include <linux/stat.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
#include <linux/sunrpc/clnt.h>
#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/module.h>

#include <asm/system.h>
#include "pnfs.h"

#include "nfs4_fs.h"
#include "internal.h"
#include "iostat.h"
#include "fscache.h"

#define NFSDBG_FACILITY		NFSDBG_PAGECACHE

static const struct nfs_pageio_ops nfs_pageio_read_ops;
static const struct rpc_call_ops nfs_read_partial_ops;
static const struct rpc_call_ops nfs_read_full_ops;

static struct kmem_cache *nfs_rdata_cachep;

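/*
 * Allocate a per-RPC read descriptor. Small requests use the page
 * pointer array embedded in struct nfs_read_data; larger ones get a
 * separately allocated pagevec. On kcalloc() failure the descriptor is
 * freed again, so callers see either NULL or a fully usable structure.
 */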
struct nfs_read_data *nfs_readdata_alloc(unsigned int pagecount)
{
	struct nfs_read_data *p;

	p = kmem_cache_zalloc(nfs_rdata_cachep, GFP_KERNEL);
	if (p) {
		INIT_LIST_HEAD(&p->pages);
		p->npages = pagecount;
		if (pagecount <= ARRAY_SIZE(p->page_array))
			p->pagevec = p->page_array;
		else {
			p->pagevec = kcalloc(pagecount, sizeof(struct page *), GFP_KERNEL);
			if (!p->pagevec) {
				kmem_cache_free(nfs_rdata_cachep, p);
				p = NULL;
			}
		}
	}
	return p;
}

void nfs_readdata_free(struct nfs_read_data *p)
{
	if (p && (p->pagevec != &p->page_array[0]))
		kfree(p->pagevec);
	kmem_cache_free(nfs_rdata_cachep, p);
}

void nfs_readdata_release(struct nfs_read_data *rdata)
{
	put_lseg(rdata->lseg);
	put_nfs_open_context(rdata->args.context);
	nfs_readdata_free(rdata);
}

static
int nfs_return_empty_page(struct page *page)
{
	zero_user(page, 0, PAGE_CACHE_SIZE);
	SetPageUptodate(page);
	unlock_page(page);
	return 0;
}

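/*
 * When the server returns a short read because it hit end-of-file, the
 * tail of the request was never filled from the wire. Zero that
 * remainder so stale page contents cannot leak back to userspace.
 */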
static void nfs_readpage_truncate_uninitialised_page(struct nfs_read_data *data)
{
	unsigned int remainder = data->args.count - data->res.count;
	unsigned int base = data->args.pgbase + data->res.count;
	unsigned int pglen;
	struct page **pages;

	if (data->res.eof == 0 || remainder == 0)
		return;
	/*
	 * Note: "remainder" can never be negative, since we check for
	 * this in the XDR code.
	 */
	pages = &data->args.pages[base >> PAGE_CACHE_SHIFT];
	base &= ~PAGE_CACHE_MASK;
	pglen = PAGE_CACHE_SIZE - base;
	for (;;) {
		if (remainder <= pglen) {
			zero_user(*pages, base, remainder);
			break;
		}
		zero_user(*pages, base, pglen);
		pages++;
		remainder -= pglen;
		pglen = PAGE_CACHE_SIZE;
		base = 0;
	}
}

static void nfs_pageio_init_read_mds(struct nfs_pageio_descriptor *pgio,
		struct inode *inode)
{
	nfs_pageio_init(pgio, inode, &nfs_pageio_read_ops,
			NFS_SERVER(inode)->rsize, 0);
}

void nfs_pageio_reset_read_mds(struct nfs_pageio_descriptor *pgio)
{
	pgio->pg_ops = &nfs_pageio_read_ops;
	pgio->pg_bsize = NFS_SERVER(pgio->pg_inode)->rsize;
}
EXPORT_SYMBOL_GPL(nfs_pageio_reset_read_mds);

static void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
		struct inode *inode)
{
	if (!pnfs_pageio_init_read(pgio, inode))
		nfs_pageio_init_read_mds(pgio, inode);
}

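/*
 * Read one locked page asynchronously. The page remains locked until
 * the RPC completes; nfs_readpage_release() unlocks it from the
 * completion path.
 */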
int nfs_readpage_async(struct nfs_open_context *ctx, struct inode *inode,
		       struct page *page)
{
	struct nfs_page	*new;
	unsigned int len;
	struct nfs_pageio_descriptor pgio;

	len = nfs_page_length(page);
	if (len == 0)
		return nfs_return_empty_page(page);
	new = nfs_create_request(ctx, inode, page, 0, len);
	if (IS_ERR(new)) {
		unlock_page(page);
		return PTR_ERR(new);
	}
	if (len < PAGE_CACHE_SIZE)
		zero_user_segment(page, len, PAGE_CACHE_SIZE);

	nfs_pageio_init_read(&pgio, inode);
	nfs_pageio_add_request(&pgio, new);
	nfs_pageio_complete(&pgio);
	return 0;
}

static void nfs_readpage_release(struct nfs_page *req)
{
	struct inode *d_inode = req->wb_context->dentry->d_inode;

	if (PageUptodate(req->wb_page))
		nfs_readpage_to_fscache(d_inode, req->wb_page, 0);

	unlock_page(req->wb_page);

	dprintk("NFS: read done (%s/%Ld %d@%Ld)\n",
			req->wb_context->dentry->d_inode->i_sb->s_id,
			(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
			req->wb_bytes,
			(long long)req_offset(req));
	nfs_release_request(req);
}

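/*
 * Dispatch the READ RPC. The task runs asynchronously on the nfsiod
 * workqueue, and swapfile-backed inodes add NFS_RPC_SWAPFLAGS so that
 * paging I/O can make progress under memory pressure. rpc_put_task()
 * merely drops our reference; completion is handled via call_ops.
 */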
int nfs_initiate_read(struct nfs_read_data *data, struct rpc_clnt *clnt,
		      const struct rpc_call_ops *call_ops)
{
	struct inode *inode = data->inode;
	int swap_flags = IS_SWAPFILE(inode) ? NFS_RPC_SWAPFLAGS : 0;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = data->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = clnt,
		.rpc_message = &msg,
		.callback_ops = call_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC | swap_flags,
	};

	/* Set up the initial task struct. */
	NFS_PROTO(inode)->read_setup(data, &msg);

	dprintk("NFS: %5u initiated read call (req %s/%lld, %u bytes @ "
			"offset %llu)\n",
			data->task.tk_pid,
			inode->i_sb->s_id,
			(long long)NFS_FILEID(inode),
			data->args.count,
			(unsigned long long)data->args.offset);

	task = rpc_run_task(&task_setup_data);
	if (IS_ERR(task))
		return PTR_ERR(task);
	rpc_put_task(task);
	return 0;
}
EXPORT_SYMBOL_GPL(nfs_initiate_read);

/*
 * Set up the NFS read request struct
 */
static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
		unsigned int count, unsigned int offset)
{
	struct inode *inode = req->wb_context->dentry->d_inode;

	data->req	  = req;
	data->inode	  = inode;
	data->cred	  = req->wb_context->cred;

	data->args.fh     = NFS_FH(inode);
	data->args.offset = req_offset(req) + offset;
	data->args.pgbase = req->wb_pgbase + offset;
	data->args.pages  = data->pagevec;
	data->args.count  = count;
	data->args.context = get_nfs_open_context(req->wb_context);
	data->args.lock_context = req->wb_lock_context;

	data->res.fattr   = &data->fattr;
	data->res.count   = count;
	data->res.eof     = 0;
	nfs_fattr_init(&data->fattr);
}

static int nfs_do_read(struct nfs_read_data *data,
		const struct rpc_call_ops *call_ops)
{
	struct inode *inode = data->args.context->dentry->d_inode;

	return nfs_initiate_read(data, NFS_CLIENT(inode), call_ops);
}

static int
nfs_do_multiple_reads(struct list_head *head,
		const struct rpc_call_ops *call_ops)
{
	struct nfs_read_data *data;
	int ret = 0;

	while (!list_empty(head)) {
		int ret2;

		data = list_entry(head->next, struct nfs_read_data, list);
		list_del_init(&data->list);

		ret2 = nfs_do_read(data, call_ops);
		if (ret == 0)
			ret = ret2;
	}
	return ret;
}

static void
nfs_async_read_error(struct list_head *head)
{
	struct nfs_page	*req;

	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_readpage_release(req);
	}
}

/*
 * Generate multiple requests to fill a single page.
 *
 * We optimize to reduce the number of read operations on the wire.  If we
 * detect that we're reading a page, or an area of a page, that is past the
 * end of file, we do not generate NFS read operations but just clear the
 * parts of the page that would have come back zero from the server anyway.
 *
 * We rely on the cached value of i_size to make this determination; another
 * client can fill pages on the server past our cached end-of-file, but we
 * won't see the new data until our attribute cache is updated.  This is more
 * or less conventional NFS client behavior.
 */
static int nfs_pagein_multi(struct nfs_pageio_descriptor *desc, struct list_head *res)
{
	struct nfs_page *req = nfs_list_entry(desc->pg_list.next);
	struct page *page = req->wb_page;
	struct nfs_read_data *data;
	size_t rsize = desc->pg_bsize, nbytes;
	unsigned int offset;
	int requests = 0;
	int ret = 0;

	nfs_list_remove_request(req);

	offset = 0;
	nbytes = desc->pg_count;
	do {
		size_t len = min(nbytes, rsize);

		data = nfs_readdata_alloc(1);
		if (!data)
			goto out_bad;
		data->pagevec[0] = page;
		nfs_read_rpcsetup(req, data, len, offset);
		list_add(&data->list, res);
		requests++;
		nbytes -= len;
		offset += len;
	} while (nbytes != 0);
	atomic_set(&req->wb_complete, requests);
	desc->pg_rpc_callops = &nfs_read_partial_ops;
	return ret;
out_bad:
	while (!list_empty(res)) {
		data = list_entry(res->next, struct nfs_read_data, list);
		list_del(&data->list);
		nfs_readdata_free(data);
	}
	nfs_readpage_release(req);
	return -ENOMEM;
}
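
/*
 * Worked example (illustrative): with a 4096-byte page and an rsize of
 * 1024, nfs_pagein_multi() queues four nfs_read_data entries covering
 * offsets 0, 1024, 2048 and 3072, and sets wb_complete to 4 so the
 * page is unlocked only after all four partial replies have finished.
 */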

static int nfs_pagein_one(struct nfs_pageio_descriptor *desc, struct list_head *res)
{
	struct nfs_page		*req;
	struct page		**pages;
	struct nfs_read_data	*data;
	struct list_head *head = &desc->pg_list;
	int ret = 0;

	data = nfs_readdata_alloc(nfs_page_array_len(desc->pg_base,
						     desc->pg_count));
	if (!data) {
		nfs_async_read_error(head);
		ret = -ENOMEM;
		goto out;
	}

	pages = data->pagevec;
	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_list_add_request(req, &data->pages);
		*pages++ = req->wb_page;
	}
	req = nfs_list_entry(data->pages.next);

	nfs_read_rpcsetup(req, data, desc->pg_count, 0);
	list_add(&data->list, res);
	desc->pg_rpc_callops = &nfs_read_full_ops;
out:
	return ret;
}

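/*
 * A block size smaller than a page means a single page needs several
 * RPCs (the "multi" case); otherwise one RPC can cover the whole
 * coalesced list of pages (the "one" case).
 */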
int nfs_generic_pagein(struct nfs_pageio_descriptor *desc, struct list_head *head)
{
	if (desc->pg_bsize < PAGE_CACHE_SIZE)
		return nfs_pagein_multi(desc, head);
	return nfs_pagein_one(desc, head);
}

static int nfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
{
	LIST_HEAD(head);
	int ret;

	ret = nfs_generic_pagein(desc, &head);
	if (ret == 0)
		ret = nfs_do_multiple_reads(&head, desc->pg_rpc_callops);
	return ret;
}

static const struct nfs_pageio_ops nfs_pageio_read_ops = {
	.pg_test = nfs_generic_pg_test,
	.pg_doio = nfs_generic_pg_readpages,
};

/*
 * This is the callback from RPC telling us whether a reply was
 * received or some error occurred (timeout or socket shutdown).
 */
int nfs_readpage_result(struct rpc_task *task, struct nfs_read_data *data)
{
	int status;

	dprintk("NFS: %s: %5u, (status %d)\n", __func__, task->tk_pid,
			task->tk_status);

	status = NFS_PROTO(data->inode)->read_done(task, data);
	if (status != 0)
		return status;

	nfs_add_stats(data->inode, NFSIOS_SERVERREADBYTES, data->res.count);

	if (task->tk_status == -ESTALE) {
		set_bit(NFS_INO_STALE, &NFS_I(data->inode)->flags);
		nfs_mark_for_revalidate(data->inode);
	}
	return 0;
}

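/*
 * A short read that did not hit end-of-file is resumed where the
 * server stopped: offset, pgbase and count are advanced past the bytes
 * already received and the call is restarted. If the server made no
 * progress at all, give up rather than loop forever.
 */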
static void nfs_readpage_retry(struct rpc_task *task, struct nfs_read_data *data)
{
	struct nfs_readargs *argp = &data->args;
	struct nfs_readres *resp = &data->res;

	if (resp->eof || resp->count == argp->count)
		return;

	/* This is a short read! */
	nfs_inc_stats(data->inode, NFSIOS_SHORTREAD);
	/* Has the server at least made some progress? */
	if (resp->count == 0)
		return;

	/* Yes, so retry the read at the end of the data */
	data->mds_offset += resp->count;
	argp->offset += resp->count;
	argp->pgbase += resp->count;
	argp->count -= resp->count;
	rpc_restart_call_prepare(task);
}

/*
 * Handle a read reply that fills part of a page.
 */
static void nfs_readpage_result_partial(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	if (nfs_readpage_result(task, data) != 0)
		return;
	if (task->tk_status < 0)
		return;

	nfs_readpage_truncate_uninitialised_page(data);
	nfs_readpage_retry(task, data);
}

static void nfs_readpage_release_partial(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_page *req = data->req;
	struct page *page = req->wb_page;
	int status = data->task.tk_status;

	if (status < 0)
		set_bit(PG_PARTIAL_READ_FAILED, &req->wb_flags);

	if (atomic_dec_and_test(&req->wb_complete)) {
		if (!test_bit(PG_PARTIAL_READ_FAILED, &req->wb_flags))
			SetPageUptodate(page);
		nfs_readpage_release(req);
	}
	nfs_readdata_release(calldata);
}

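/*
 * NFSv4.1 sessions require a slot before the READ goes on the wire: if
 * nfs4_setup_sequence() cannot reserve one it queues the task and
 * returns non-zero, otherwise the call is started immediately.
 */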
#if defined(CONFIG_NFS_V4_1)
void nfs_read_prepare(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	if (nfs4_setup_sequence(NFS_SERVER(data->inode),
				&data->args.seq_args, &data->res.seq_res,
				0, task))
		return;
	rpc_call_start(task);
}
#endif /* CONFIG_NFS_V4_1 */

static const struct rpc_call_ops nfs_read_partial_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_readpage_result_partial,
	.rpc_release = nfs_readpage_release_partial,
};

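/*
 * Mark every fully received page up to date. A partially filled final
 * page qualifies only if the short read was terminated by EOF (its
 * tail has already been zeroed) or the read was not short at all.
 */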
static void nfs_readpage_set_pages_uptodate(struct nfs_read_data *data)
{
	unsigned int count = data->res.count;
	unsigned int base = data->args.pgbase;
	struct page **pages;

	if (data->res.eof)
		count = data->args.count;
	if (unlikely(count == 0))
		return;
	pages = &data->args.pages[base >> PAGE_CACHE_SHIFT];
	base &= ~PAGE_CACHE_MASK;
	count += base;
	for (; count >= PAGE_CACHE_SIZE; count -= PAGE_CACHE_SIZE, pages++)
		SetPageUptodate(*pages);
	if (count == 0)
		return;
	/* Was this a short read? */
	if (data->res.eof || data->res.count == data->args.count)
		SetPageUptodate(*pages);
}

/*
 * This is the callback from RPC telling us whether a reply was
 * received or some error occurred (timeout or socket shutdown).
 */
static void nfs_readpage_result_full(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	if (nfs_readpage_result(task, data) != 0)
		return;
	if (task->tk_status < 0)
		return;
	/*
	 * Note: nfs_readpage_retry may change the values of
	 * data->args. In the multi-page case, we therefore need
	 * to ensure that we call nfs_readpage_set_pages_uptodate()
	 * first.
	 */
	nfs_readpage_truncate_uninitialised_page(data);
	nfs_readpage_set_pages_uptodate(data);
	nfs_readpage_retry(task, data);
}

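/*
 * On a pNFS error the requests are replayed through the MDS: the
 * descriptor is reinitialised with the MDS read ops, and setting
 * pg_recoalesce forces the requests to be regrouped into new RPCs.
 */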
static void nfs_readpage_release_full(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_pageio_descriptor pgio;

	if (data->pnfs_error) {
		nfs_pageio_init_read_mds(&pgio, data->inode);
		pgio.pg_recoalesce = 1;
	}
	while (!list_empty(&data->pages)) {
		struct nfs_page *req = nfs_list_entry(data->pages.next);

		nfs_list_remove_request(req);
		if (!data->pnfs_error)
			nfs_readpage_release(req);
		else
			nfs_pageio_add_request(&pgio, req);
	}
	if (data->pnfs_error)
		nfs_pageio_complete(&pgio);
	nfs_readdata_release(calldata);
}

static const struct rpc_call_ops nfs_read_full_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_readpage_result_full,
	.rpc_release = nfs_readpage_release_full,
};

/*
 * Read a page over NFS.
 * We read the page synchronously in the following case:
 *  -	The error flag is set for this page. This happens only when a
 *	previous async read operation failed.
 */
int nfs_readpage(struct file *file, struct page *page)
{
	struct nfs_open_context *ctx;
	struct inode *inode = page->mapping->host;
	int		error;

	dprintk("NFS: nfs_readpage (%p %ld@%lu)\n",
		page, PAGE_CACHE_SIZE, page->index);
	nfs_inc_stats(inode, NFSIOS_VFSREADPAGE);
	nfs_add_stats(inode, NFSIOS_READPAGES, 1);

	/*
	 * Try to flush any pending writes to the file.
	 *
	 * NOTE! Because we own the page lock, there cannot
	 * be any new pending writes generated at this point
	 * for this page (other pages can be written to).
	 */
	error = nfs_wb_page(inode, page);
	if (error)
		goto out_unlock;
	if (PageUptodate(page))
		goto out_unlock;

	error = -ESTALE;
	if (NFS_STALE(inode))
		goto out_unlock;

	if (file == NULL) {
		error = -EBADF;
		ctx = nfs_find_open_context(inode, NULL, FMODE_READ);
		if (ctx == NULL)
			goto out_unlock;
	} else
		ctx = get_nfs_open_context(nfs_file_open_context(file));

	if (!IS_SYNC(inode)) {
		error = nfs_readpage_from_fscache(ctx, inode, page);
		if (error == 0)
			goto out;
	}

	error = nfs_readpage_async(ctx, inode, page);

out:
	put_nfs_open_context(ctx);
	return error;
out_unlock:
	unlock_page(page);
	return error;
}
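
/*
 * Minimal usage sketch (illustrative, not taken from this file): the
 * VFS invokes nfs_readpage() via ->readpage with the page locked:
 *
 *	error = mapping->a_ops->readpage(filp, page);
 *	if (error == 0)
 *		wait_on_page_locked(page);
 *
 * Completion is signalled by the page lock, not by the return value.
 */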

struct nfs_readdesc {
	struct nfs_pageio_descriptor *pgio;
	struct nfs_open_context *ctx;
};

static int
readpage_async_filler(void *data, struct page *page)
{
	struct nfs_readdesc *desc = (struct nfs_readdesc *)data;
	struct inode *inode = page->mapping->host;
	struct nfs_page *new;
	unsigned int len;
	int error;

	len = nfs_page_length(page);
	if (len == 0)
		return nfs_return_empty_page(page);

	new = nfs_create_request(desc->ctx, inode, page, 0, len);
	if (IS_ERR(new))
		goto out_error;

	if (len < PAGE_CACHE_SIZE)
		zero_user_segment(page, len, PAGE_CACHE_SIZE);
	if (!nfs_pageio_add_request(desc->pgio, new)) {
		error = desc->pgio->pg_error;
		goto out_unlock;
	}
	return 0;
out_error:
	error = PTR_ERR(new);
out_unlock:
	unlock_page(page);
	return error;
}

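/*
 * ->readpages: read_cache_pages() invokes readpage_async_filler() once
 * per page; requests accumulate in the pageio descriptor and the final
 * batch is flushed by nfs_pageio_complete().
 */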
int nfs_readpages(struct file *filp, struct address_space *mapping,
		struct list_head *pages, unsigned nr_pages)
{
	struct nfs_pageio_descriptor pgio;
	struct nfs_readdesc desc = {
		.pgio = &pgio,
	};
	struct inode *inode = mapping->host;
	unsigned long npages;
	int ret = -ESTALE;

	dprintk("NFS: nfs_readpages (%s/%Ld %d)\n",
			inode->i_sb->s_id,
			(long long)NFS_FILEID(inode),
			nr_pages);
	nfs_inc_stats(inode, NFSIOS_VFSREADPAGES);

	if (NFS_STALE(inode))
		goto out;

	if (filp == NULL) {
		desc.ctx = nfs_find_open_context(inode, NULL, FMODE_READ);
		if (desc.ctx == NULL)
			return -EBADF;
	} else
		desc.ctx = get_nfs_open_context(nfs_file_open_context(filp));

	/* attempt to read as many of the pages as possible from the cache
	 * - this returns -ENOBUFS immediately if the cookie is negative
	 */
	ret = nfs_readpages_from_fscache(desc.ctx, inode, mapping,
					 pages, &nr_pages);
	if (ret == 0)
		goto read_complete; /* all pages were read */

	nfs_pageio_init_read(&pgio, inode);

	ret = read_cache_pages(mapping, pages, readpage_async_filler, &desc);

	nfs_pageio_complete(&pgio);
	npages = (pgio.pg_bytes_written + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
	nfs_add_stats(inode, NFSIOS_READPAGES, npages);
read_complete:
	put_nfs_open_context(desc.ctx);
out:
	return ret;
}

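/*
 * Slab cache setup and teardown for struct nfs_read_data.
 * SLAB_HWCACHE_ALIGN aligns the descriptors to hardware cache lines.
 */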
int __init nfs_init_readpagecache(void)
{
	nfs_rdata_cachep = kmem_cache_create("nfs_read_data",
					     sizeof(struct nfs_read_data),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (nfs_rdata_cachep == NULL)
		return -ENOMEM;

	return 0;
}

void nfs_destroy_readpagecache(void)
{
	kmem_cache_destroy(nfs_rdata_cachep);
}