/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data. Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols. Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache. A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache. The client does not
 * correct unaligned requests from applications. All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files. Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}
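
/*
 * Each in-flight read or write RPC (and the scheduling path itself)
 * takes a reference on dreq->io_count via get_dreq().  put_dreq()
 * drops one reference and returns non-zero when the last reference
 * is gone, at which point the caller must complete the request.
 */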
static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete. This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation. If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads. Read length accounting is
 * handled automatically by nfs_direct_read_result(). Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}
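
/*
 * Preallocate the nfs_write_data used for an eventual COMMIT.  If the
 * allocation fails, nfs_direct_write() falls back to stable
 * (NFS_FILE_SYNC) writes, so no separate commit is needed.
 */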
static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code. Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation. If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes. Write length accounting is
 * handled automatically by nfs_direct_write_result(). Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count < wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file. For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change. Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size. The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache. We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}