#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#

from oeqa.selftest.case import OESelftestTestCase
from oeqa.utils.commands import runCmd
from oeqa.utils import CommandError

import subprocess
import threading
import time
import signal


class MemLogger:
    """Minimal logger stand-in that stores messages in memory.

    Messages passed to info() and error() are appended, in call order, to
    info_msgs and error_msgs respectively so tests can inspect them.
    """

    def __init__(self):
        self.info_msgs = []
        self.error_msgs = []

    def info(self, msg):
        """Record msg in info_msgs."""
        self.info_msgs.append(msg)

    def error(self, msg):
        """Record msg in error_msgs."""
        self.error_msgs.append(msg)


class RunCmdTests(OESelftestTestCase):
    """ Basic tests for runCmd() utility function """

    # NOTE: the test methods below are documented with '#' comments instead
    # of docstrings on purpose, so that unittest's shortDescription() output
    # for them stays unchanged.

    # The delta is intentionally smaller than the timeout, to detect cases where
    # we incorrectly apply the timeout more than once.
    TIMEOUT = 10
    DELTA = 8

    def _assert_thread_count_unchanged(self, numthreads):
        """Fail unless the current active thread count equals numthreads.

        numthreads is the value of threading.active_count() sampled by the
        caller before the command under test was run.
        """
        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))

    def _assert_sleep_times_out(self, **kwargs):
        """Run 'sleep 60' with a timeout and check that it is terminated.

        Extra keyword arguments are forwarded to runCmd(). The check expects
        a status of -SIGTERM, an elapsed time below TIMEOUT + DELTA, and an
        unchanged active thread count.
        """
        numthreads = threading.active_count()
        # Use a monotonic clock so that system clock adjustments during the
        # test cannot distort the measured duration.
        start = time.monotonic()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, sync=False, **kwargs)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.monotonic()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self._assert_thread_count_unchanged(numthreads)

    def test_result_okay(self):
        # "true" is expected to report status 0.
        result = runCmd("true")
        self.assertEqual(result.status, 0)

    def test_result_false(self):
        # With ignore_status=True the failing command's status 1 is returned.
        result = runCmd("false", ignore_status=True)
        self.assertEqual(result.status, 1)

    def test_shell(self):
        # A shell is used for all string commands.
        result = runCmd("false; true", ignore_status=True)
        self.assertEqual(result.status, 0)

    def test_no_shell(self):
        # The same string with shell=False is expected to raise
        # FileNotFoundError.
        self.assertRaises(FileNotFoundError,
                          runCmd, "false; true", shell=False)

    def test_list_not_found(self):
        # A list command whose single element is "false; true" is expected
        # to raise FileNotFoundError.
        self.assertRaises(FileNotFoundError,
                          runCmd, ["false; true"])

    def test_list_okay(self):
        # A list command naming an existing program reports status 0.
        result = runCmd(["true"])
        self.assertEqual(result.status, 0)

    def test_result_assertion(self):
        # By default a failing command is expected to raise AssertionError
        # whose message carries the command, exit status and output.
        self.assertRaisesRegex(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
                               runCmd, "echo foobar >&2; false", shell=True)

    def test_result_exception(self):
        # With assert_error=False a CommandError is expected instead.
        self.assertRaisesRegex(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
                               runCmd, "echo foobar >&2; false", shell=True, assert_error=False)

    def test_output(self):
        # Without a separate stderr pipe both streams are expected in
        # result.output and result.error is expected to be empty.
        result = runCmd("echo stdout; echo stderr >&2", shell=True, sync=False)
        self.assertEqual("stdout\nstderr", result.output)
        self.assertEqual("", result.error)

    def test_output_split(self):
        # With stderr=subprocess.PIPE the two streams are expected to be
        # reported separately.
        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE, sync=False)
        self.assertEqual("stdout", result.output)
        self.assertEqual("stderr", result.error)

    def test_timeout(self):
        # Timeout handling with the default stderr setting.
        self._assert_sleep_times_out()

    def test_timeout_split(self):
        # Timeout handling with stderr captured through its own pipe.
        self._assert_sleep_times_out(stderr=subprocess.PIPE)

    def test_stdin(self):
        # Data passed via 'data' is expected to come back through "cat".
        numthreads = threading.active_count()
        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT, sync=False)
        self.assertEqual("hello world", result.output)
        self._assert_thread_count_unchanged(numthreads)
        # Additionally require that only one thread was active before the
        # command was run.
        self.assertEqual(numthreads, 1)

    def test_stdin_timeout(self):
        # Timeout handling while input data is supplied to the command.
        self._assert_sleep_times_out(data=b"hello world")

    def test_log(self):
        # With output_log set and no separate stderr pipe, the "Running:"
        # line and both output lines are expected as info messages.
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
        self.assertEqual([], log.error_msgs)

    def test_log_split(self):
        # With stderr=subprocess.PIPE the stderr line is expected as an
        # error message instead of an info message.
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
        self.assertEqual(["stderr"], log.error_msgs)