xref: /openbmc/linux/kernel/relay.c (revision 8d1413b28033c49c7f1a4d320e815d7a5531acee)
1 /*
2  * Public API and common code for kernel->userspace relay file support.
3  *
4  * See Documentation/filesystems/relayfs.txt for an overview of relayfs.
5  *
6  * Copyright (C) 2002-2005 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
7  * Copyright (C) 1999-2005 - Karim Yaghmour (karim@opersys.com)
8  *
9  * Moved to kernel/relay.c by Paul Mundt, 2006.
10  *
11  * This file is released under the GPL.
12  */
13 #include <linux/errno.h>
14 #include <linux/stddef.h>
15 #include <linux/slab.h>
16 #include <linux/module.h>
17 #include <linux/string.h>
18 #include <linux/relay.h>
19 #include <linux/vmalloc.h>
20 #include <linux/mm.h>
21 
22 /*
23  * close() vm_op implementation for relay file mapping.
24  */
25 static void relay_file_mmap_close(struct vm_area_struct *vma)
26 {
27 	struct rchan_buf *buf = vma->vm_private_data;
28 	buf->chan->cb->buf_unmapped(buf, vma->vm_file);
29 }
30 
31 /*
32  * nopage() vm_op implementation for relay file mapping.
33  */
34 static struct page *relay_buf_nopage(struct vm_area_struct *vma,
35 				     unsigned long address,
36 				     int *type)
37 {
38 	struct page *page;
39 	struct rchan_buf *buf = vma->vm_private_data;
40 	unsigned long offset = address - vma->vm_start;
41 
42 	if (address > vma->vm_end)
43 		return NOPAGE_SIGBUS; /* Disallow mremap */
44 	if (!buf)
45 		return NOPAGE_OOM;
46 
47 	page = vmalloc_to_page(buf->start + offset);
48 	if (!page)
49 		return NOPAGE_OOM;
50 	get_page(page);
51 
52 	if (type)
53 		*type = VM_FAULT_MINOR;
54 
55 	return page;
56 }
57 
58 /*
59  * vm_ops for relay file mappings.
60  */
61 static struct vm_operations_struct relay_file_mmap_ops = {
62 	.nopage = relay_buf_nopage,
63 	.close = relay_file_mmap_close,
64 };
65 
66 /**
67  *	relay_mmap_buf: - mmap channel buffer to process address space
68  *	@buf: relay channel buffer
69  *	@vma: vm_area_struct describing memory to be mapped
70  *
71  *	Returns 0 if ok, negative on error
72  *
73  *	Caller should already have grabbed mmap_sem.
74  */
75 int relay_mmap_buf(struct rchan_buf *buf, struct vm_area_struct *vma)
76 {
77 	unsigned long length = vma->vm_end - vma->vm_start;
78 	struct file *filp = vma->vm_file;
79 
80 	if (!buf)
81 		return -EBADF;
82 
83 	if (length != (unsigned long)buf->chan->alloc_size)
84 		return -EINVAL;
85 
86 	vma->vm_ops = &relay_file_mmap_ops;
87 	vma->vm_private_data = buf;
88 	buf->chan->cb->buf_mapped(buf, filp);
89 
90 	return 0;
91 }
92 
93 /**
94  *	relay_alloc_buf - allocate a channel buffer
95  *	@buf: the buffer struct
96  *	@size: total size of the buffer
97  *
98  *	Returns a pointer to the resulting buffer, %NULL if unsuccessful. The
99  *	passed in size will get page aligned, if it isn't already.
100  */
101 static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
102 {
103 	void *mem;
104 	unsigned int i, j, n_pages;
105 
106 	*size = PAGE_ALIGN(*size);
107 	n_pages = *size >> PAGE_SHIFT;
108 
109 	buf->page_array = kcalloc(n_pages, sizeof(struct page *), GFP_KERNEL);
110 	if (!buf->page_array)
111 		return NULL;
112 
113 	for (i = 0; i < n_pages; i++) {
114 		buf->page_array[i] = alloc_page(GFP_KERNEL);
115 		if (unlikely(!buf->page_array[i]))
116 			goto depopulate;
117 	}
118 	mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
119 	if (!mem)
120 		goto depopulate;
121 
122 	memset(mem, 0, *size);
123 	buf->page_count = n_pages;
124 	return mem;
125 
126 depopulate:
127 	for (j = 0; j < i; j++)
128 		__free_page(buf->page_array[j]);
129 	kfree(buf->page_array);
130 	return NULL;
131 }
132 
133 /**
134  *	relay_create_buf - allocate and initialize a channel buffer
135  *	@chan: the relay channel
136  *
137  *	Returns channel buffer if successful, %NULL otherwise.
138  */
139 struct rchan_buf *relay_create_buf(struct rchan *chan)
140 {
141 	struct rchan_buf *buf = kcalloc(1, sizeof(struct rchan_buf), GFP_KERNEL);
142 	if (!buf)
143 		return NULL;
144 
145 	buf->padding = kmalloc(chan->n_subbufs * sizeof(size_t *), GFP_KERNEL);
146 	if (!buf->padding)
147 		goto free_buf;
148 
149 	buf->start = relay_alloc_buf(buf, &chan->alloc_size);
150 	if (!buf->start)
151 		goto free_buf;
152 
153 	buf->chan = chan;
154 	kref_get(&buf->chan->kref);
155 	return buf;
156 
157 free_buf:
158 	kfree(buf->padding);
159 	kfree(buf);
160 	return NULL;
161 }
162 
163 /**
164  *	relay_destroy_channel - free the channel struct
165  *	@kref: target kernel reference that contains the relay channel
166  *
167  *	Should only be called from kref_put().
168  */
169 void relay_destroy_channel(struct kref *kref)
170 {
171 	struct rchan *chan = container_of(kref, struct rchan, kref);
172 	kfree(chan);
173 }
174 
175 /**
176  *	relay_destroy_buf - destroy an rchan_buf struct and associated buffer
177  *	@buf: the buffer struct
178  */
179 void relay_destroy_buf(struct rchan_buf *buf)
180 {
181 	struct rchan *chan = buf->chan;
182 	unsigned int i;
183 
184 	if (likely(buf->start)) {
185 		vunmap(buf->start);
186 		for (i = 0; i < buf->page_count; i++)
187 			__free_page(buf->page_array[i]);
188 		kfree(buf->page_array);
189 	}
190 	kfree(buf->padding);
191 	kfree(buf);
192 	kref_put(&chan->kref, relay_destroy_channel);
193 }
194 
195 /**
196  *	relay_remove_buf - remove a channel buffer
197  *	@kref: target kernel reference that contains the relay buffer
198  *
199  *	Removes the file from the fileystem, which also frees the
200  *	rchan_buf_struct and the channel buffer.  Should only be called from
201  *	kref_put().
202  */
203 void relay_remove_buf(struct kref *kref)
204 {
205 	struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
206 	buf->chan->cb->remove_buf_file(buf->dentry);
207 	relay_destroy_buf(buf);
208 }
209 
210 /**
211  *	relay_buf_empty - boolean, is the channel buffer empty?
212  *	@buf: channel buffer
213  *
214  *	Returns 1 if the buffer is empty, 0 otherwise.
215  */
216 int relay_buf_empty(struct rchan_buf *buf)
217 {
218 	return (buf->subbufs_produced - buf->subbufs_consumed) ? 0 : 1;
219 }
220 EXPORT_SYMBOL_GPL(relay_buf_empty);
221 
222 /**
223  *	relay_buf_full - boolean, is the channel buffer full?
224  *	@buf: channel buffer
225  *
226  *	Returns 1 if the buffer is full, 0 otherwise.
227  */
228 int relay_buf_full(struct rchan_buf *buf)
229 {
230 	size_t ready = buf->subbufs_produced - buf->subbufs_consumed;
231 	return (ready >= buf->chan->n_subbufs) ? 1 : 0;
232 }
233 EXPORT_SYMBOL_GPL(relay_buf_full);
234 
235 /*
236  * High-level relay kernel API and associated functions.
237  */
238 
239 /*
240  * rchan_callback implementations defining default channel behavior.  Used
241  * in place of corresponding NULL values in client callback struct.
242  */
243 
/*
 * subbuf_start() default callback.  Returns 0 when the buffer is full,
 * 1 otherwise; the sub-buffer arguments are not used.
 */
