1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: GPL-2.0-or-later
4#
5# Modified for use in OE by Richard Purdie, 2018
6#
7# Modified by: Corey Goldberg, 2013
8#   License: GPLv2+
9#
10# Original code from:
11#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
12#   Copyright (C) 2005-2011 Canonical Ltd
13#   License: GPLv2+
14
15import os
16import sys
17import traceback
18import unittest
19import subprocess
20import testtools
21import threading
22import time
23import io
24import json
25import subunit
26
27from queue import Queue
28from itertools import cycle
29from subunit import ProtocolTestCase, TestProtocolClient
30from subunit.test_results import AutoTimingTestResultDecorator
31from testtools import ThreadsafeForwardingResult, iterate_tests
32from testtools.content import Content
33from testtools.content_type import ContentType
34from oeqa.utils.commands import get_test_layer
35
36import bb.utils
37import oe.path
38
# Public API of this module. The name must be spelled ``__all__`` (two leading
# underscores) for ``from <module> import *`` to honour it.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
44
45#
46# Patch the version from testtools to allow access to _test_start and allow
47# computation of timing information and threading progress
48#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
    """ThreadsafeForwardingResult that also records timing and progress.

    Before each outcome is forwarded, the start time of the test, the list of
    tests completed by this thread and a formatted progress string are stored
    on the target result object.
    """

    def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
        super().__init__(target, semaphore)
        # Key of this worker in the target's ``threadprogress`` dict
        self.threadnum = threadnum
        # Number of tests assigned to this worker
        self.totalinprocess = totalinprocess
        # Number of tests across all workers
        self.totaltests = totaltests

    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
        # Update the bookkeeping on the shared target under the semaphore,
        # then let the parent class forward the actual outcome.
        with self.semaphore:
            started = self._test_start
            if started:
                target = self.result
                test_id = test.id()
                target.starttime[test_id] = started.timestamp()
                done_here = target.threadprogress[self.threadnum]
                done_here.append(test_id)
                done_overall = sum(len(ids) for ids in target.threadprogress.values())
                elapsed = "{0:.2f}".format(time.time() - started.timestamp())
                progress_fields = (
                    self.threadnum,
                    len(done_here),
                    self.totalinprocess,
                    done_overall,
                    self.totaltests,
                    elapsed,
                    test_id,
                )
                target.progressinfo[test_id] = "%s: %s/%s %s/%s (%ss) (%s)" % progress_fields
        super()._add_result_with_semaphore(method, test, *args, **kwargs)
75
class ProxyTestResult:
    """A very basic TestResult proxy, used to modify add* calls.

    The five outcome methods defined here are routed through ``_addResult``
    so that subclasses only need to override that one hook. Every other
    attribute is looked up on the wrapped result.
    """

    def __init__(self, target):
        # The wrapped result object that ultimately receives every call
        self.result = target

    def _addResult(self, method, test, *args, exception = False, **kwargs):
        """Invoke ``method`` for ``test``.

        ``exception`` is True when the first positional argument is an error
        (exc_info) value; this base implementation does not use it.
        """
        outcome = method(test, *args, **kwargs)
        return outcome

    def addError(self, test, err = None, **kwargs):
        forward = self.result.addError
        self._addResult(forward, test, err, exception = True, **kwargs)

    def addFailure(self, test, err = None, **kwargs):
        forward = self.result.addFailure
        self._addResult(forward, test, err, exception = True, **kwargs)

    def addSuccess(self, test, **kwargs):
        forward = self.result.addSuccess
        self._addResult(forward, test, **kwargs)

    def addExpectedFailure(self, test, err = None, **kwargs):
        forward = self.result.addExpectedFailure
        self._addResult(forward, test, err, exception = True, **kwargs)

    def addUnexpectedSuccess(self, test, **kwargs):
        forward = self.result.addUnexpectedSuccess
        self._addResult(forward, test, **kwargs)

    def __getattr__(self, attr):
        # Only reached for names not found on the proxy itself
        return getattr(self.result, attr)
