/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
31 * 32 * 18 Dec 2001 Initial implementation for 2.4 --cel 33 * 08 Jul 2002 Version for 2.4.19, with bug fixes --trondmy 34 * 08 Jun 2003 Port to 2.5 APIs --cel 35 * 31 Mar 2004 Handle direct I/O without VFS support --cel 36 * 15 Sep 2004 Parallel async reads --cel 37 * 04 May 2005 support O_DIRECT with aio --cel 38 * 39 */ 40 41 #include <linux/errno.h> 42 #include <linux/sched.h> 43 #include <linux/kernel.h> 44 #include <linux/file.h> 45 #include <linux/pagemap.h> 46 #include <linux/kref.h> 47 #include <linux/slab.h> 48 49 #include <linux/nfs_fs.h> 50 #include <linux/nfs_page.h> 51 #include <linux/sunrpc/clnt.h> 52 53 #include <asm/system.h> 54 #include <asm/uaccess.h> 55 #include <asm/atomic.h> 56 57 #include "internal.h" 58 #include "iostat.h" 59 60 #define NFSDBG_FACILITY NFSDBG_VFS 61 62 static struct kmem_cache *nfs_direct_cachep; 63 64 /* 65 * This represents a set of asynchronous requests that we're waiting on 66 */ 67 struct nfs_direct_req { 68 struct kref kref; /* release manager */ 69 70 /* I/O parameters */ 71 struct nfs_open_context *ctx; /* file open context info */ 72 struct kiocb * iocb; /* controlling i/o request */ 73 struct inode * inode; /* target file of i/o */ 74 75 /* completion state */ 76 atomic_t io_count; /* i/os we're waiting for */ 77 spinlock_t lock; /* protect completion state */ 78 ssize_t count, /* bytes actually processed */ 79 error; /* any reported error */ 80 struct completion completion; /* wait for i/o completion */ 81 82 /* commit state */ 83 struct list_head rewrite_list; /* saved nfs_write_data structs */ 84 struct nfs_write_data * commit_data; /* special write_data for commits */ 85 int flags; 86 #define NFS_ODIRECT_DO_COMMIT (1) /* an unstable reply was received */ 87 #define NFS_ODIRECT_RESCHED_WRITES (2) /* write verification failed */ 88 struct nfs_writeverf verf; /* unstable write verifier */ 89 }; 90 91 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode); 92 static const struct 
rpc_call_ops nfs_write_direct_ops; 93 94 static inline void get_dreq(struct nfs_direct_req *dreq) 95 { 96 atomic_inc(&dreq->io_count); 97 } 98 99 static inline int put_dreq(struct nfs_direct_req *dreq) 100 { 101 return atomic_dec_and_test(&dreq->io_count); 102 } 103 104 /** 105 * nfs_direct_IO - NFS address space operation for direct I/O 106 * @rw: direction (read or write) 107 * @iocb: target I/O control block 108 * @iov: array of vectors that define I/O buffer 109 * @pos: offset in file to begin the operation 110 * @nr_segs: size of iovec array 111 * 112 * The presence of this routine in the address space ops vector means 113 * the NFS client supports direct I/O. However, we shunt off direct 114 * read and write requests before the VFS gets them, so this method 115 * should never be called. 116 */ 117 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs) 118 { 119 dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n", 120 iocb->ki_filp->f_path.dentry->d_name.name, 121 (long long) pos, nr_segs); 122 123 return -EINVAL; 124 } 125 126 static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count) 127 { 128 unsigned int npages; 129 unsigned int i; 130 131 if (count == 0) 132 return; 133 pages += (pgbase >> PAGE_SHIFT); 134 npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT; 135 for (i = 0; i < npages; i++) { 136 struct page *page = pages[i]; 137 if (!PageCompound(page)) 138 set_page_dirty(page); 139 } 140 } 141 142 static void nfs_direct_release_pages(struct page **pages, unsigned int npages) 143 { 144 unsigned int i; 145 for (i = 0; i < npages; i++) 146 page_cache_release(pages[i]); 147 } 148 149 static inline struct nfs_direct_req *nfs_direct_req_alloc(void) 150 { 151 struct nfs_direct_req *dreq; 152 153 dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL); 154 if (!dreq) 155 return NULL; 156 157 kref_init(&dreq->kref); 158 kref_get(&dreq->kref); 159 
init_completion(&dreq->completion); 160 INIT_LIST_HEAD(&dreq->rewrite_list); 161 dreq->iocb = NULL; 162 dreq->ctx = NULL; 163 spin_lock_init(&dreq->lock); 164 atomic_set(&dreq->io_count, 0); 165 dreq->count = 0; 166 dreq->error = 0; 167 dreq->flags = 0; 168 169 return dreq; 170 } 171 172 static void nfs_direct_req_free(struct kref *kref) 173 { 174 struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref); 175 176 if (dreq->ctx != NULL) 177 put_nfs_open_context(dreq->ctx); 178 kmem_cache_free(nfs_direct_cachep, dreq); 179 } 180 181 static void nfs_direct_req_release(struct nfs_direct_req *dreq) 182 { 183 kref_put(&dreq->kref, nfs_direct_req_free); 184 } 185 186 /* 187 * Collects and returns the final error value/byte-count. 188 */ 189 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq) 190 { 191 ssize_t result = -EIOCBQUEUED; 192 193 /* Async requests don't wait here */ 194 if (dreq->iocb) 195 goto out; 196 197 result = wait_for_completion_killable(&dreq->completion); 198 199 if (!result) 200 result = dreq->error; 201 if (!result) 202 result = dreq->count; 203 204 out: 205 return (ssize_t) result; 206 } 207 208 /* 209 * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust 210 * the iocb is still valid here if this is a synchronous request. 211 */ 212 static void nfs_direct_complete(struct nfs_direct_req *dreq) 213 { 214 if (dreq->iocb) { 215 long res = (long) dreq->error; 216 if (!res) 217 res = (long) dreq->count; 218 aio_complete(dreq->iocb, res, 0); 219 } 220 complete_all(&dreq->completion); 221 222 nfs_direct_req_release(dreq); 223 } 224 225 /* 226 * We must hold a reference to all the pages in this direct read request 227 * until the RPCs complete. This could be long *after* we are woken up in 228 * nfs_direct_wait (for instance, if someone hits ^C on a slow server). 
229 */ 230 static void nfs_direct_read_result(struct rpc_task *task, void *calldata) 231 { 232 struct nfs_read_data *data = calldata; 233 234 nfs_readpage_result(task, data); 235 } 236 237 static void nfs_direct_read_release(void *calldata) 238 { 239 240 struct nfs_read_data *data = calldata; 241 struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req; 242 int status = data->task.tk_status; 243 244 spin_lock(&dreq->lock); 245 if (unlikely(status < 0)) { 246 dreq->error = status; 247 spin_unlock(&dreq->lock); 248 } else { 249 dreq->count += data->res.count; 250 spin_unlock(&dreq->lock); 251 nfs_direct_dirty_pages(data->pagevec, 252 data->args.pgbase, 253 data->res.count); 254 } 255 nfs_direct_release_pages(data->pagevec, data->npages); 256 257 if (put_dreq(dreq)) 258 nfs_direct_complete(dreq); 259 nfs_readdata_free(data); 260 } 261 262 static const struct rpc_call_ops nfs_read_direct_ops = { 263 #if defined(CONFIG_NFS_V4_1) 264 .rpc_call_prepare = nfs_read_prepare, 265 #endif /* CONFIG_NFS_V4_1 */ 266 .rpc_call_done = nfs_direct_read_result, 267 .rpc_release = nfs_direct_read_release, 268 }; 269 270 /* 271 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ 272 * operation. If nfs_readdata_alloc() or get_user_pages() fails, 273 * bail and stop sending more reads. Read length accounting is 274 * handled automatically by nfs_direct_read_result(). Otherwise, if 275 * no requests have been sent, just return an error. 
276 */ 277 static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq, 278 const struct iovec *iov, 279 loff_t pos) 280 { 281 struct nfs_open_context *ctx = dreq->ctx; 282 struct inode *inode = ctx->path.dentry->d_inode; 283 unsigned long user_addr = (unsigned long)iov->iov_base; 284 size_t count = iov->iov_len; 285 size_t rsize = NFS_SERVER(inode)->rsize; 286 struct rpc_task *task; 287 struct rpc_message msg = { 288 .rpc_cred = ctx->cred, 289 }; 290 struct rpc_task_setup task_setup_data = { 291 .rpc_client = NFS_CLIENT(inode), 292 .rpc_message = &msg, 293 .callback_ops = &nfs_read_direct_ops, 294 .workqueue = nfsiod_workqueue, 295 .flags = RPC_TASK_ASYNC, 296 }; 297 unsigned int pgbase; 298 int result; 299 ssize_t started = 0; 300 301 do { 302 struct nfs_read_data *data; 303 size_t bytes; 304 305 pgbase = user_addr & ~PAGE_MASK; 306 bytes = min(rsize,count); 307 308 result = -ENOMEM; 309 data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes)); 310 if (unlikely(!data)) 311 break; 312 313 down_read(¤t->mm->mmap_sem); 314 result = get_user_pages(current, current->mm, user_addr, 315 data->npages, 1, 0, data->pagevec, NULL); 316 up_read(¤t->mm->mmap_sem); 317 if (result < 0) { 318 nfs_readdata_free(data); 319 break; 320 } 321 if ((unsigned)result < data->npages) { 322 bytes = result * PAGE_SIZE; 323 if (bytes <= pgbase) { 324 nfs_direct_release_pages(data->pagevec, result); 325 nfs_readdata_free(data); 326 break; 327 } 328 bytes -= pgbase; 329 data->npages = result; 330 } 331 332 get_dreq(dreq); 333 334 data->req = (struct nfs_page *) dreq; 335 data->inode = inode; 336 data->cred = msg.rpc_cred; 337 data->args.fh = NFS_FH(inode); 338 data->args.context = ctx; 339 data->args.offset = pos; 340 data->args.pgbase = pgbase; 341 data->args.pages = data->pagevec; 342 data->args.count = bytes; 343 data->res.fattr = &data->fattr; 344 data->res.eof = 0; 345 data->res.count = bytes; 346 nfs_fattr_init(&data->fattr); 347 msg.rpc_argp = &data->args; 348 
msg.rpc_resp = &data->res; 349 350 task_setup_data.task = &data->task; 351 task_setup_data.callback_data = data; 352 NFS_PROTO(inode)->read_setup(data, &msg); 353 354 task = rpc_run_task(&task_setup_data); 355 if (IS_ERR(task)) 356 break; 357 rpc_put_task(task); 358 359 dprintk("NFS: %5u initiated direct read call " 360 "(req %s/%Ld, %zu bytes @ offset %Lu)\n", 361 data->task.tk_pid, 362 inode->i_sb->s_id, 363 (long long)NFS_FILEID(inode), 364 bytes, 365 (unsigned long long)data->args.offset); 366 367 started += bytes; 368 user_addr += bytes; 369 pos += bytes; 370 /* FIXME: Remove this unnecessary math from final patch */ 371 pgbase += bytes; 372 pgbase &= ~PAGE_MASK; 373 BUG_ON(pgbase != (user_addr & ~PAGE_MASK)); 374 375 count -= bytes; 376 } while (count != 0); 377 378 if (started) 379 return started; 380 return result < 0 ? (ssize_t) result : -EFAULT; 381 } 382 383 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, 384 const struct iovec *iov, 385 unsigned long nr_segs, 386 loff_t pos) 387 { 388 ssize_t result = -EINVAL; 389 size_t requested_bytes = 0; 390 unsigned long seg; 391 392 get_dreq(dreq); 393 394 for (seg = 0; seg < nr_segs; seg++) { 395 const struct iovec *vec = &iov[seg]; 396 result = nfs_direct_read_schedule_segment(dreq, vec, pos); 397 if (result < 0) 398 break; 399 requested_bytes += result; 400 if ((size_t)result < vec->iov_len) 401 break; 402 pos += vec->iov_len; 403 } 404 405 if (put_dreq(dreq)) 406 nfs_direct_complete(dreq); 407 408 if (requested_bytes != 0) 409 return 0; 410 411 if (result < 0) 412 return result; 413 return -EIO; 414 } 415 416 static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov, 417 unsigned long nr_segs, loff_t pos) 418 { 419 ssize_t result = 0; 420 struct inode *inode = iocb->ki_filp->f_mapping->host; 421 struct nfs_direct_req *dreq; 422 423 dreq = nfs_direct_req_alloc(); 424 if (!dreq) 425 return -ENOMEM; 426 427 dreq->inode = inode; 428 dreq->ctx = 
get_nfs_open_context(nfs_file_open_context(iocb->ki_filp)); 429 if (!is_sync_kiocb(iocb)) 430 dreq->iocb = iocb; 431 432 result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos); 433 if (!result) 434 result = nfs_direct_wait(dreq); 435 nfs_direct_req_release(dreq); 436 437 return result; 438 } 439 440 static void nfs_direct_free_writedata(struct nfs_direct_req *dreq) 441 { 442 while (!list_empty(&dreq->rewrite_list)) { 443 struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages); 444 list_del(&data->pages); 445 nfs_direct_release_pages(data->pagevec, data->npages); 446 nfs_writedata_free(data); 447 } 448 } 449 450 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4) 451 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq) 452 { 453 struct inode *inode = dreq->inode; 454 struct list_head *p; 455 struct nfs_write_data *data; 456 struct rpc_task *task; 457 struct rpc_message msg = { 458 .rpc_cred = dreq->ctx->cred, 459 }; 460 struct rpc_task_setup task_setup_data = { 461 .rpc_client = NFS_CLIENT(inode), 462 .rpc_message = &msg, 463 .callback_ops = &nfs_write_direct_ops, 464 .workqueue = nfsiod_workqueue, 465 .flags = RPC_TASK_ASYNC, 466 }; 467 468 dreq->count = 0; 469 get_dreq(dreq); 470 471 list_for_each(p, &dreq->rewrite_list) { 472 data = list_entry(p, struct nfs_write_data, pages); 473 474 get_dreq(dreq); 475 476 /* Use stable writes */ 477 data->args.stable = NFS_FILE_SYNC; 478 479 /* 480 * Reset data->res. 481 */ 482 nfs_fattr_init(&data->fattr); 483 data->res.count = data->args.count; 484 memset(&data->verf, 0, sizeof(data->verf)); 485 486 /* 487 * Reuse data->task; data->args should not have changed 488 * since the original request was sent. 
489 */ 490 task_setup_data.task = &data->task; 491 task_setup_data.callback_data = data; 492 msg.rpc_argp = &data->args; 493 msg.rpc_resp = &data->res; 494 NFS_PROTO(inode)->write_setup(data, &msg); 495 496 /* 497 * We're called via an RPC callback, so BKL is already held. 498 */ 499 task = rpc_run_task(&task_setup_data); 500 if (!IS_ERR(task)) 501 rpc_put_task(task); 502 503 dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n", 504 data->task.tk_pid, 505 inode->i_sb->s_id, 506 (long long)NFS_FILEID(inode), 507 data->args.count, 508 (unsigned long long)data->args.offset); 509 } 510 511 if (put_dreq(dreq)) 512 nfs_direct_write_complete(dreq, inode); 513 } 514 515 static void nfs_direct_commit_result(struct rpc_task *task, void *calldata) 516 { 517 struct nfs_write_data *data = calldata; 518 519 /* Call the NFS version-specific code */ 520 NFS_PROTO(data->inode)->commit_done(task, data); 521 } 522 523 static void nfs_direct_commit_release(void *calldata) 524 { 525 struct nfs_write_data *data = calldata; 526 struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req; 527 int status = data->task.tk_status; 528 529 if (status < 0) { 530 dprintk("NFS: %5u commit failed with error %d.\n", 531 data->task.tk_pid, status); 532 dreq->flags = NFS_ODIRECT_RESCHED_WRITES; 533 } else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) { 534 dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid); 535 dreq->flags = NFS_ODIRECT_RESCHED_WRITES; 536 } 537 538 dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status); 539 nfs_direct_write_complete(dreq, data->inode); 540 nfs_commit_free(data); 541 } 542 543 static const struct rpc_call_ops nfs_commit_direct_ops = { 544 #if defined(CONFIG_NFS_V4_1) 545 .rpc_call_prepare = nfs_write_prepare, 546 #endif /* CONFIG_NFS_V4_1 */ 547 .rpc_call_done = nfs_direct_commit_result, 548 .rpc_release = nfs_direct_commit_release, 549 }; 550 551 static void nfs_direct_commit_schedule(struct 
nfs_direct_req *dreq) 552 { 553 struct nfs_write_data *data = dreq->commit_data; 554 struct rpc_task *task; 555 struct rpc_message msg = { 556 .rpc_argp = &data->args, 557 .rpc_resp = &data->res, 558 .rpc_cred = dreq->ctx->cred, 559 }; 560 struct rpc_task_setup task_setup_data = { 561 .task = &data->task, 562 .rpc_client = NFS_CLIENT(dreq->inode), 563 .rpc_message = &msg, 564 .callback_ops = &nfs_commit_direct_ops, 565 .callback_data = data, 566 .workqueue = nfsiod_workqueue, 567 .flags = RPC_TASK_ASYNC, 568 }; 569 570 data->inode = dreq->inode; 571 data->cred = msg.rpc_cred; 572 573 data->args.fh = NFS_FH(data->inode); 574 data->args.offset = 0; 575 data->args.count = 0; 576 data->args.context = dreq->ctx; 577 data->res.count = 0; 578 data->res.fattr = &data->fattr; 579 data->res.verf = &data->verf; 580 nfs_fattr_init(&data->fattr); 581 582 NFS_PROTO(data->inode)->commit_setup(data, &msg); 583 584 /* Note: task.tk_ops->rpc_release will free dreq->commit_data */ 585 dreq->commit_data = NULL; 586 587 dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid); 588 589 task = rpc_run_task(&task_setup_data); 590 if (!IS_ERR(task)) 591 rpc_put_task(task); 592 } 593 594 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode) 595 { 596 int flags = dreq->flags; 597 598 dreq->flags = 0; 599 switch (flags) { 600 case NFS_ODIRECT_DO_COMMIT: 601 nfs_direct_commit_schedule(dreq); 602 break; 603 case NFS_ODIRECT_RESCHED_WRITES: 604 nfs_direct_write_reschedule(dreq); 605 break; 606 default: 607 if (dreq->commit_data != NULL) 608 nfs_commit_free(dreq->commit_data); 609 nfs_direct_free_writedata(dreq); 610 nfs_zap_mapping(inode, inode->i_mapping); 611 nfs_direct_complete(dreq); 612 } 613 } 614 615 static void nfs_alloc_commit_data(struct nfs_direct_req *dreq) 616 { 617 dreq->commit_data = nfs_commitdata_alloc(); 618 if (dreq->commit_data != NULL) 619 dreq->commit_data->req = (struct nfs_page *) dreq; 620 } 621 #else 622 static inline void 
nfs_alloc_commit_data(struct nfs_direct_req *dreq) 623 { 624 dreq->commit_data = NULL; 625 } 626 627 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode) 628 { 629 nfs_direct_free_writedata(dreq); 630 nfs_zap_mapping(inode, inode->i_mapping); 631 nfs_direct_complete(dreq); 632 } 633 #endif 634 635 static void nfs_direct_write_result(struct rpc_task *task, void *calldata) 636 { 637 struct nfs_write_data *data = calldata; 638 639 if (nfs_writeback_done(task, data) != 0) 640 return; 641 } 642 643 /* 644 * NB: Return the value of the first error return code. Subsequent 645 * errors after the first one are ignored. 646 */ 647 static void nfs_direct_write_release(void *calldata) 648 { 649 struct nfs_write_data *data = calldata; 650 struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req; 651 int status = data->task.tk_status; 652 653 spin_lock(&dreq->lock); 654 655 if (unlikely(status < 0)) { 656 /* An error has occurred, so we should not commit */ 657 dreq->flags = 0; 658 dreq->error = status; 659 } 660 if (unlikely(dreq->error != 0)) 661 goto out_unlock; 662 663 dreq->count += data->res.count; 664 665 if (data->res.verf->committed != NFS_FILE_SYNC) { 666 switch (dreq->flags) { 667 case 0: 668 memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf)); 669 dreq->flags = NFS_ODIRECT_DO_COMMIT; 670 break; 671 case NFS_ODIRECT_DO_COMMIT: 672 if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) { 673 dprintk("NFS: %5u write verify failed\n", data->task.tk_pid); 674 dreq->flags = NFS_ODIRECT_RESCHED_WRITES; 675 } 676 } 677 } 678 out_unlock: 679 spin_unlock(&dreq->lock); 680 681 if (put_dreq(dreq)) 682 nfs_direct_write_complete(dreq, data->inode); 683 } 684 685 static const struct rpc_call_ops nfs_write_direct_ops = { 686 #if defined(CONFIG_NFS_V4_1) 687 .rpc_call_prepare = nfs_write_prepare, 688 #endif /* CONFIG_NFS_V4_1 */ 689 .rpc_call_done = nfs_direct_write_result, 690 .rpc_release = nfs_direct_write_release, 691 }; 692 693 /* 
694 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE 695 * operation. If nfs_writedata_alloc() or get_user_pages() fails, 696 * bail and stop sending more writes. Write length accounting is 697 * handled automatically by nfs_direct_write_result(). Otherwise, if 698 * no requests have been sent, just return an error. 699 */ 700 static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq, 701 const struct iovec *iov, 702 loff_t pos, int sync) 703 { 704 struct nfs_open_context *ctx = dreq->ctx; 705 struct inode *inode = ctx->path.dentry->d_inode; 706 unsigned long user_addr = (unsigned long)iov->iov_base; 707 size_t count = iov->iov_len; 708 struct rpc_task *task; 709 struct rpc_message msg = { 710 .rpc_cred = ctx->cred, 711 }; 712 struct rpc_task_setup task_setup_data = { 713 .rpc_client = NFS_CLIENT(inode), 714 .rpc_message = &msg, 715 .callback_ops = &nfs_write_direct_ops, 716 .workqueue = nfsiod_workqueue, 717 .flags = RPC_TASK_ASYNC, 718 }; 719 size_t wsize = NFS_SERVER(inode)->wsize; 720 unsigned int pgbase; 721 int result; 722 ssize_t started = 0; 723 724 do { 725 struct nfs_write_data *data; 726 size_t bytes; 727 728 pgbase = user_addr & ~PAGE_MASK; 729 bytes = min(wsize,count); 730 731 result = -ENOMEM; 732 data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes)); 733 if (unlikely(!data)) 734 break; 735 736 down_read(¤t->mm->mmap_sem); 737 result = get_user_pages(current, current->mm, user_addr, 738 data->npages, 0, 0, data->pagevec, NULL); 739 up_read(¤t->mm->mmap_sem); 740 if (result < 0) { 741 nfs_writedata_free(data); 742 break; 743 } 744 if ((unsigned)result < data->npages) { 745 bytes = result * PAGE_SIZE; 746 if (bytes <= pgbase) { 747 nfs_direct_release_pages(data->pagevec, result); 748 nfs_writedata_free(data); 749 break; 750 } 751 bytes -= pgbase; 752 data->npages = result; 753 } 754 755 get_dreq(dreq); 756 757 list_move_tail(&data->pages, &dreq->rewrite_list); 758 759 data->req = (struct nfs_page *) dreq; 
760 data->inode = inode; 761 data->cred = msg.rpc_cred; 762 data->args.fh = NFS_FH(inode); 763 data->args.context = ctx; 764 data->args.offset = pos; 765 data->args.pgbase = pgbase; 766 data->args.pages = data->pagevec; 767 data->args.count = bytes; 768 data->args.stable = sync; 769 data->res.fattr = &data->fattr; 770 data->res.count = bytes; 771 data->res.verf = &data->verf; 772 nfs_fattr_init(&data->fattr); 773 774 task_setup_data.task = &data->task; 775 task_setup_data.callback_data = data; 776 msg.rpc_argp = &data->args; 777 msg.rpc_resp = &data->res; 778 NFS_PROTO(inode)->write_setup(data, &msg); 779 780 task = rpc_run_task(&task_setup_data); 781 if (IS_ERR(task)) 782 break; 783 rpc_put_task(task); 784 785 dprintk("NFS: %5u initiated direct write call " 786 "(req %s/%Ld, %zu bytes @ offset %Lu)\n", 787 data->task.tk_pid, 788 inode->i_sb->s_id, 789 (long long)NFS_FILEID(inode), 790 bytes, 791 (unsigned long long)data->args.offset); 792 793 started += bytes; 794 user_addr += bytes; 795 pos += bytes; 796 797 /* FIXME: Remove this useless math from the final patch */ 798 pgbase += bytes; 799 pgbase &= ~PAGE_MASK; 800 BUG_ON(pgbase != (user_addr & ~PAGE_MASK)); 801 802 count -= bytes; 803 } while (count != 0); 804 805 if (started) 806 return started; 807 return result < 0 ? 
(ssize_t) result : -EFAULT; 808 } 809 810 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq, 811 const struct iovec *iov, 812 unsigned long nr_segs, 813 loff_t pos, int sync) 814 { 815 ssize_t result = 0; 816 size_t requested_bytes = 0; 817 unsigned long seg; 818 819 get_dreq(dreq); 820 821 for (seg = 0; seg < nr_segs; seg++) { 822 const struct iovec *vec = &iov[seg]; 823 result = nfs_direct_write_schedule_segment(dreq, vec, 824 pos, sync); 825 if (result < 0) 826 break; 827 requested_bytes += result; 828 if ((size_t)result < vec->iov_len) 829 break; 830 pos += vec->iov_len; 831 } 832 833 if (put_dreq(dreq)) 834 nfs_direct_write_complete(dreq, dreq->inode); 835 836 if (requested_bytes != 0) 837 return 0; 838 839 if (result < 0) 840 return result; 841 return -EIO; 842 } 843 844 static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov, 845 unsigned long nr_segs, loff_t pos, 846 size_t count) 847 { 848 ssize_t result = 0; 849 struct inode *inode = iocb->ki_filp->f_mapping->host; 850 struct nfs_direct_req *dreq; 851 size_t wsize = NFS_SERVER(inode)->wsize; 852 int sync = NFS_UNSTABLE; 853 854 dreq = nfs_direct_req_alloc(); 855 if (!dreq) 856 return -ENOMEM; 857 nfs_alloc_commit_data(dreq); 858 859 if (dreq->commit_data == NULL || count < wsize) 860 sync = NFS_FILE_SYNC; 861 862 dreq->inode = inode; 863 dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp)); 864 if (!is_sync_kiocb(iocb)) 865 dreq->iocb = iocb; 866 867 result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync); 868 if (!result) 869 result = nfs_direct_wait(dreq); 870 nfs_direct_req_release(dreq); 871 872 return result; 873 } 874 875 /** 876 * nfs_file_direct_read - file direct read operation for NFS files 877 * @iocb: target I/O control block 878 * @iov: vector of user buffers into which to read data 879 * @nr_segs: size of iov vector 880 * @pos: byte offset in file where reading starts 881 * 882 * We use this function for direct 
reads instead of calling 883 * generic_file_aio_read() in order to avoid gfar's check to see if 884 * the request starts before the end of the file. For that check 885 * to work, we must generate a GETATTR before each direct read, and 886 * even then there is a window between the GETATTR and the subsequent 887 * READ where the file size could change. Our preference is simply 888 * to do all reads the application wants, and the server will take 889 * care of managing the end of file boundary. 890 * 891 * This function also eliminates unnecessarily updating the file's 892 * atime locally, as the NFS server sets the file's atime, and this 893 * client must read the updated atime from the server back into its 894 * cache. 895 */ 896 ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov, 897 unsigned long nr_segs, loff_t pos) 898 { 899 ssize_t retval = -EINVAL; 900 struct file *file = iocb->ki_filp; 901 struct address_space *mapping = file->f_mapping; 902 size_t count; 903 904 count = iov_length(iov, nr_segs); 905 nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count); 906 907 dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n", 908 file->f_path.dentry->d_parent->d_name.name, 909 file->f_path.dentry->d_name.name, 910 count, (long long) pos); 911 912 retval = 0; 913 if (!count) 914 goto out; 915 916 retval = nfs_sync_mapping(mapping); 917 if (retval) 918 goto out; 919 920 retval = nfs_direct_read(iocb, iov, nr_segs, pos); 921 if (retval > 0) 922 iocb->ki_pos = pos + retval; 923 924 out: 925 return retval; 926 } 927 928 /** 929 * nfs_file_direct_write - file direct write operation for NFS files 930 * @iocb: target I/O control block 931 * @iov: vector of user buffers from which to write data 932 * @nr_segs: size of iov vector 933 * @pos: byte offset in file where writing starts 934 * 935 * We use this function for direct writes instead of calling 936 * generic_file_aio_write() in order to avoid taking the inode 937 * semaphore and updating the 
i_size. The NFS server will set 938 * the new i_size and this client must read the updated size 939 * back into its cache. We let the server do generic write 940 * parameter checking and report problems. 941 * 942 * We eliminate local atime updates, see direct read above. 943 * 944 * We avoid unnecessary page cache invalidations for normal cached 945 * readers of this file. 946 * 947 * Note that O_APPEND is not supported for NFS direct writes, as there 948 * is no atomic O_APPEND write facility in the NFS protocol. 949 */ 950 ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov, 951 unsigned long nr_segs, loff_t pos) 952 { 953 ssize_t retval = -EINVAL; 954 struct file *file = iocb->ki_filp; 955 struct address_space *mapping = file->f_mapping; 956 size_t count; 957 958 count = iov_length(iov, nr_segs); 959 nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count); 960 961 dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n", 962 file->f_path.dentry->d_parent->d_name.name, 963 file->f_path.dentry->d_name.name, 964 count, (long long) pos); 965 966 retval = generic_write_checks(file, &pos, &count, 0); 967 if (retval) 968 goto out; 969 970 retval = -EINVAL; 971 if ((ssize_t) count < 0) 972 goto out; 973 retval = 0; 974 if (!count) 975 goto out; 976 977 retval = nfs_sync_mapping(mapping); 978 if (retval) 979 goto out; 980 981 retval = nfs_direct_write(iocb, iov, nr_segs, pos, count); 982 983 if (retval > 0) 984 iocb->ki_pos = pos + retval; 985 986 out: 987 return retval; 988 } 989 990 /** 991 * nfs_init_directcache - create a slab cache for nfs_direct_req structures 992 * 993 */ 994 int __init nfs_init_directcache(void) 995 { 996 nfs_direct_cachep = kmem_cache_create("nfs_direct_cache", 997 sizeof(struct nfs_direct_req), 998 0, (SLAB_RECLAIM_ACCOUNT| 999 SLAB_MEM_SPREAD), 1000 NULL); 1001 if (nfs_direct_cachep == NULL) 1002 return -ENOMEM; 1003 1004 return 0; 1005 } 1006 1007 /** 1008 * nfs_destroy_directcache - destroy the slab cache for 
nfs_direct_req structures 1009 * 1010 */ 1011 void nfs_destroy_directcache(void) 1012 { 1013 kmem_cache_destroy(nfs_direct_cachep); 1014 } 1015