static int subbuf_start_default_callback (struct rchan_buf *buf,
					  void *subbuf,
					  void *prev_subbuf,
					  size_t prev_padding)
{
	return relay_buf_full(buf) ? 0 : 1;
}
257 
/*
 * buf_mapped() default callback.  Intentionally a no-op.
 */
static void buf_mapped_default_callback(struct rchan_buf *buf,
					struct file *filp)
{
	return;
}
265 
/*
 * buf_unmapped() default callback.  Intentionally a no-op.
 */
static void buf_unmapped_default_callback(struct rchan_buf *buf,
					  struct file *filp)
{
	return;
}
273 
/*
 * create_buf_file() default callback.  Creates nothing and always
 * returns NULL.
 */
static struct dentry *create_buf_file_default_callback(const char *filename,
						       struct dentry *parent,
						       int mode,
						       struct rchan_buf *buf,
						       int *is_global)
{
	return NULL;
}
285 
/*
 * remove_buf_file() default callback.  Removes nothing and always
 * returns -EINVAL.
 */
static int remove_buf_file_default_callback(struct dentry *dentry)
{
	return -EINVAL;
}
293 
294 /* relay channel default callbacks */
295 static struct rchan_callbacks default_channel_callbacks = {
296 	.subbuf_start = subbuf_start_default_callback,
297 	.buf_mapped = buf_mapped_default_callback,
298 	.buf_unmapped = buf_unmapped_default_callback,
299 	.create_buf_file = create_buf_file_default_callback,
300 	.remove_buf_file = remove_buf_file_default_callback,
301 };
302 
303 /**
304  *	wakeup_readers - wake up readers waiting on a channel
305  *	@private: the channel buffer
306  *
307  *	This is the work function used to defer reader waking.  The
308  *	reason waking is deferred is that calling directly from write
309  *	causes problems if you're writing from say the scheduler.
310  */
311 static void wakeup_readers(struct work_struct *work)
312 {
313 	struct rchan_buf *buf =
314 		container_of(work, struct rchan_buf, wake_readers.work);
315 	wake_up_interruptible(&buf->read_wait);
316 }
317 
318 /**
319  *	__relay_reset - reset a channel buffer
320  *	@buf: the channel buffer
321  *	@init: 1 if this is a first-time initialization
322  *
323  *	See relay_reset for description of effect.
324  */
325 static inline void __relay_reset(struct rchan_buf *buf, unsigned int init)
326 {
327 	size_t i;
328 
329 	if (init) {
330 		init_waitqueue_head(&buf->read_wait);
331 		kref_init(&buf->kref);
332 		INIT_DELAYED_WORK(&buf->wake_readers, NULL);
333 	} else {
334 		cancel_delayed_work(&buf->wake_readers);
335 		flush_scheduled_work();
336 	}
337 
338 	buf->subbufs_produced = 0;
339 	buf->subbufs_consumed = 0;
340 	buf->bytes_consumed = 0;
341 	buf->finalized = 0;
342 	buf->data = buf->start;
343 	buf->offset = 0;
344 
345 	for (i = 0; i < buf->chan->n_subbufs; i++)
346 		buf->padding[i] = 0;
347 
348 	buf->chan->cb->subbuf_start(buf, buf->data, NULL, 0);
349 }
350 
351 /**
352  *	relay_reset - reset the channel
353  *	@chan: the channel
354  *
355  *	This has the effect of erasing all data from all channel buffers
356  *	and restarting the channel in its initial state.  The buffers
357  *	are not freed, so any mappings are still in effect.
358  *
359  *	NOTE: Care should be taken that the channel isn't actually
360  *	being used by anything when this call is made.
361  */
362 void relay_reset(struct rchan *chan)
363 {
364 	unsigned int i;
365 	struct rchan_buf *prev = NULL;
366 
367 	if (!chan)
368 		return;
369 
370 	for (i = 0; i < NR_CPUS; i++) {
371 		if (!chan->buf[i] || chan->buf[i] == prev)
372 			break;
373 		__relay_reset(chan->buf[i], 0);
374 		prev = chan->buf[i];
375 	}
376 }
377 EXPORT_SYMBOL_GPL(relay_reset);
378 
379 /*
380  *	relay_open_buf - create a new relay channel buffer
381  *
382  *	Internal - used by relay_open().
383  */
384 static struct rchan_buf *relay_open_buf(struct rchan *chan,
385 					const char *filename,
386 					struct dentry *parent,
387 					int *is_global)
388 {
389 	struct rchan_buf *buf;
390 	struct dentry *dentry;
391 
392 	if (*is_global)
393 		return chan->buf[0];
394 
395 	buf = relay_create_buf(chan);
396 	if (!buf)
397 		return NULL;
398 
399 	/* Create file in fs */
400 	dentry = chan->cb->create_buf_file(filename, parent, S_IRUSR,
401 					   buf, is_global);
402 	if (!dentry) {
403 		relay_destroy_buf(buf);
404 		return NULL;
405 	}
406 
407 	buf->dentry = dentry;
408 	__relay_reset(buf, 1);
409 
410 	return buf;
411 }
412 
413 /**
414  *	relay_close_buf - close a channel buffer
415  *	@buf: channel buffer
416  *
417  *	Marks the buffer finalized and restores the default callbacks.
418  *	The channel buffer and channel buffer data structure are then freed
419  *	automatically when the last reference is given up.
420  */
421 static inline void relay_close_buf(struct rchan_buf *buf)
422 {
423 	buf->finalized = 1;
424 	cancel_delayed_work(&buf->wake_readers);
425 	flush_scheduled_work();
426 	kref_put(&buf->kref, relay_remove_buf);
427 }
428 
429 static inline void setup_callbacks(struct rchan *chan,
430 				   struct rchan_callbacks *cb)
431 {
432 	if (!cb) {
433 		chan->cb = &default_channel_callbacks;
434 		return;
435 	}
436 
437 	if (!cb->subbuf_start)
438 		cb->subbuf_start = subbuf_start_default_callback;
439 	if (!cb->buf_mapped)
440 		cb->buf_mapped = buf_mapped_default_callback;
441 	if (!cb->buf_unmapped)
442 		cb->buf_unmapped = buf_unmapped_default_callback;
443 	if (!cb->create_buf_file)
444 		cb->create_buf_file = create_buf_file_default_callback;
445 	if (!cb->remove_buf_file)
446 		cb->remove_buf_file = remove_buf_file_default_callback;
447 	chan->cb = cb;
448 }
449 
450 /**
451  *	relay_open - create a new relay channel
452  *	@base_filename: base name of files to create
453  *	@parent: dentry of parent directory, %NULL for root directory
454  *	@subbuf_size: size of sub-buffers
455  *	@n_subbufs: number of sub-buffers
456  *	@cb: client callback functions
457  *
458  *	Returns channel pointer if successful, %NULL otherwise.
459  *
460  *	Creates a channel buffer for each cpu using the sizes and
461  *	attributes specified.  The created channel buffer files
462  *	will be named base_filename0...base_filenameN-1.  File
463  *	permissions will be S_IRUSR.
464  */
465 struct rchan *relay_open(const char *base_filename,
466 			 struct dentry *parent,
467 			 size_t subbuf_size,
468 			 size_t n_subbufs,
469 			 struct rchan_callbacks *cb)
470 {
471 	unsigned int i;
472 	struct rchan *chan;
473 	char *tmpname;
474 	int is_global = 0;
475 
476 	if (!base_filename)
477 		return NULL;
478 
479 	if (!(subbuf_size && n_subbufs))
480 		return NULL;
481 
482 	chan = kcalloc(1, sizeof(struct rchan), GFP_KERNEL);
483 	if (!chan)
484 		return NULL;
485 
486 	chan->version = RELAYFS_CHANNEL_VERSION;
487 	chan->n_subbufs = n_subbufs;
488 	chan->subbuf_size = subbuf_size;
489 	chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
490 	setup_callbacks(chan, cb);
491 	kref_init(&chan->kref);
492 
493 	tmpname = kmalloc(NAME_MAX + 1, GFP_KERNEL);
494 	if (!tmpname)
495 		goto free_chan;
496 
497 	for_each_online_cpu(i) {
498 		sprintf(tmpname, "%s%d", base_filename, i);
499 		chan->buf[i] = relay_open_buf(chan, tmpname, parent,
500 					      &is_global);
501 		if (!chan->buf[i])
502 			goto free_bufs;
503 
504 		chan->buf[i]->cpu = i;
505 	}
506 
507 	kfree(tmpname);
508 	return chan;
509 
510 free_bufs:
511 	for (i = 0; i < NR_CPUS; i++) {
512 		if (!chan->buf[i])
513 			break;
514 		relay_close_buf(chan->buf[i]);
515 		if (is_global)
516 			break;
517 	}
518 	kfree(tmpname);
519 
520 free_chan:
521 	kref_put(&chan->kref, relay_destroy_channel);
522 	return NULL;
523 }
524 EXPORT_SYMBOL_GPL(relay_open);
525 
526 /**
527  *	relay_switch_subbuf - switch to a new sub-buffer
528  *	@buf: channel buffer
529  *	@length: size of current event
530  *
531  *	Returns either the length passed in or 0 if full.
532  *
533  *	Performs sub-buffer-switch tasks such as invoking callbacks,
534  *	updating padding counts, waking up readers, etc.
535  */
536 size_t relay_switch_subbuf(struct rchan_buf *buf, size_t length)
537 {
538 	void *old, *new;
539 	size_t old_subbuf, new_subbuf;
540 
541 	if (unlikely(length > buf->chan->subbuf_size))
542 		goto toobig;
543 
544 	if (buf->offset != buf->chan->subbuf_size + 1) {
545 		buf->prev_padding = buf->chan->subbuf_size - buf->offset;
546 		old_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
547 		buf->padding[old_subbuf] = buf->prev_padding;
548 		buf->subbufs_produced++;
549 		buf->dentry->d_inode->i_size += buf->chan->subbuf_size -
550 			buf->padding[old_subbuf];
551 		smp_mb();
552 		if (waitqueue_active(&buf->read_wait)) {
553 			PREPARE_DELAYED_WORK(&buf->wake_readers,
554 					     wakeup_readers);
555 			schedule_delayed_work(&buf->wake_readers, 1);
556 		}
557 	}
558 
559 	old = buf->data;
560 	new_subbuf = buf->subbufs_produced % buf->chan->n_subbufs;
561 	new = buf->start + new_subbuf * buf->chan->subbuf_size;
562 	buf->offset = 0;
563 	if (!buf->chan->cb->subbuf_start(buf, new, old, buf->prev_padding)) {
564 		buf->offset = buf->chan->subbuf_size + 1;
565 		return 0;
566 	}
567 	buf->data = new;
568 	buf->padding[new_subbuf] = 0;
569 
570 	if (unlikely(length + buf->offset > buf->chan->subbuf_size))
571 		goto toobig;
572 
573 	return length;
574 
575 toobig:
576 	buf->chan->last_toobig = length;
577 	return 0;
578 }
579 EXPORT_SYMBOL_GPL(relay_switch_subbuf);
580 
581 /**
582  *	relay_subbufs_consumed - update the buffer's sub-buffers-consumed count
583  *	@chan: the channel
584  *	@cpu: the cpu associated with the channel buffer to update
585  *	@subbufs_consumed: number of sub-buffers to add to current buf's count
586  *
587  *	Adds to the channel buffer's consumed sub-buffer count.
588  *	subbufs_consumed should be the number of sub-buffers newly consumed,
589  *	not the total consumed.
590  *
591  *	NOTE: Kernel clients don't need to call this function if the channel
592  *	mode is 'overwrite'.
593  */
594 void relay_subbufs_consumed(struct rchan *chan,
595 			    unsigned int cpu,
596 			    size_t subbufs_consumed)
597 {
598 	struct rchan_buf *buf;
599 
600 	if (!chan)
601 		return;
602 
603 	if (cpu >= NR_CPUS || !chan->buf[cpu])
604 		return;
605 
606 	buf = chan->buf[cpu];
607 	buf->subbufs_consumed += subbufs_consumed;
608 	if (buf->subbufs_consumed > buf->subbufs_produced)
609 		buf->subbufs_consumed = buf->subbufs_produced;
610 }
611 EXPORT_SYMBOL_GPL(relay_subbufs_consumed);
612 
613 /**
614  *	relay_close - close the channel
615  *	@chan: the channel
616  *
617  *	Closes all channel buffers and frees the channel.
618  */
619 void relay_close(struct rchan *chan)
620 {
621 	unsigned int i;
622 	struct rchan_buf *prev = NULL;
623 
624 	if (!chan)
625 		return;
626 
627 	for (i = 0; i < NR_CPUS; i++) {
628 		if (!chan->buf[i] || chan->buf[i] == prev)
629 			break;
630 		relay_close_buf(chan->buf[i]);
631 		prev = chan->buf[i];
632 	}
633 
634 	if (chan->last_toobig)
635 		printk(KERN_WARNING "relay: one or more items not logged "
636 		       "[item size (%Zd) > sub-buffer size (%Zd)]\n",
637 		       chan->last_toobig, chan->subbuf_size);
638 
639 	kref_put(&chan->kref, relay_destroy_channel);
640 }
641 EXPORT_SYMBOL_GPL(relay_close);
642 
643 /**
644  *	relay_flush - close the channel
645  *	@chan: the channel
646  *
647  *	Flushes all channel buffers, i.e. forces buffer switch.
648  */
649 void relay_flush(struct rchan *chan)
650 {
651 	unsigned int i;
652 	struct rchan_buf *prev = NULL;
653 
654 	if (!chan)
655 		return;
656 
657 	for (i = 0; i < NR_CPUS; i++) {
658 		if (!chan->buf[i] || chan->buf[i] == prev)
659 			break;
660 		relay_switch_subbuf(chan->buf[i], 0);
661 		prev = chan->buf[i];
662 	}
663 }
664 EXPORT_SYMBOL_GPL(relay_flush);
665 
666 /**
667  *	relay_file_open - open file op for relay files
668  *	@inode: the inode
669  *	@filp: the file
670  *
671  *	Increments the channel buffer refcount.
672  */
673 static int relay_file_open(struct inode *inode, struct file *filp)
674 {
675 	struct rchan_buf *buf = inode->i_private;
676 	kref_get(&buf->kref);
677 	filp->private_data = buf;
678 
679 	return 0;
680 }
681 
682 /**
683  *	relay_file_mmap - mmap file op for relay files
684  *	@filp: the file
685  *	@vma: the vma describing what to map
686  *
687  *	Calls upon relay_mmap_buf to map the file into user space.
688  */
689 static int relay_file_mmap(struct file *filp, struct vm_area_struct *vma)
690 {
691 	struct rchan_buf *buf = filp->private_data;
692 	return relay_mmap_buf(buf, vma);
693 }
694 
695 /**
696  *	relay_file_poll - poll file op for relay files
697  *	@filp: the file
698  *	@wait: poll table
699  *
700  *	Poll implemention.
701  */
702 static unsigned int relay_file_poll(struct file *filp, poll_table *wait)
703 {
704 	unsigned int mask = 0;
705 	struct rchan_buf *buf = filp->private_data;
706 
707 	if (buf->finalized)
708 		return POLLERR;
709 
710 	if (filp->f_mode & FMODE_READ) {
711 		poll_wait(filp, &buf->read_wait, wait);
712 		if (!relay_buf_empty(buf))
713 			mask |= POLLIN | POLLRDNORM;
714 	}
715 
716 	return mask;
717 }
718 
719 /**
720  *	relay_file_release - release file op for relay files
721  *	@inode: the inode
722  *	@filp: the file
723  *
724  *	Decrements the channel refcount, as the filesystem is
725  *	no longer using it.
726  */
727 static int relay_file_release(struct inode *inode, struct file *filp)
728 {
729 	struct rchan_buf *buf = filp->private_data;
730 	kref_put(&buf->kref, relay_remove_buf);
731 
732 	return 0;
733 }
734 
735 /*
736  *	relay_file_read_consume - update the consumed count for the buffer
737  */
738 static void relay_file_read_consume(struct rchan_buf *buf,
739 				    size_t read_pos,
740 				    size_t bytes_consumed)
741 {
742 	size_t subbuf_size = buf->chan->subbuf_size;
743 	size_t n_subbufs = buf->chan->n_subbufs;
744 	size_t read_subbuf;
745 
746 	if (buf->bytes_consumed + bytes_consumed > subbuf_size) {
747 		relay_subbufs_consumed(buf->chan, buf->cpu, 1);
748 		buf->bytes_consumed = 0;
749 	}
750 
751 	buf->bytes_consumed += bytes_consumed;
752 	read_subbuf = read_pos / buf->chan->subbuf_size;
753 	if (buf->bytes_consumed + buf->padding[read_subbuf] == subbuf_size) {
754 		if ((read_subbuf == buf->subbufs_produced % n_subbufs) &&
755 		    (buf->offset == subbuf_size))
756 			return;
757 		relay_subbufs_consumed(buf->chan, buf->cpu, 1);
758 		buf->bytes_consumed = 0;
759 	}
760 }
761 
762 /*
763  *	relay_file_read_avail - boolean, are there unconsumed bytes available?
764  */
765 static int relay_file_read_avail(struct rchan_buf *buf, size_t read_pos)
766 {
767 	size_t subbuf_size = buf->chan->subbuf_size;
768 	size_t n_subbufs = buf->chan->n_subbufs;
769 	size_t produced = buf->subbufs_produced;
770 	size_t consumed = buf->subbufs_consumed;
771 
772 	relay_file_read_consume(buf, read_pos, 0);
773 
774 	if (unlikely(buf->offset > subbuf_size)) {
775 		if (produced == consumed)
776 			return 0;
777 		return 1;
778 	}
779 
780 	if (unlikely(produced - consumed >= n_subbufs)) {
781 		consumed = (produced / n_subbufs) * n_subbufs;
782 		buf->subbufs_consumed = consumed;
783 	}
784 
785 	produced = (produced % n_subbufs) * subbuf_size + buf->offset;
786 	consumed = (consumed % n_subbufs) * subbuf_size + buf->bytes_consumed;
787 
788 	if (consumed > produced)
789 		produced += n_subbufs * subbuf_size;
790 
791 	if (consumed == produced)
792 		return 0;
793 
794 	return 1;
795 }
796 
797 /**
798  *	relay_file_read_subbuf_avail - return bytes available in sub-buffer
799  *	@read_pos: file read position
800  *	@buf: relay channel buffer
801  */
802 static size_t relay_file_read_subbuf_avail(size_t read_pos,
803 					   struct rchan_buf *buf)
804 {
805 	size_t padding, avail = 0;
806 	size_t read_subbuf, read_offset, write_subbuf, write_offset;
807 	size_t subbuf_size = buf->chan->subbuf_size;
808 
809 	write_subbuf = (buf->data - buf->start) / subbuf_size;
810 	write_offset = buf->offset > subbuf_size ? subbuf_size : buf->offset;
811 	read_subbuf = read_pos / subbuf_size;
812 	read_offset = read_pos % subbuf_size;
813 	padding = buf->padding[read_subbuf];
814 
815 	if (read_subbuf == write_subbuf) {
816 		if (read_offset + padding < write_offset)
817 			avail = write_offset - (read_offset + padding);
818 	} else
819 		avail = (subbuf_size - padding) - read_offset;
820 
821 	return avail;
822 }
823 
824 /**
825  *	relay_file_read_start_pos - find the first available byte to read
826  *	@read_pos: file read position
827  *	@buf: relay channel buffer
828  *
829  *	If the read_pos is in the middle of padding, return the
830  *	position of the first actually available byte, otherwise
831  *	return the original value.
832  */
833 static size_t relay_file_read_start_pos(size_t read_pos,
834 					struct rchan_buf *buf)
835 {
836 	size_t read_subbuf, padding, padding_start, padding_end;
837 	size_t subbuf_size = buf->chan->subbuf_size;
838 	size_t n_subbufs = buf->chan->n_subbufs;
839 
840 	read_subbuf = read_pos / subbuf_size;
841 	padding = buf->padding[read_subbuf];
842 	padding_start = (read_subbuf + 1) * subbuf_size - padding;
843 	padding_end = (read_subbuf + 1) * subbuf_size;
844 	if (read_pos >= padding_start && read_pos < padding_end) {
845 		read_subbuf = (read_subbuf + 1) % n_subbufs;
846 		read_pos = read_subbuf * subbuf_size;
847 	}
848 
849 	return read_pos;
850 }
851 
852 /**
853  *	relay_file_read_end_pos - return the new read position
854  *	@read_pos: file read position
855  *	@buf: relay channel buffer
856  *	@count: number of bytes to be read
857  */
858 static size_t relay_file_read_end_pos(struct rchan_buf *buf,
859 				      size_t read_pos,
860 				      size_t count)
861 {
862 	size_t read_subbuf, padding, end_pos;
863 	size_t subbuf_size = buf->chan->subbuf_size;
864 	size_t n_subbufs = buf->chan->n_subbufs;
865 
866 	read_subbuf = read_pos / subbuf_size;
867 	padding = buf->padding[read_subbuf];
868 	if (read_pos % subbuf_size + count + padding == subbuf_size)
869 		end_pos = (read_subbuf + 1) * subbuf_size;
870 	else
871 		end_pos = read_pos + count;
872 	if (end_pos >= subbuf_size * n_subbufs)
873 		end_pos = 0;
874 
875 	return end_pos;
876 }
877 
878 /*
879  *	subbuf_read_actor - read up to one subbuf's worth of data
880  */
881 static int subbuf_read_actor(size_t read_start,
882 			     struct rchan_buf *buf,
883 			     size_t avail,
884 			     read_descriptor_t *desc,
885 			     read_actor_t actor)
886 {
887 	void *from;
888 	int ret = 0;
889 
890 	from = buf->start + read_start;
891 	ret = avail;
892 	if (copy_to_user(desc->arg.buf, from, avail)) {
893 		desc->error = -EFAULT;
894 		ret = 0;
895 	}
896 	desc->arg.data += ret;
897 	desc->written += ret;
898 	desc->count -= ret;
899 
900 	return ret;
901 }
902 
903 /*
904  *	subbuf_send_actor - send up to one subbuf's worth of data
905  */
906 static int subbuf_send_actor(size_t read_start,
907 			     struct rchan_buf *buf,
908 			     size_t avail,
909 			     read_descriptor_t *desc,
910 			     read_actor_t actor)
911 {
912 	unsigned long pidx, poff;
913 	unsigned int subbuf_pages;
914 	int ret = 0;
915 
916 	subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
917 	pidx = (read_start / PAGE_SIZE) % subbuf_pages;
918 	poff = read_start & ~PAGE_MASK;
919 	while (avail) {
920 		struct page *p = buf->page_array[pidx];
921 		unsigned int len;
922 
923 		len = PAGE_SIZE - poff;
924 		if (len > avail)
925 			len = avail;
926 
927 		len = actor(desc, p, poff, len);
928 		if (desc->error)
929 			break;
930 
931 		avail -= len;
932 		ret += len;
933 		poff = 0;
934 		pidx = (pidx + 1) % subbuf_pages;
935 	}
936 
937 	return ret;
938 }
939 
940 typedef int (*subbuf_actor_t) (size_t read_start,
941 			       struct rchan_buf *buf,
942 			       size_t avail,
943 			       read_descriptor_t *desc,
944 			       read_actor_t actor);
945 
946 /*
947  *	relay_file_read_subbufs - read count bytes, bridging subbuf boundaries
948  */
949 static inline ssize_t relay_file_read_subbufs(struct file *filp,
950 					      loff_t *ppos,
951 					      subbuf_actor_t subbuf_actor,
952 					      read_actor_t actor,
953 					      read_descriptor_t *desc)
954 {
955 	struct rchan_buf *buf = filp->private_data;
956 	size_t read_start, avail;
957 	int ret;
958 
959 	if (!desc->count)
960 		return 0;
961 
962 	mutex_lock(&filp->f_dentry->d_inode->i_mutex);
963 	do {
964 		if (!relay_file_read_avail(buf, *ppos))
965 			break;
966 
967 		read_start = relay_file_read_start_pos(*ppos, buf);
968 		avail = relay_file_read_subbuf_avail(read_start, buf);
969 		if (!avail)
970 			break;
971 
972 		avail = min(desc->count, avail);
973 		ret = subbuf_actor(read_start, buf, avail, desc, actor);
974 		if (desc->error < 0)
975 			break;
976 
977 		if (ret) {
978 			relay_file_read_consume(buf, read_start, ret);
979 			*ppos = relay_file_read_end_pos(buf, read_start, ret);
980 		}
981 	} while (desc->count && ret);
982 	mutex_unlock(&filp->f_dentry->d_inode->i_mutex);
983 
984 	return desc->written;
985 }
986 
987 static ssize_t relay_file_read(struct file *filp,
988 			       char __user *buffer,
989 			       size_t count,
990 			       loff_t *ppos)
991 {
992 	read_descriptor_t desc;
993 	desc.written = 0;
994 	desc.count = count;
995 	desc.arg.buf = buffer;
996 	desc.error = 0;
997 	return relay_file_read_subbufs(filp, ppos, subbuf_read_actor,
998 				       NULL, &desc);
999 }
1000 
1001 static ssize_t relay_file_sendfile(struct file *filp,
1002 				   loff_t *ppos,
1003 				   size_t count,
1004 				   read_actor_t actor,
1005 				   void *target)
1006 {
1007 	read_descriptor_t desc;
1008 	desc.written = 0;
1009 	desc.count = count;
1010 	desc.arg.data = target;
1011 	desc.error = 0;
1012 	return relay_file_read_subbufs(filp, ppos, subbuf_send_actor,
1013 				       actor, &desc);
1014 }
1015 
1016 struct file_operations relay_file_operations = {
1017 	.open		= relay_file_open,
1018 	.poll		= relay_file_poll,
1019 	.mmap		= relay_file_mmap,
1020 	.read		= relay_file_read,
1021 	.llseek		= no_llseek,
1022 	.release	= relay_file_release,
1023 	.sendfile       = relay_file_sendfile,
1024 };
1025 EXPORT_SYMBOL_GPL(relay_file_operations);
1026