101
class ExtraResultsDecoderTestResult(ProxyTestResult):
    """Proxy that turns an encoded "extraresults" detail back into Python data.

    When the ``details`` keyword carries an "extraresults" entry that is a
    ``Content`` object, its bytes are parsed as JSON and the parsed value
    replaces the entry before the call is forwarded.
    """

    def _addResult(self, method, test, *args, exception = False, **kwargs):
        # Nothing to decode: forward the call untouched
        if "details" not in kwargs or "extraresults" not in kwargs["details"]:
            return method(test, *args, **kwargs)

        encoded = kwargs["details"]["extraresults"]
        if isinstance(encoded, Content):
            # Work on copies so the caller's dictionaries are not modified
            details = kwargs["details"].copy()
            payload = b"".join(encoded.iter_bytes())
            details["extraresults"] = json.loads(payload.decode())
            kwargs = dict(kwargs, details=details)
        return method(test, *args, **kwargs)
115
class ExtraResultsEncoderTestResult(ProxyTestResult):
    """Proxy that ships ``test.extraresults`` as a JSON "extraresults" detail.

    If the test object has an ``extraresults`` attribute, it is attached to
    the ``details`` keyword as a JSON ``Content`` object before the call is
    forwarded.
    """

    def _addResult(self, method, test, *args, exception = False, **kwargs):
        if hasattr(test, "extraresults"):
            def serialise():
                # Evaluated lazily, when the Content object is iterated
                return [json.dumps(test.extraresults).encode()]

            # Copy so neither the caller's kwargs nor its details dict change
            details = kwargs["details"].copy() if "details" in kwargs else {}
            json_type = ContentType("application", "json", {'charset': 'utf8'})
            details["extraresults"] = Content(json_type, serialise)
            kwargs = dict(kwargs, details=details)

        # if using details, need to encode any exceptions into the details obj,
        # testtools does not handle "err" and "details" together.
        carries_error = exception and len(args) >= 1 and args[0] is not None
        if "details" in kwargs and carries_error:
            kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test)
            args = ()
        return method(test, *args, **kwargs)
132
133#
134# We have to patch subunit since it doesn't understand how to handle addError
# outside of a running test case. This can happen if setUpClass() fails
136# for a class of tests. This unfortunately has horrible internal knowledge.
137#
def outSideTestaddError(self, offset, line):
    """An 'error:' directive has been read.

    Replacement for ``subunit._OutSideTest.addError`` (installed below).
    Instead of requiring a test to be in progress, it creates a
    ``subunit.RemotedTestCase`` for the name found in the directive, switches
    the parser into its error-details state and passes the line on.

    ``offset`` is the index in ``line`` (bytes) at which the test name starts.
    """
    # The final byte is dropped - presumably the trailing newline; the rest is
    # the UTF-8 encoded test name.
    test_name = line[offset:-1].decode('utf8')
    # Set up the parser's private state with a test case for that name and
    # move it to the "reading error details" state.
    self.parser._current_test = subunit.RemotedTestCase(test_name)
    self.parser.current_test_description = test_name
    self.parser._state = self.parser._reading_error_details
    # NOTE(review): set_simple() is subunit-internal; its exact effect is not
    # visible from this file.
    self.parser._reading_error_details.set_simple()
    # Hand the original line to the parser for processing
    self.parser.subunitLineReceived(line)

# Install the replacement on subunit's private _OutSideTest class at import time
subunit._OutSideTest.addError = outSideTestaddError
148
149
150#
151# A dummy structure to add to io.StringIO so that the .buffer object
152# is available and accepts writes. This allows unittest with buffer=True
153# to interact ok with subunit which wants to access sys.stdout.buffer.
154#
class dummybuf(object):
    """Write-only stand-in for the ``.buffer`` attribute of a text stream.

    Bytes written to it are decoded as UTF-8 and passed to the text-mode
    parent object it was created with.
    """

    def __init__(self, parent):
        # Text-mode object (e.g. an io.StringIO) receiving the decoded data
        self.p = parent

    def write(self, data):
        """Decode ``data`` as UTF-8 and write the text to the parent."""
        text = data.decode("utf-8")
        self.p.write(text)
