1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7from oeqa.selftest.case import OESelftestTestCase
8from oeqa.utils.commands import runCmd
9from oeqa.utils import CommandError
10
11import subprocess
12import threading
13import time
14import signal
15
class MemLogger(object):
    """Minimal logger stand-in that keeps every message in memory.

    Messages are stored in call order in ``info_msgs`` and ``error_msgs``
    so that a test can inspect them after the code under test has run.
    """

    def __init__(self):
        # One independent list per level, created fresh for each instance.
        self.info_msgs, self.error_msgs = [], []

    def info(self, msg):
        """Record *msg* as an info-level message."""
        self.info_msgs += [msg]

    def error(self, msg):
        """Record *msg* as an error-level message."""
        self.error_msgs += [msg]
26
class RunCmdTests(OESelftestTestCase):
    """Basic tests for the runCmd() utility function.

    The tests cover exit-status reporting, shell versus list invocation,
    error reporting, stdout/stderr capture, stdin data, timeouts and
    logging through a user-supplied logger object.
    """

    # The delta is intentionally smaller than the timeout, to detect cases where
    # we incorrectly apply the timeout more than once.
    TIMEOUT = 10
    DELTA = 8

    def _assert_no_thread_leak(self, numthreads):
        """Assert that the number of live threads still equals *numthreads*.

        The thread list is sampled exactly once so that the value compared
        and the values shown in the failure message always agree.
        """
        active = threading.enumerate()
        self.assertEqual(numthreads, len(active), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (numthreads, len(active), active))

    def _run_expecting_timeout(self, **kwargs):
        """Run ``sleep 60`` under ``TIMEOUT`` and check it gets terminated.

        Extra keyword arguments are forwarded to runCmd(). The helper checks
        that the reported status is ``-SIGTERM``, that the call returned in
        less than ``TIMEOUT + DELTA`` seconds, and that the number of live
        threads is the same before and after the call.
        """
        numthreads = threading.active_count()
        # Use a monotonic clock: wall-clock adjustments must not be able to
        # distort the measured duration.
        start = time.monotonic()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, sync=False, **kwargs)
        elapsed = time.monotonic() - start
        self.assertEqual(result.status, -signal.SIGTERM)
        self.assertLess(elapsed, self.TIMEOUT + self.DELTA)
        self._assert_no_thread_leak(numthreads)

    def test_result_okay(self):
        """A succeeding command reports status 0."""
        result = runCmd("true")
        self.assertEqual(result.status, 0)

    def test_result_false(self):
        """With ignore_status=True a failing command reports status 1."""
        result = runCmd("false", ignore_status=True)
        self.assertEqual(result.status, 1)

    def test_shell(self):
        """A string command with ';' yields the status of its last part."""
        # A shell is used for all string commands.
        result = runCmd("false; true", ignore_status=True)
        self.assertEqual(result.status, 0)

    def test_no_shell(self):
        """A string command with shell=False raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError,
                          runCmd, "false; true", shell=False)

    def test_list_not_found(self):
        """A list command naming a nonexistent program raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError,
                          runCmd, ["false; true"])

    def test_list_okay(self):
        """A list command that succeeds reports status 0."""
        result = runCmd(["true"])
        self.assertEqual(result.status, 0)

    def test_result_assertion(self):
        """By default a failing command raises AssertionError with its output."""
        self.assertRaisesRegex(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
                               runCmd, "echo foobar >&2; false", shell=True)

    def test_result_exception(self):
        """With assert_error=False a failing command raises CommandError."""
        self.assertRaisesRegex(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
                               runCmd, "echo foobar >&2; false", shell=True, assert_error=False)

    def test_output(self):
        """Without a separate stderr, both streams end up in result.output."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True, sync=False)
        self.assertEqual("stdout\nstderr", result.output)
        self.assertEqual("", result.error)

    def test_output_split(self):
        """With stderr=subprocess.PIPE, stderr is reported in result.error."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE, sync=False)
        self.assertEqual("stdout", result.output)
        self.assertEqual("stderr", result.error)

    def test_timeout(self):
        """A hanging command is terminated once the timeout expires."""
        self._run_expecting_timeout()

    def test_timeout_split(self):
        """The timeout also works when stderr is captured separately."""
        self._run_expecting_timeout(stderr=subprocess.PIPE)

    def test_stdin(self):
        """Data passed via ``data`` is fed to the command's stdin."""
        numthreads = threading.active_count()
        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT, sync=False)
        self.assertEqual("hello world", result.output)
        self._assert_no_thread_leak(numthreads)
        # The test additionally requires that only a single thread was
        # active before the command was run.
        self.assertEqual(numthreads, 1)

    def test_stdin_timeout(self):
        """The timeout also works when stdin data is supplied."""
        self._run_expecting_timeout(data=b"hello world")

    def test_log(self):
        """With output_log set, the command line and all output go to info()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
        self.assertEqual([], log.error_msgs)

    def test_log_split(self):
        """With stderr=subprocess.PIPE, stderr lines are logged via error()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
        self.assertEqual(["stderr"], log.error_msgs)
125