# Copyright (c) 2012 The Chromium OS Authors.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/2.4/license for licensing details.

"""Subprocess execution

This module holds a subclass of subprocess.Popen with our own required
features, mainly that we get access to the subprocess output while it
is running rather than just at the end. This makes it easier to show
progress information and filter output in real time.
"""

import errno
import os
import pty
import select
import subprocess
import sys
import unittest


# Names exported by a star-import. The unit-test class is deliberately left
# out so that importing modules (and their test runners) do not pick it up.
__all__ = ['PIPE', 'STDOUT', 'PIPE_PTY', 'Popen', 'stay_alive']

# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT

# Pipe output through a pty.
# NOTE: on Python 3 this value is the same as subprocess.DEVNULL, so DEVNULL
# cannot be told apart from PIPE_PTY when passed to Popen() below.
PIPE_PTY = -3

# Popen.CommunicateFilter() calls terminate() on its child whenever it finds
# this flag false, so other code can clear it to stop running subprocesses.
stay_alive = True


class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends it
         to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its output
         to us as soon as it is produced, rather than waiting for the end of a
         line.

    Use CommunicateFilter() to handle output from the subprocess.
    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the sentinel
                value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the sentinel
                value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if none.
            env: Environment to use for this subprocess, or None to inherit
                parent.
            kwargs: No other arguments are supported at the moment.

        Raises:
            ValueError: if any extra keyword argument is passed.
            OSError: if the subprocess cannot be started.
        """
        # Insist that unit tests exist for other arguments we don't support.
        # Check this first so that we never open ptys or start a child that
        # we are about to abandon.
        if kwargs:
            raise ValueError("Unit tests do not test extra args - please add tests")

        stdout_pty = None
        stderr_pty = None
        slave_files = []

        try:
            if stdout == PIPE_PTY:
                stdout_pty = pty.openpty()
                stdout = os.fdopen(stdout_pty[1])
                slave_files.append(stdout)
            if stderr == PIPE_PTY:
                stderr_pty = pty.openpty()
                stderr = os.fdopen(stderr_pty[1])
                slave_files.append(stderr)

            super(Popen, self).__init__(args, stdin=stdin, stdout=stdout,
                                        stderr=stderr, shell=shell, cwd=cwd,
                                        env=env)
        except Exception:
            # The child was not started, so nothing will ever read the master
            # halves: close them rather than leaking the descriptors.
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is not None:
                    os.close(pty_pair[0])
            raise
        finally:
            # The child (if any) now has its own copy of the slave halves.
            # Our copies must be closed, otherwise reading the master half
            # would never report that the child has finished with the pty.
            for slave in slave_files:
                slave.close()

        # If we're on a PTY, we passed the slave half of the PTY to the
        # subprocess. We want to use the master half on our end from now on.
        # Setting this here does make some assumptions about the implementation
        # of subprocess, but those assumptions are pretty minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None by
        # this constructor.
        if stdout_pty is not None:
            self.stdout = os.fdopen(stdout_pty[0])
        if stderr_pty is not None:
            self.stderr = os.fdopen(stderr_pty[0])

    @staticmethod
    def _ForwardChunk(fobj, stream, chunks, combined, read_set, output):
        """Read one chunk from a child stream and record/forward it

        Args:
            fobj: File object to read from (self.stdout or self.stderr).
            stream: Marker passed to output (sys.stdout or sys.stderr).
            chunks: List collecting the data seen on this stream.
            combined: List collecting the data seen on all streams.
            read_set: List of files still being read; fobj is closed and
                removed from it when no more data can be read.
            output: Function to call with the new data, or None.
        """
        data = b''
        # We will get an error on read if the pty is closed; treat that the
        # same as end-of-file.
        try:
            data = os.read(fobj.fileno(), 1024)
        except OSError:
            pass
        if not data:
            fobj.close()
            read_set.remove(fobj)
            return
        chunks.append(data)
        combined.append(data)
        if output:
            output(stream, data)

    def CommunicateFilter(self, output, input_buf=b''):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must be
        defined like this:

            def Output([self,] stream, data)
            Args:
                stream: the stream the output was received on, which will be
                        sys.stdout or sys.stderr.
                data: the data received, as returned by os.read() (bytes; this
                        is a plain str on Python 2)

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        Args:
            output: Function to call with each fragment of output, or None.
            input_buf: Data to send to the subprocess's stdin. This is only
                used if the process was created with stdin=PIPE. Text is
                encoded as UTF-8 before being sent.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and
            stderr), each as bytes (str on Python 2).

            Note that the interleaved output will only be sensible if you have
            set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends
            on the timing of the output in the subprocess. If a subprocess flips
            between stdout and stderr quickly in succession, by the time we come
            to read the output from each we may see several lines in each, and
            will read all the stdout lines, then all the stderr lines. So the
            interleaving may not be correct. In this case you might want to pass
            stderr=cros_subprocess.STDOUT to the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be
            empty and the combined output will just be the same as stdout.
        """
        read_set = []
        write_set = []
        stdout = None     # Return
        stderr = None     # Return

        # os.write() needs bytes, so encode any text we were given.
        if input_buf and not isinstance(input_buf, bytes):
            input_buf = input_buf.encode('utf-8')

        if self.stdin:
            # Flush stdio buffer. This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr and self.stderr != self.stdout:
            read_set.append(self.stderr)
            stderr = []
        combined = []

        input_offset = 0
        while read_set or write_set:
            try:
                # Use a timeout so that stay_alive is checked regularly even
                # when the child is silent.
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking. POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset:input_offset + 512]
                try:
                    input_offset += os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The child closed its stdin: give up on the remaining
                    # input but keep collecting its output.
                    input_offset = len(input_buf)
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            if self.stdout in rlist:
                self._ForwardChunk(self.stdout, sys.stdout, stdout, combined,
                                   read_set, output)
            if self.stderr in rlist:
                self._ForwardChunk(self.stderr, sys.stderr, stderr, combined,
                                   read_set, output)

        # All data exchanged. Translate lists into single values.
        stdout = b''.join(stdout) if stdout is not None else b''
        stderr = b''.join(stderr) if stderr is not None else b''
        combined = b''.join(combined)

        # Translate newlines, if requested. We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering). As before, the combined data is left untranslated.
        if self.universal_newlines:
            stdout = stdout.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            stderr = stderr.replace(b'\r\n', b'\n').replace(b'\r', b'\n')

        self.wait()
        return (stdout, stderr, combined)


