1# Copyright (c) 2012 The Chromium OS Authors.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6# Licensed to PSF under a Contributor Agreement.
7# See http://www.python.org/2.4/license for licensing details.
8
"""Subprocess execution
10
11This module holds a subclass of subprocess.Popen with our own required
12features, mainly that we get access to the subprocess output while it
is running rather than just at the end. This makes it easier to show
14progress information and filter output in real time.
15"""
16
import codecs
import errno
import os
import pty
import select
import subprocess
import sys
import unittest
24
25
# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT

# Sentinel for the stdout/stderr arguments of Popen (below) meaning "pipe the
# output through a pty". It must differ from every special value that the
# subprocess module itself defines: PIPE (-1), STDOUT (-2) and, on Python 3,
# DEVNULL (-3). The previous value of -3 collided with subprocess.DEVNULL, so
# a caller passing DEVNULL was silently given a pty instead. Always refer to
# this constant by name rather than by value.
PIPE_PTY = -4

# Set this to False (e.g. from a signal handler or another thread) to make
# any CommunicateFilter() call that is in progress terminate its subprocess.
stay_alive = True
31
32
class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends it
         to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its output
         to us as soon as it is produced, rather than waiting for the end of a
         line.

    Use CommunicateFilter() to handle output from the subprocess.

    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if none.
            env: Environment to use for this subprocess, or None to inherit parent.
            kwargs: No other arguments are supported at the moment.    Passing other
                    arguments will cause a ValueError to be raised.

        Raises:
            ValueError: if any unsupported keyword argument is passed. This is
                    checked before any pty is opened or any child is started.
        """
        # Insist that unit tests exist for other arguments we don't support.
        # Do this first so that a rejected call does not leave a running child
        # and open ptys behind.
        if kwargs:
            raise ValueError("Unit tests do not test extra args - please add tests")

        stdout_pty = None
        stderr_pty = None
        spawned = False
        try:
            # pty.openpty() returns (master_fd, slave_fd). The child gets the
            # slave end; subprocess.Popen accepts a raw file descriptor.
            if stdout == PIPE_PTY:
                stdout_pty = pty.openpty()
                stdout = stdout_pty[1]
            if stderr == PIPE_PTY:
                stderr_pty = pty.openpty()
                stderr = stderr_pty[1]

            super(Popen, self).__init__(args, stdin=stdin, stdout=stdout,
                                        stderr=stderr, shell=shell, cwd=cwd,
                                        env=env)
            spawned = True
        finally:
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is None:
                    continue
                # Our copy of the slave end is never needed once the spawn
                # attempt is over, and it must be closed for the master end to
                # see end-of-file when the child exits.
                os.close(pty_pair[1])
                if not spawned:
                    # The child could not be started (e.g. command not found),
                    # so nobody will ever read from the master end.
                    os.close(pty_pair[0])

        # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
        # We want to use the master half on our end from now on.    Setting this here
        # does make some assumptions about the implementation of subprocess, but
        # those assumptions are pretty minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None by
        # this constructor.
        if stdout_pty is not None:
            self.stdout = os.fdopen(stdout_pty[0])
        if stderr_pty is not None:
            self.stderr = os.fdopen(stderr_pty[0])

    def CommunicateFilter(self, output, input_buf=''):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must be
        defined like this:

            def Output([self,] stream, data)
            Args:
                stream: the stream the output was received on, which will be
                        sys.stdout or sys.stderr.
                data: a string containing the data

        Output is decoded as UTF-8. A character whose bytes are split across
        two reads is held back until it is complete, and bytes which are not
        valid UTF-8 are replaced with U+FFFD rather than raising an exception.

        If the module-level stay_alive flag becomes false while this method is
        running, the subprocess is sent a terminate request.

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        Args:
            output: Function to call with each fragment of output, or None to
                    just collect the output.
            input_buf: Data (bytes, or a string which is encoded as UTF-8) to
                    write to the subprocess's stdin. It is only used when the
                    process was created with stdin=PIPE; when it is empty,
                    stdin is closed straight away.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and stderr).

            Note that the interleaved output will only be sensible if you have
            set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
            the timing of the output in the subprocess. If a subprocess flips
            between stdout and stderr quickly in succession, by the time we come to
            read the output from each we may see several lines in each, and will read
            all the stdout lines, then all the stderr lines. So the interleaving
            may not be correct. In this case you might want to pass
            stderr=cros_subprocess.STDOUT to the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be empty
            and the combined output will just be the same as stdout.
        """

        read_set = []
        write_set = []
        stdout = None # Return
        stderr = None # Return
        combined = []

        # One incremental decoder per stream, so that a multi-byte character
        # which straddles two reads is not decoded in halves.
        make_decoder = codecs.getincrementaldecoder('utf-8')

        # Each entry is (file object, stream tag passed to output(),
        #                list collecting the fragments, decoder)
        sources = []

        if self.stdin:
            # Flush stdio buffer.    This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                # os.write() needs bytes
                if not isinstance(input_buf, bytes):
                    input_buf = input_buf.encode('utf-8')
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
            sources.append((self.stdout, sys.stdout, stdout,
                            make_decoder(errors='replace')))
        if self.stderr and self.stderr != self.stdout:
            read_set.append(self.stderr)
            stderr = []
            sources.append((self.stderr, sys.stderr, stderr,
                            make_decoder(errors='replace')))

        input_offset = 0
        while read_set or write_set:
            try:
                # Use a timeout so that stay_alive is checked regularly even
                # when the subprocess is silent.
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.    POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset:input_offset + 512]
                try:
                    input_offset += os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The subprocess closed its stdin: nothing more can be
                    # delivered, so drop whatever input is left.
                    input_offset = len(input_buf)
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            for fileobj, stream, fragments, decoder in sources:
                if fileobj not in rlist:
                    continue
                # We will get an error on read if the pty is closed; treat
                # that the same as end-of-file.
                try:
                    raw = os.read(fileobj.fileno(), 1024)
                except OSError:
                    raw = b''
                # Detect end-of-file on the raw bytes: a read that holds only
                # part of a character legitimately decodes to an empty string.
                at_eof = not raw
                data = decoder.decode(raw, final=at_eof)
                if data:
                    fragments.append(data)
                    combined.append(data)
                    if output:
                        output(stream, data)
                if at_eof:
                    fileobj.close()
                    read_set.remove(fileobj)

        # All data exchanged.    Translate lists into strings.
        stdout = ''.join(stdout) if stdout is not None else ''
        stderr = ''.join(stderr) if stderr is not None else ''
        combined = ''.join(combined)

        # Translate newlines, if requested.    We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering). The data is already decoded, so do the translation
        # here rather than relying on subprocess internals whose signature
        # differs between Python versions.
        if getattr(self, 'universal_newlines', False):
            stdout = stdout.replace('\r\n', '\n').replace('\r', '\n')
            stderr = stderr.replace('\r\n', '\n').replace('\r', '\n')

        self.wait()
        return (stdout, stderr, combined)
