/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data. Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols. Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache. A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache. The client does not
 * correct unaligned requests from applications. All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files. Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct nfs_lock_context	*l_ctx;		/* Lock context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_commit_data	*commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

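/*
 * Reference counting on an in-progress request: dreq->io_count counts
 * the RPCs still outstanding for this direct request.  Every READ or
 * WRITE that is dispatched takes a reference with get_dreq(), and each
 * completion drops one with put_dreq(); whoever drops the last
 * reference completes the whole request.  This is separate from
 * dreq->kref, which keeps the nfs_direct_req structure itself alive.
 */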
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	dreq->l_ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

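/*
 * Completion takes one of two forms: a synchronous caller sleeps in
 * nfs_direct_wait() until the last RPC finishes, while an aio caller
 * (dreq->iocb != NULL) gets -EIOCBQUEUED back immediately and is
 * notified later via aio_complete() from nfs_direct_complete().
 */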
/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{

	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_release(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
	.rpc_call_prepare = nfs_read_prepare,
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

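/*
 * The scheduling functions below pin the application's buffer with
 * get_user_pages() one rsize/wsize chunk at a time.  For illustration
 * only: with rsize = 32768, a 100KB read becomes four READ calls
 * (3 x 32KB + 4KB), each dispatched asynchronously before the next
 * chunk is pinned.
 */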
/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize,count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

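/*
 * nfs_direct_read_schedule_iovec() holds its own reference on the dreq
 * while it walks the iovec, so an early RPC reply cannot complete the
 * request before every segment has been dispatched.
 */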
static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return 0;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_commit_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_commit_data *data = calldata;
	struct nfs_direct_req *dreq = data->dreq;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
	.rpc_call_prepare = nfs_commit_prepare,
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_commit_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->dreq = dreq;
}
#else
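/*
 * Without NFSv3/v4 there are no UNSTABLE writes and no COMMIT
 * operation: every direct write is sent FILE_SYNC, so completion
 * needs no commit step.
 */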
static inline void
nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	nfs_writeback_done(task, data);
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
	.rpc_call_prepare = nfs_write_prepare,
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

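/*
 * Each nfs_write_data is kept on dreq->rewrite_list (see the
 * list_move_tail() below) until the whole direct request completes,
 * so that nfs_direct_write_reschedule() can resend the same pages if
 * the server's commit verifier changes (e.g. after a server reboot).
 */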
/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize,count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

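/*
 * If a segment can only be partially scheduled (result < iov_len),
 * submission stops there; the bytes already in flight are what the
 * application eventually sees as a short read or write.
 */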
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);
	return 0;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count <= wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

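/*
 * For illustration only (not part of this file's API): a typical
 * consumer of this path opens the file with O_DIRECT and then uses
 * ordinary read/write or aio system calls, e.g.
 *
 *	fd = open("file", O_RDWR | O_DIRECT);
 *	pwrite(fd, buf, len, offset);
 *
 * Those requests arrive at nfs_file_direct_read/write below and are
 * sent to the server exactly as issued; as the comment at the top of
 * this file notes, unaligned requests are not corrected here.
 */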
/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_read(count);

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_write(count);

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}