/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/slab.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct nfs_lock_context	*l_ctx;		/* Lock context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

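/*
 * Two counters govern the life of a dreq.  The kref counts holders of
 * the structure itself (the submitting caller plus the I/O path),
 * while io_count counts RPCs still in flight; get_dreq()/put_dreq()
 * bracket each outstanding request so that completion processing runs
 * only after the last RPC has finished.
 */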
static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O.  However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	dreq->l_ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->l_ctx != NULL)
		nfs_put_lock_context(dreq->l_ctx);
	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

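/*
 * Note: wait_for_completion_killable() returns -ERESTARTSYS if the
 * waiting process receives a fatal signal; nfs_direct_wait() then
 * returns that error while the outstanding RPCs run to completion in
 * the background and clean up via their ->rpc_release callbacks.
 */
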
/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{

	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
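/*
 * For example, a 1MB read with rsize = 32768 becomes 32 READ requests,
 * all dispatched asynchronously and in parallel by the loop below.
 * nfs_page_array_len() sizes each request's page vector to span
 * 'bytes' of data starting at offset 'pgbase' within the first page
 * of the user's buffer.
 */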
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize,count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

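/*
 * The iovec scheduler takes its own reference on the dreq before
 * dispatching any segments, so RPCs that complete while later segments
 * are still being set up cannot drop io_count to zero prematurely.
 * The matching put_dreq() fires completion only if every RPC has
 * already finished.
 */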
static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	return 0;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (dreq == NULL)
		goto out;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->args.lock_context = dreq->l_ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
	case NFS_ODIRECT_DO_COMMIT:
		nfs_direct_commit_schedule(dreq);
		break;
	case NFS_ODIRECT_RESCHED_WRITES:
		nfs_direct_write_reschedule(dreq);
		break;
	default:
		if (dreq->commit_data != NULL)
			nfs_commit_free(dreq->commit_data);
		nfs_direct_free_writedata(dreq);
		nfs_zap_mapping(inode, inode->i_mapping);
		nfs_direct_complete(dreq);
	}
}

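/*
 * The commit nfs_write_data is allocated up front, before any WRITEs
 * are sent, so the completion path never has to allocate memory.  If
 * the allocation fails, nfs_direct_write() falls back to FILE_SYNC
 * writes, which need no COMMIT.
 */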
static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
		case 0:
			memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
			dreq->flags = NFS_ODIRECT_DO_COMMIT;
			break;
		case NFS_ODIRECT_DO_COMMIT:
			if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
				dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
				dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
			}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
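/*
 * Unlike the read path, get_user_pages() is called with write == 0
 * here, since a direct WRITE only reads from the user's pages.  Each
 * nfs_write_data issued below is also kept on dreq->rewrite_list so
 * that nfs_direct_write_reschedule() can resend the same requests as
 * FILE_SYNC writes if a COMMIT fails or the server's verifier changes.
 */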
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize,count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

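/*
 * The write-side iovec scheduler mirrors its read-side counterpart,
 * except that final completion goes through nfs_direct_write_complete(),
 * which may first issue a COMMIT or reschedule the writes instead of
 * completing the request immediately.
 */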
static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
	 */
	if (requested_bytes == 0) {
		nfs_direct_req_release(dreq);
		return result < 0 ? result : -EIO;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);
	return 0;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = -ENOMEM;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		goto out;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count <= wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
	if (dreq->l_ctx == NULL)
		goto out_release;
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
out_release:
	nfs_direct_req_release(dreq);
out:
	return result;
}

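/*
 * Note the stable-write heuristic above: UNSTABLE writes followed by a
 * COMMIT are used only when the request spans more than one wsize'd
 * WRITE and commit_data could be allocated; a request that fits in a
 * single WRITE is sent FILE_SYNC, avoiding the extra COMMIT round trip.
 */
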
/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}