/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct nfs_lock_context	*l_ctx;		/* Lock context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;
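
/*
 * io_count tracks the in-flight RPCs for one direct request: the
 * scheduler takes a reference before dispatching each READ or WRITE,
 * plus one for itself while it is still issuing requests.  Whoever
 * drops the count to zero (via put_dreq) completes the whole request.
 */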
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O.  However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	dreq->l_ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
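/*
 * Completion fans out two ways: an async (aio) caller is notified via
 * aio_complete() with the final byte count or error, while a
 * synchronous caller is woken through the embedded completion.  One of
 * the two references taken in nfs_direct_req_alloc() is dropped here;
 * the issuing system call drops the other.
 */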
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
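/*
 * Example with hypothetical numbers: for rsize = 32768 and an iovec
 * segment of 100000 bytes, the loop below pins the relevant user pages
 * once per chunk and issues four READs of 32768, 32768, 32768 and
 * 1696 bytes, all of which may be in flight concurrently.
 */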
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return 0;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
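		/*
		 * The task runs asynchronously; nfs_direct_write_release()
		 * drops the per-request dreq reference taken above once the
		 * rewrite finishes.
		 */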
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->args.lock_context = dreq->l_ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void
nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	nfs_writeback_done(task, data);
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
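/*
 * Summary of the commit protocol driven by the surrounding code (for
 * orientation; the code is authoritative): when the request spans more
 * than one wsize'd WRITE and commit data could be allocated, WRITEs go
 * out with stable == NFS_UNSTABLE.  Each unstable reply carries the
 * server's write verifier; the first is saved in dreq->verf, and any
 * later mismatch, or a failed or mismatched COMMIT, flags the request
 * for rescheduling, which resends every saved WRITE as NFS_FILE_SYNC.
 */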
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);
	return 0;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count <= wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
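 *
 * Note: as described at the top of this file, unaligned requests are
 * passed through to the server unchanged; the client does not round
 * the byte range to any block boundary.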
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
			     unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_read(count);

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
			      unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_write(count);

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}