160
161#
# Taken from testtools.ConcurrentTestSuite but modified for OE use
163#
class ConcurrentTestSuite(unittest.TestSuite):
    """Test suite whose tests are run in forked child processes.

    ``run()`` splits the wrapped suite with ``fork_for_tests()`` and starts
    one reader thread per child; each thread forwards that child's outcomes
    to the caller's result object.
    """

    def __init__(self, suite, processes):
        super().__init__([suite])
        # Number of partitions requested from fork_for_tests()
        self.processes = processes

    def run(self, result):
        """Fork the children and forward their outcomes to ``result``."""
        children, total = fork_for_tests(self.processes, self)
        try:
            pending = {}
            finished = Queue()
            # One semaphore shared by every forwarding result
            lock = threading.Semaphore(1)
            result.threadprogress = {}
            for index, (child, child_total) in enumerate(children):
                result.threadprogress[index] = []
                forwarder = BBThreadsafeForwardingResult(
                        ExtraResultsDecoderTestResult(result),
                        lock, index, child_total, total)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                forwarder.buffer = True
                # We have to add a buffer object to stdout to keep subunit happy
                for attr in ("_stderr_buffer", "_stdout_buffer"):
                    text_buffer = io.StringIO()
                    text_buffer.buffer = dummybuf(text_buffer)
                    setattr(forwarder, attr, text_buffer)
                worker = threading.Thread(
                    target=self._run_test, args=(child, forwarder, finished))
                pending[child] = worker, forwarder
                worker.start()
            # Each worker announces itself on the queue when it is done
            while pending:
                done = finished.get()
                pending[done][0].join()
                del pending[done]
        except BaseException:
            # Ask the workers still outstanding to stop, then re-raise
            for _, forwarder in pending.values():
                forwarder.stop()
            raise
        finally:
            for child, _ in children:
                child._stream.close()

    def _run_test(self, test, process_result, queue):
        """Run ``test`` against ``process_result``; always put ``test`` on ``queue``."""
        try:
            test.run(process_result)
        except Exception:
            # The run logic itself failed
            holder = testtools.ErrorHolder(
                "broken-runner",
                error=sys.exc_info())
            holder.run(process_result)
        finally:
            queue.put(test)
218
def removebuilddir(d):
    """Remove the build directory ``d``.

    While ``d + "/bitbake.lock"`` exists, wait in one second steps (at most
    five of them) before removing the directory with ``bb.utils.prunedir``.
    """
    lockfile = d + "/bitbake.lock"
    for _ in range(5):
        if not os.path.exists(lockfile):
            break
        time.sleep(1)
    bb.utils.prunedir(d, ionice=True)
