/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

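/*
 * dreq->io_count counts I/Os still in flight: the scheduling path takes
 * a reference before dispatching, and each outstanding RPC holds one
 * via get_dreq() above.  put_dreq() returns true when it drops the last
 * reference; its caller then owns completion of the request.
 */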
static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O.  However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

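/*
 * Summary of the two completion paths: a synchronous caller blocks in
 * nfs_direct_wait() above and then picks up dreq->error or, if no error
 * was recorded, dreq->count.  An aio caller (dreq->iocb != NULL) gets
 * -EIOCBQUEUED back immediately and receives the final result later via
 * aio_complete() in nfs_direct_complete() below.
 */
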
/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_release(calldata);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

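/*
 * Worked example of the page bookkeeping used below (illustrative
 * values only, assuming 4 KiB pages): a user buffer at 0x1000f00 with
 * iov_len 8192 gives pgbase = 0xf00.  nfs_page_array_len(0xf00, 8192)
 * is then 3 pages, because the transfer starts 0xf00 bytes into the
 * first page, uses all of the second, and ends 0xf00 bytes into the
 * third.  If get_user_pages() pins fewer pages than requested, the
 * transfer is trimmed to the bytes actually covered
 * (result * PAGE_SIZE - pgbase) before the RPC is sent.
 */
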
/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_release(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_release(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

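/*
 * Note on error handling in the iovec scheduler below: once any bytes
 * have been scheduled, a scheduling failure in a later segment is not
 * returned to the caller here; the request simply completes short,
 * while per-RPC errors still surface through dreq->error.  Only when
 * nothing at all could be scheduled does the error (or -EIO)
 * propagate back.
 */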
static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_release(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commitdata_release(calldata);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = get_nfs_open_context(dreq->ctx);
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

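/*
 * Direct write completion is a small state machine driven by
 * dreq->flags:
 *
 *   (default)			nothing to commit (every reply was
 *				FILE_SYNC, or an error occurred): free
 *				the saved write data, invalidate the
 *				file's cached pages, and complete.
 *   NFS_ODIRECT_DO_COMMIT	at least one UNSTABLE reply was seen;
 *				send a COMMIT and re-evaluate when it
 *				finishes.
 *   NFS_ODIRECT_RESCHED_WRITES	the commit failed or a verifier changed
 *				(e.g. after a server reboot); resend the
 *				saved writes from rewrite_list as
 *				FILE_SYNC.
 */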
static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_release(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_release(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count < wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

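/*
 * How these entry points are reached (sketch; the exact call sites live
 * in fs/nfs/file.c): nfs_file_read() and nfs_file_write() check
 * iocb->ki_filp->f_flags for O_DIRECT and divert to
 * nfs_file_direct_read()/nfs_file_direct_write() below, bypassing
 * generic_file_aio_read()/write() and therefore the page cache.
 */
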
/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
			     unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dprintk("nfs: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We also avoid an unnecessary invocation of generic_osync_inode(),
 * as it is fairly meaningless to sync the metadata of an NFS file.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
			      unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(VFS, "nfs: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}