/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data. Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols. Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache. A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache. The client does not
 * correct unaligned requests from applications. All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files. Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}
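/*
 * Reference-counting sketch (illustrative, not part of the original file):
 * dreq->io_count is a dispatch counter.  Each scheduled RPC takes a
 * reference via get_dreq(); each completion drops one via put_dreq(), and
 * whichever path sees the count reach zero finishes the request, roughly:
 *
 *	get_dreq(dreq);			-- one reference per in-flight RPC
 *	...
 *	if (put_dreq(dreq))		-- true once io_count reaches zero
 *		nfs_direct_complete(dreq);
 */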
static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}
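/*
 * Completion-path sketch (illustrative, not original documentation): a
 * synchronous caller (is_sync_kiocb(iocb)) leaves dreq->iocb NULL and
 * blocks in nfs_direct_wait() above until nfs_direct_complete() fires
 * complete_all().  An asynchronous (aio) caller stores its iocb in
 * dreq->iocb; nfs_direct_wait() then returns -EIOCBQUEUED immediately and
 * the final result is delivered later via aio_complete().
 */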
/*
 * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete. This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation. If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads. Read length accounting is
 * handled automatically by nfs_direct_read_release(). Otherwise, if
 * no requests have been sent, just return an error.
 */
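/*
 * Worked example (illustrative): with rsize = 32768 and an iovec of
 * 100000 bytes whose user address starts 512 bytes into a page
 * (pgbase = 512), the loop below issues READs of 32768, 32768, 32768 and
 * 1696 bytes, assuming get_user_pages() pins every requested page.
 * nfs_page_array_len(pgbase, bytes) sizes the page vector so it covers
 * pgbase + bytes rounded up to whole pages.
 */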
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}
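/*
 * Return-value sketch for the iovec-level scheduler below (illustrative):
 * a segment that dispatches fewer bytes than its iov_len ends the loop,
 * and the function returns 0 as long as at least one READ was sent — the
 * final byte count or error is then reported through the dreq when the
 * outstanding RPCs complete.
 */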
static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}
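/*
 * State-machine sketch (illustrative): nfs_direct_write_complete() acts on
 * the flags accumulated while the WRITEs ran.  No flag set means every
 * reply was FILE_SYNC, so the request finishes immediately.
 * NFS_ODIRECT_DO_COMMIT means at least one UNSTABLE reply arrived, so a
 * COMMIT is sent; if the commit fails or its verifier does not match the
 * one saved in dreq->verf, the commit release path sets
 * NFS_ODIRECT_RESCHED_WRITES and the nfs_write_data entries saved on
 * rewrite_list are resent as stable writes (NFS_FILE_SYNC).
 */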
static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code. Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};
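/*
 * Verifier-handling sketch (illustrative): the release path above keeps
 * the write verifier from the first UNSTABLE reply in dreq->verf.  Every
 * later UNSTABLE reply is compared against it with memcmp(); a mismatch
 * means the server may have rebooted between WRITEs, so the whole batch
 * is flagged NFS_ODIRECT_RESCHED_WRITES and resent as stable writes
 * rather than committed.
 */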
/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation. If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes. Write length accounting is
 * handled automatically by nfs_direct_write_release(). Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}
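/*
 * Illustrative note: unlike the read path, each nfs_write_data above is
 * kept on dreq->rewrite_list (via list_move_tail()) and its user pages
 * stay pinned after the WRITE completes.  That lets the commit and
 * reschedule machinery resend exactly the same arguments if an unstable
 * reply later needs to be committed or rewritten; the pages are released
 * and the entries freed only when nfs_direct_free_writedata() runs at the
 * end of the whole request.
 */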
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count < wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}
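/*
 * Call-site sketch (illustrative assumption, not part of this file): the
 * two exported entry points below are reached from the NFS file
 * operations rather than through nfs_direct_IO().  Roughly, the aio
 * read/write methods in fs/nfs/file.c are expected to dispatch like:
 *
 *	if (iocb->ki_filp->f_flags & O_DIRECT)
 *		return nfs_file_direct_read(iocb, iov, nr_segs, pos);
 *
 * so direct I/O bypasses generic_file_aio_read()/write() entirely.
 */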
/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file. For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change. Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size. The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache. We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}
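/*
 * Illustrative note (assumption, not from this file): nfs_init_directcache()
 * and nfs_destroy_directcache() are intended to be paired from the NFS
 * client's module init/exit path (init_nfs_fs()/exit_nfs_fs() in
 * fs/nfs/inode.c), so the slab cache exists for the lifetime of the module:
 *
 *	err = nfs_init_directcache();
 *	if (err)
 *		goto out;
 *	...
 *	nfs_destroy_directcache();
 */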