# Just being a unittest.TestCase gives us 14 public methods. Unless we
# disable this, we can only have 6 tests in a TestCase. That's not enough.
#
# pylint: disable=R0904

class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module"""

    class MyOperation(object):
        """Provides an operation that we can pass to Popen"""

        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get
                    output from the subprocess. We will add \\r\\n to the
                    string.
            """
            self.stdout_data = b''
            self.stderr_data = b''
            self.combined_data = b''
            self.stdin_pipe = None
            self.stdin_read_pipe = None
            self._stdin_write_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                pipe = os.pipe()
                self.stdin_read_pipe = pipe[0]
                self._stdin_write_pipe = os.fdopen(pipe[1], 'w')

        def Output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison"""
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one. Do this only for the
            # first fragment, otherwise it would be sent again for every
            # further piece of output.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()
                self._input_to_send = None

    def _BasicCheck(self, plist, oper):
        """Basic checks that the output looks sane."""
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined
        # length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'bad\r\n')
        self.assertEqual(plist[1], b'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], b'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        expected = (' '.join(cmd[1:]) + '\r\n').encode('ascii')
        self.assertEqual(plist[0], expected)
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            plist = Popen('pwd', shell=shell,
                          cwd='/tmp').CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so that we neither depend on nor modify the
            # environment of the test process itself.
            env = dict(os.environ)
            env.pop('FRED', None)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            plist = Popen(cmd, shell=True,
                          env=env).CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'fred\r\n' if add else b'\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this
        prompt we send the name as input to the process. It should then print
        the name properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        self.assertEqual(plist[0],
                         prompt.encode('ascii') + b'Hello Flash\r\r\n')

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
               'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'terminal 1\r\n')
        self.assertEqual(plist[1], b'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                      shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'not 1\n')
        self.assertEqual(plist[1], b'not 2\n')


if __name__ == '__main__':
    unittest.main()