#
# SPDX-License-Identifier: MIT
#

from oeqa.selftest.case import OESelftestTestCase
from oeqa.utils.commands import runCmd
from oeqa.utils import CommandError

import subprocess
import threading
import time
import signal


class MemLogger(object):
    """In-memory logger stand-in that records messages instead of emitting them.

    Messages passed to info() and error() are appended, in call order, to
    the ``info_msgs`` and ``error_msgs`` lists respectively so that tests can
    inspect exactly what was logged.
    """

    def __init__(self):
        self.info_msgs = []
        self.error_msgs = []

    def info(self, msg):
        """Record an info-level message."""
        self.info_msgs.append(msg)

    def error(self, msg):
        """Record an error-level message."""
        self.error_msgs.append(msg)


class RunCmdTests(OESelftestTestCase):
    """ Basic tests for runCmd() utility function """

    # The delta is intentionally smaller than the timeout, to detect cases where
    # we incorrectly apply the timeout more than once.
    TIMEOUT = 5
    DELTA = 3

    def _assert_thread_count_unchanged(self, numthreads):
        """Assert that the number of active threads still equals numthreads.

        numthreads is the value of threading.active_count() sampled by the
        caller before invoking runCmd().
        """
        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))

    def test_result_okay(self):
        """A successful command reports exit status 0."""
        result = runCmd("true")
        self.assertEqual(result.status, 0)

    def test_result_false(self):
        """With ignore_status=True a failing command just reports status 1."""
        result = runCmd("false", ignore_status=True)
        self.assertEqual(result.status, 1)

    def test_shell(self):
        """A string command containing shell syntax yields the last status."""
        # A shell is used for all string commands.
        result = runCmd("false; true", ignore_status=True)
        self.assertEqual(result.status, 0)

    def test_no_shell(self):
        """With shell=False the whole string is treated as one program name."""
        self.assertRaises(FileNotFoundError,
                          runCmd, "false; true", shell=False)

    def test_list_not_found(self):
        """A list command whose first element is not a program is not found."""
        self.assertRaises(FileNotFoundError,
                          runCmd, ["false; true"])

    def test_list_okay(self):
        """A list command naming a real program runs successfully."""
        result = runCmd(["true"])
        self.assertEqual(result.status, 0)

    def test_result_assertion(self):
        """By default a failing command raises AssertionError with its output."""
        # assertRaisesRegex is used because the assertRaisesRegexp alias was
        # removed from unittest in Python 3.12.
        self.assertRaisesRegex(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
                               runCmd, "echo foobar >&2; false", shell=True)

    def test_result_exception(self):
        """With assert_error=False a failing command raises CommandError."""
        self.assertRaisesRegex(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
                               runCmd, "echo foobar >&2; false", shell=True, assert_error=False)

    def test_output(self):
        """Without a stderr argument, stderr text appears in result.output."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True)
        self.assertEqual("stdout\nstderr", result.output)
        self.assertEqual("", result.error)

    def test_output_split(self):
        """With stderr=subprocess.PIPE, stderr text appears in result.error."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
        self.assertEqual("stdout", result.output)
        self.assertEqual("stderr", result.error)

    def test_timeout(self):
        """A hanging command is terminated with SIGTERM once the timeout hits."""
        numthreads = threading.active_count()
        start = time.time()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.time()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self._assert_thread_count_unchanged(numthreads)

    def test_timeout_split(self):
        """Same as test_timeout, but with stderr captured separately."""
        numthreads = threading.active_count()
        start = time.time()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.time()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self._assert_thread_count_unchanged(numthreads)

    def test_stdin(self):
        """Bytes passed via data= are fed to the command's standard input."""
        numthreads = threading.active_count()
        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
        self.assertEqual("hello world", result.output)
        self._assert_thread_count_unchanged(numthreads)
        self.assertEqual(numthreads, 1)

    def test_stdin_timeout(self):
        """The timeout is still enforced when input data is supplied."""
        numthreads = threading.active_count()
        start = time.time()
        result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.time()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self._assert_thread_count_unchanged(numthreads)

    def test_log(self):
        """With output_log set, the command line and all output go to info()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
        self.assertEqual([], log.error_msgs)

    def test_log_split(self):
        """With stderr piped separately, stderr lines are logged via error()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
        self.assertEqual(["stderr"], log.error_msgs)