246
247
248# Just being a unittest.TestCase gives us 14 public methods.    Unless we
249# disable this, we can only have 6 tests in a TestCase.    That's not enough.
250#
251# pylint: disable=R0904
252
class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module"""

    class MyOperation:
        """Provides an operation that we can pass to Popen"""
        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get output
                    from the subprocess. We will add \\r\\n to the string.
            """
            self.stdout_data = ''
            self.stderr_data = ''
            self.combined_data = ''
            self.stdin_pipe = None      # Unused, kept for compatibility
            # Read end (a file descriptor) to hand to Popen as stdin, and the
            # write end (a file object) that Output() sends the input through.
            # Both stay None when there is no input to send.
            self.stdin_read_pipe = None
            self._stdin_write_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                read_fd, write_fd = os.pipe()
                self.stdin_read_pipe = read_fd
                self._stdin_write_pipe = os.fdopen(write_fd, 'w')

        def Output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison

            Args:
                stream: sys.stdout or sys.stderr, indicating where the data
                    came from.
                data: string fragment received from the subprocess.
            """
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one. Send it only once, on
            # the first output we see, rather than after every fragment.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()
                self._input_to_send = None

        def Close(self):
            """Close both ends of the stdin pipe, if one was created."""
            if self._stdin_write_pipe is not None:
                self._stdin_write_pipe.close()
                self._stdin_write_pipe = None
            if self.stdin_read_pipe is not None:
                os.close(self.stdin_read_pipe)
                self.stdin_read_pipe = None

    def _BasicCheck(self, plist, oper):
        """Basic checks that the output looks sane.

        Args:
            plist: (stdout, stderr, combined) tuple from CommunicateFilter().
            oper: the MyOperation whose Output() received the same data.
        """
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], 'bad\r\n')
        self.assertEqual(plist[1], 'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        # Without a shell the whole string is taken as the program name
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], 'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        # With no shell the redirection is just another argument to echo
        self.assertEqual(plist[0], ' '.join(cmd[1:]) + '\r\n')
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], '\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], '/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so that the real environment of the test process
            # is not modified, and make sure a FRED inherited from the caller
            # cannot break the 'not set' case.
            env = dict(os.environ)
            env.pop('FRED', None)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], 'fred\r\n' if add else '\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this prompt
        we send the name as input to the process. It should then print the name
        properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        # Make sure the pipe is closed even if an assertion below fails
        self.addCleanup(oper.Close)
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        self.assertEqual(plist[0], prompt + 'Hello Flash\r\r\n')

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
                'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], 'terminal 1\r\n')
        self.assertEqual(plist[1], 'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], 'not 1\n')
        self.assertEqual(plist[1], 'not 2\n')
399
# Run the unit tests in this file when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
402