1#
2# SPDX-License-Identifier: MIT
3#
4
5from oeqa.selftest.case import OESelftestTestCase
6from oeqa.utils.commands import runCmd
7from oeqa.utils import CommandError
8
9import subprocess
10import threading
11import time
12import signal
13
14class MemLogger(object):
15    def __init__(self):
16        self.info_msgs = []
17        self.error_msgs = []
18
19    def info(self, msg):
20        self.info_msgs.append(msg)
21
22    def error(self, msg):
23        self.error_msgs.append(msg)
24
25class RunCmdTests(OESelftestTestCase):
26    """ Basic tests for runCmd() utility function """
27
28    # The delta is intentionally smaller than the timeout, to detect cases where
29    # we incorrectly apply the timeout more than once.
30    TIMEOUT = 5
31    DELTA = 3
32
33    def test_result_okay(self):
34        result = runCmd("true")
35        self.assertEqual(result.status, 0)
36
37    def test_result_false(self):
38        result = runCmd("false", ignore_status=True)
39        self.assertEqual(result.status, 1)
40
41    def test_shell(self):
42        # A shell is used for all string commands.
43        result = runCmd("false; true", ignore_status=True)
44        self.assertEqual(result.status, 0)
45
46    def test_no_shell(self):
47        self.assertRaises(FileNotFoundError,
48                          runCmd, "false; true", shell=False)
49
50    def test_list_not_found(self):
51        self.assertRaises(FileNotFoundError,
52                          runCmd, ["false; true"])
53
54    def test_list_okay(self):
55        result = runCmd(["true"])
56        self.assertEqual(result.status, 0)
57
58    def test_result_assertion(self):
59        self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
60                                runCmd, "echo foobar >&2; false", shell=True)
61
62    def test_result_exception(self):
63        self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
64                                runCmd, "echo foobar >&2; false", shell=True, assert_error=False)
65
66    def test_output(self):
67        result = runCmd("echo stdout; echo stderr >&2", shell=True)
68        self.assertEqual("stdout\nstderr", result.output)
69        self.assertEqual("", result.error)
70
71    def test_output_split(self):
72        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
73        self.assertEqual("stdout", result.output)
74        self.assertEqual("stderr", result.error)
75
76    def test_timeout(self):
77        numthreads = threading.active_count()
78        start = time.time()
79        # Killing a hanging process only works when not using a shell?!
80        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
81        self.assertEqual(result.status, -signal.SIGTERM)
82        end = time.time()
83        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
84        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))
85
86    def test_timeout_split(self):
87        numthreads = threading.active_count()
88        start = time.time()
89        # Killing a hanging process only works when not using a shell?!
90        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
91        self.assertEqual(result.status, -signal.SIGTERM)
92        end = time.time()
93        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
94        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))
95
96    def test_stdin(self):
97        numthreads = threading.active_count()
98        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
99        self.assertEqual("hello world", result.output)
100        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))
101        self.assertEqual(numthreads, 1)
102
103    def test_stdin_timeout(self):
104        numthreads = threading.active_count()
105        start = time.time()
106        result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
107        self.assertEqual(result.status, -signal.SIGTERM)
108        end = time.time()
109        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
110        self.assertEqual(numthreads, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, threading.active_count(), threading.enumerate()))
111
112    def test_log(self):
113        log = MemLogger()
114        result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
115        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
116        self.assertEqual([], log.error_msgs)
117
118    def test_log_split(self):
119        log = MemLogger()
120        result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
121        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
122        self.assertEqual(["stderr"], log.error_msgs)
123