225
def fork_for_tests(concurrency_num, suite):
    """Fork one child process per partition of ``suite`` and run the tests there.

    The suite is split with ``partition_tests()``. For every partition a pipe
    is created and the process forks:

    * the child runs its tests, reporting through a subunit
      ``TestProtocolClient`` that writes to the pipe, and then always leaves
      via ``os._exit()`` so it can never continue into the parent's code;
    * the parent wraps the read end of the pipe in a ``ProtocolTestCase``.

    When ``BUILDDIR`` is set in the environment, each child first creates its
    own build directory ``<BUILDDIR>-st-<pid>`` (copies of ``conf``, ``cache``
    and the layer returned by ``get_test_layer()``), rewrites environment
    variables and test config paths to refer to it, and removes it again when
    done.

    Args:
        concurrency_num: number of partitions passed to ``partition_tests()``.
        suite: the suite to split; its ``_tests`` list is emptied.

    Returns:
        A tuple ``(result, totaltests)``: ``result`` is a list of
        ``(ProtocolTestCase, numtests)`` pairs, one per child, and
        ``totaltests`` is the number of tests over all partitions.
    """
    result = []
    if 'BUILDDIR' in os.environ:
        selftestdir = get_test_layer()

    test_blocks = partition_tests(suite, concurrency_num)
    # Clear the tests from the original suite so it doesn't keep them alive
    suite._tests[:] = []
    totaltests = sum(len(x) for x in test_blocks)
    for process_tests in test_blocks:
        numtests = len(process_tests)
        process_suite = unittest.TestSuite(process_tests)
        # Also clear each split list so new suite has only reference
        process_tests[:] = []
        c2pread, c2pwrite = os.pipe()
        # Clear buffers before fork to avoid duplicate output
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:
            ourpid = os.getpid()
            # Bound before the try block so that the error handler below can
            # refer to both names even if the very first statement fails.
            newbuilddir = None
            stream = None
            try:
                stream = os.fdopen(c2pwrite, 'wb', 1)
                os.close(c2pread)

                # Create a new separate BUILDDIR for each group of tests
                if 'BUILDDIR' in os.environ:
                    builddir = os.environ['BUILDDIR']
                    newbuilddir = builddir + "-st-" + str(ourpid)
                    newselftestdir = newbuilddir + "/meta-selftest"

                    bb.utils.mkdirhier(newbuilddir)
                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
                    oe.path.copytree(selftestdir, newselftestdir)

                    # Only existing keys are reassigned, so the set of
                    # environment keys does not change while iterating.
                    for e in os.environ:
                        if builddir in os.environ[e]:
                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)

                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)

                    # Tried to use bitbake-layers add/remove but it requires recipe parsing and hence is too slow
                    # NOTE(review): the paths are interpolated unquoted into a shell
                    # command; this assumes they contain no shell or sed metacharacters.
                    subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)

                    os.chdir(newbuilddir)

                    # Point the config paths of the test contexts at the copies
                    for t in process_suite:
                        if not hasattr(t, "tc"):
                            continue
                        cp = t.tc.config_paths
                        for p in cp:
                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
                            if builddir in cp[p] and newbuilddir not in cp[p]:
                                cp[p] = cp[p].replace(builddir, newbuilddir)

                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise it's a roulette to see what
                # child actually gets keystrokes for pdb etc).
                newsi = os.open(os.devnull, os.O_RDWR)
                os.dup2(newsi, sys.stdin.fileno())

                subunit_client = TestProtocolClient(stream)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                subunit_client.buffer = True
                subunit_result = AutoTimingTestResultDecorator(subunit_client)
                process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
                # A different pid means we are a process forked by a test:
                # leave without touching the stream or the build directory.
                if ourpid != os.getpid():
                    os._exit(0)
                if newbuilddir:
                    removebuilddir(newbuilddir)
                    # Already removed; the error handler must not retry
                    newbuilddir = None
                # Flush inside the try block: if it fails, the failure is
                # handled below instead of propagating out of the child.
                stream.flush()
                os._exit(0)
            except BaseException:
                # Don't do anything with process children
                if ourpid != os.getpid():
                    os._exit(1)
                # Try and report traceback on stream, but exit with error
                # even if stream couldn't be created or something else
                # goes wrong.  The traceback is formatted to a string and
                # written in one go to avoid interleaving lines from
                # multiple failing children.
                try:
                    try:
                        # Raises AttributeError if the stream was never
                        # opened; the fallback below then reports on stderr.
                        stream.write(traceback.format_exc().encode('utf-8'))
                        stream.flush()
                    except BaseException:
                        sys.stderr.write(traceback.format_exc())
                    finally:
                        if newbuilddir:
                            removebuilddir(newbuilddir)
                finally:
                    # Reached even when reporting or cleanup raised, so the
                    # child can never return into the parent's code path.
                    os._exit(1)
        else:
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb', 1)
            test = ProtocolTestCase(stream)
            result.append((test, numtests))
    return result, totaltests
327
def partition_tests(suite, count):
    """Split the tests of ``suite`` into at most ``count`` lists.

    Tests are grouped by "<module>.<class name>" so that tests of one class
    stay together, and the groups are dealt out to the lists in round-robin
    order. Lists that received no tests are left out of the return value.
    """
    # Group the tests by class, keeping first-seen order of the classes
    groups = {}
    for test in iterate_tests(suite):
        key = test.__module__ + "." + test.__class__.__name__
        groups.setdefault(key, []).append(test)

    # Deal the groups out to the buckets one at a time
    buckets = [[] for _ in range(count)]
    for bucket, members in zip(cycle(buckets), groups.values()):
        bucket.extend(members)

    # No point in empty threads so drop them
    return [bucket for bucket in buckets if bucket]
345
346