/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data. Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols. Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache. A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache. The client does not
 * correct unaligned requests from applications. All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files. Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct nfs_lock_context	*l_ctx;		/* Lock context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;
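
/*
 * A minimal userspace sketch of the uncached access described in the
 * header comment (illustrative only; the path, alignment and transfer
 * size below are hypothetical, and error checking is omitted):
 *
 *	#define _GNU_SOURCE		// for O_DIRECT
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
 *	void *buf;
 *	posix_memalign(&buf, 4096, 4096);	// keep the buffer aligned
 *	pwrite(fd, buf, 4096, 0);	// sent straight to the server
 *	pread(fd, buf, 4096, 0);	// not satisfied from the page cache
 */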

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	dreq->l_ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}
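
/*
 * Completion happens in one of two ways: a synchronous caller sleeps
 * in nfs_direct_wait() above until complete_all() is called from
 * nfs_direct_complete() below, once the last outstanding RPC has
 * finished; an aio caller gets -EIOCBQUEUED back immediately and is
 * notified later via aio_complete().
 */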

/*
 * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete. This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation. If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads. Read length accounting is
 * handled automatically by nfs_direct_read_result(). Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}
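
/*
 * Worked example for the accounting in the loop above, assuming 4K
 * pages and that nfs_page_array_len() rounds pgbase + bytes up to
 * whole pages (as nfs_direct_dirty_pages() does): a 16384-byte chunk
 * whose user address ends in 0x200 has pgbase 0x200 and so spans
 * (0x200 + 16384 + 4095) >> 12 = 5 pages.  If get_user_pages() pins
 * fewer pages than that, the chunk is trimmed to the bytes actually
 * covered; if it pins nothing usable, the loop bails out.
 */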

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return 0;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}
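
/*
 * For NFSv3 and NFSv4, direct writes may go out UNSTABLE and be
 * committed afterwards.  nfs_direct_write_release() records the
 * server's write verifier and sets NFS_ODIRECT_DO_COMMIT; if a later
 * reply carries a different verifier, or the COMMIT itself fails to
 * verify, NFS_ODIRECT_RESCHED_WRITES causes every buffered request to
 * be resent as an NFS_FILE_SYNC write.
 */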
#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->args.lock_context = dreq->l_ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	nfs_writeback_done(task, data);
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation. If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes. Write length accounting is
 * handled automatically by nfs_direct_write_result(). Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}
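
/*
 * Note the "write" argument passed to get_user_pages() above: the
 * read path passes 1 because RPC replies are copied into the user's
 * pages, while the write path passes 0 because the pages are only
 * read when the WRITE request is built.
 */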

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);
	return 0;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count <= wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}
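
/*
 * On the choice of sync mode above: when the request fits in a single
 * wsize'd WRITE, asking for NFS_FILE_SYNC up front avoids a separate
 * COMMIT round trip; and when no commit_data could be allocated, a
 * COMMIT could not be sent anyway, so the writes must be stable.
 */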

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file. For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change. Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
			     unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_read(count);

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size. The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache. We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
			      unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_write(count);

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}