/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/task_io_accounting_ops.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/uaccess.h>
#include <linux/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct nfs_lock_context	*l_ctx;		/* Lock context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

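/*
 * Outstanding I/O is tracked with dreq->io_count.  Each RPC that is
 * dispatched takes a reference via get_dreq(), and the scheduling code
 * holds one extra reference of its own so that the request cannot
 * complete while segments are still being set up.  The final put_dreq()
 * that drops io_count to zero triggers completion processing.
 */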
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O.  However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	dreq->l_ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
	.rpc_call_prepare = nfs_read_prepare,
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

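/*
 * Dispatch a chain of READ requests, one per rsize'd chunk, for each
 * segment of the iovec.  An extra dreq reference is held across the
 * dispatch loop so that the last in-flight RPC cannot complete the
 * request before all segments have been scheduled.  A segment that
 * was only partially scheduled stops the remaining segments from
 * being sent.
 */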
static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return 0;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
	.rpc_call_prepare = nfs_write_prepare,
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->args.lock_context = dreq->l_ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
	case NFS_ODIRECT_DO_COMMIT:
		nfs_direct_commit_schedule(dreq);
		break;
	case NFS_ODIRECT_RESCHED_WRITES:
		nfs_direct_write_reschedule(dreq);
		break;
	default:
		if (dreq->commit_data != NULL)
			nfs_commit_free(dreq->commit_data);
		nfs_direct_free_writedata(dreq);
		nfs_zap_mapping(inode, inode->i_mapping);
		nfs_direct_complete(dreq);
	}
}

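/*
 * The commit data is allocated up front, before any WRITEs are sent.
 * If the allocation fails (or the whole request fits in a single
 * wsize chunk), nfs_direct_write() falls back to stable NFS_FILE_SYNC
 * writes so that no COMMIT is needed.
 */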
static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	nfs_writeback_done(task, data);
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
		case 0:
			memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
			dreq->flags = NFS_ODIRECT_DO_COMMIT;
			break;
		case NFS_ODIRECT_DO_COMMIT:
			if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
				dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
				dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
	.rpc_call_prepare = nfs_write_prepare,
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

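/*
 * Dispatch a chain of WRITE requests for each segment of the iovec.
 * Each nfs_write_data stays on dreq->rewrite_list so that the writes
 * can be resent as stable writes if the commit verifier does not
 * match.  As with reads, an extra dreq reference is held across the
 * dispatch loop, and a partially scheduled segment stops the
 * remaining segments from being sent.
 */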
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);
	return 0;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count <= wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_read(count);

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	task_io_account_write(count);

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}