1#!/usr/bin/env python3
2#
3# Modified for use in OE by Richard Purdie, 2018
4#
5# Modified by: Corey Goldberg, 2013
6#   License: GPLv2+
7#
8# Original code from:
9#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
10#   Copyright (C) 2005-2011 Canonical Ltd
11#   License: GPLv2+
12
13import os
14import sys
15import traceback
16import unittest
17import subprocess
18import testtools
19import threading
20import time
21import io
22
23from queue import Queue
24from itertools import cycle
25from subunit import ProtocolTestCase, TestProtocolClient
26from subunit.test_results import AutoTimingTestResultDecorator
27from testtools import ThreadsafeForwardingResult, iterate_tests
28
29import bb.utils
30import oe.path
31
# Public API of this module: the names exported by a star-import.
# (The variable must be spelled __all__ for Python to honour it.)
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
37
38#
39# Patch the version from testtools to allow access to _test_start and allow
40# computation of timing information and threading progress
41#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
    """ThreadsafeForwardingResult that also records timing and progress.

    Before a result is forwarded, the target result's ``starttime``,
    ``threadprogress`` and ``progressinfo`` mappings are updated for the
    test, while holding the shared semaphore.
    """

    def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
        super().__init__(target, semaphore)
        # Total number of tests across every process
        self.totaltests = totaltests
        # Number of tests handled by the process this result belongs to
        self.totalinprocess = totalinprocess
        # Key of this result's entry in the target's threadprogress mapping
        self.threadnum = threadnum

    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
        """Record bookkeeping for *test*, then forward to the parent class."""
        self.semaphore.acquire()
        try:
            shared = self.result
            started = self._test_start.timestamp()
            testid = test.id()
            shared.starttime[testid] = started

            # Tests completed so far by this thread
            finished_here = shared.threadprogress[self.threadnum]
            finished_here.append(testid)
            # Tests completed so far by all threads together
            finished_overall = sum(map(len, shared.threadprogress.values()))

            elapsed = "{0:.2f}".format(time.time() - started)
            shared.progressinfo[testid] = "%s: %s/%s %s/%s (%ss) (%s)" % (
                    self.threadnum,
                    len(finished_here),
                    self.totalinprocess,
                    finished_overall,
                    self.totaltests,
                    elapsed,
                    testid)
        finally:
            self.semaphore.release()
        super()._add_result_with_semaphore(method, test, *args, **kwargs)
67
68#
69# A dummy structure to add to io.StringIO so that the .buffer object
70# is available and accepts writes. This allows unittest with buffer=True
71# to interact ok with subunit which wants to access sys.stdout.buffer.
72#
class dummybuf:
    """Stand-in for a ``.buffer`` attribute that forwards byte writes as text."""

    def __init__(self, parent):
        # Object whose write() receives the decoded text
        self.p = parent

    def write(self, data):
        """Decode *data* as UTF-8 and write the resulting text to the parent."""
        text = data.decode("utf-8")
        self.p.write(text)
78
79#
# Taken from testtools.ConcurrentTestSuite but modified for OE use
81#
class ConcurrentTestSuite(unittest.TestSuite):
    """Test suite that runs its tests in several forked processes at once.

    The wrapped suite is split up by fork_for_tests(); one reader thread per
    child process forwards that child's results into the caller's result.
    """

    def __init__(self, suite, processes):
        """Wrap *suite*; *processes* is handed to fork_for_tests() as the process count."""
        super(ConcurrentTestSuite, self).__init__([suite])
        self.processes = processes

    def run(self, result):
        """Run the tests concurrently, forwarding their outcomes to *result*.

        Blocks until every reader thread has finished.  Returns *result*,
        matching what unittest.TestSuite.run() returns.
        """
        tests, totaltests = fork_for_tests(self.processes, self)
        # Maps each per-process test to its (reader thread, forwarding result)
        threads = {}
        try:
            queue = Queue()
            semaphore = threading.Semaphore(1)
            result.threadprogress = {}
            for i, (test, testnum) in enumerate(tests):
                result.threadprogress[i] = []
                process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                process_result.buffer = True
                # We have to add a buffer object to stdout to keep subunit happy
                process_result._stderr_buffer = io.StringIO()
                process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
                process_result._stdout_buffer = io.StringIO()
                process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
                reader_thread = threading.Thread(
                    target=self._run_test, args=(test, process_result, queue))
                threads[test] = reader_thread, process_result
                reader_thread.start()
            # Each reader thread puts its test on the queue when it is done
            while threads:
                finished_test = queue.get()
                threads[finished_test][0].join()
                del threads[finished_test]
        except:
            # Catch everything (the exception is re-raised below) so the
            # results of the still-running readers are told to stop first.
            for _thread, process_result in threads.values():
                process_result.stop()
            raise
        finally:
            # Always close the parent's ends of the pipes to the children
            for test, _testnum in tests:
                test._stream.close()
        return result

    def _run_test(self, test, process_result, queue):
        """Thread body: run *test* into *process_result*, then announce completion on *queue*."""
        try:
            try:
                test.run(process_result)
            except Exception:
                # The run logic itself failed
                case = testtools.ErrorHolder(
                    "broken-runner",
                    error=sys.exc_info())
                case.run(process_result)
        finally:
            queue.put(test)
134
def removebuilddir(d):
    """Delete the build directory *d* with bb.utils.prunedir().

    While <d>/bitbake.lock exists, wait one second at a time (five times
    at most) before deleting.
    """
    lockfile = d + "/bitbake.lock"
    for _ in range(5):
        if not os.path.exists(lockfile):
            break
        time.sleep(1)
    bb.utils.prunedir(d)
141
def fork_for_tests(concurrency_num, suite):
    """Fork one child process per group of tests and return the parent-side readers.

    The tests in *suite* are split by partition_tests() into at most
    *concurrency_num* groups and *suite* itself is emptied.  For every group a
    child process is forked; it runs the group and reports the results as a
    subunit stream over a pipe.  If BUILDDIR is set in the environment, each
    child first creates its own build directory "<BUILDDIR>-st-<pid>" and
    removes it again once it is done.  The child always leaves through
    os._exit() and never returns from this function.

    Returns a tuple (result, totaltests): result is a list of
    (ProtocolTestCase, numtests) pairs, one per child, reading from that
    child's pipe; totaltests is the number of tests over all groups.
    """
    result = []
    test_blocks = partition_tests(suite, concurrency_num)
    # Clear the tests from the original suite so it doesn't keep them alive
    suite._tests[:] = []
    totaltests = sum(len(x) for x in test_blocks)
    for process_tests in test_blocks:
        numtests = len(process_tests)
        process_suite = unittest.TestSuite(process_tests)
        # Also clear each split list so new suite has only reference
        process_tests[:] = []
        c2pread, c2pwrite = os.pipe()
        # Clear buffers before fork to avoid duplicate output
        sys.stdout.flush()
        sys.stderr.flush()
        # NOTE(review): the child's pid is not kept, so nothing in this
        # function waits for the child to exit.
        pid = os.fork()
        if pid == 0:
            ourpid = os.getpid()
            # Bound before the try block so the error path can always refer to them
            newbuilddir = None
            stream = None
            try:
                # Default buffering: line buffering (buffering=1) is only
                # usable in text mode and this pipe is binary.
                stream = os.fdopen(c2pwrite, 'wb')
                os.close(c2pread)

                # Create a new separate BUILDDIR for each group of tests
                if 'BUILDDIR' in os.environ:
                    builddir = os.environ['BUILDDIR']
                    newbuilddir = builddir + "-st-" + str(ourpid)
                    selftestdir = os.path.abspath(builddir + "/../meta-selftest")
                    newselftestdir = newbuilddir + "/meta-selftest"

                    bb.utils.mkdirhier(newbuilddir)
                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
                    oe.path.copytree(selftestdir, newselftestdir)

                    # Point every environment variable that mentions the old build dir at the new one
                    for e in os.environ:
                        if builddir in os.environ[e]:
                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)

                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)

                    # Tried to use bitbake-layers add/remove but it requires recipe parsing and hence is too slow
                    subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)

                    os.chdir(newbuilddir)

                    # Rewrite the paths held by each test's context to use the new directories
                    for t in process_suite:
                        if not hasattr(t, "tc"):
                            continue
                        cp = t.tc.config_paths
                        for p in cp:
                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
                            if builddir in cp[p] and newbuilddir not in cp[p]:
                                cp[p] = cp[p].replace(builddir, newbuilddir)

                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise it's a roulette to see what
                # child actually gets keystrokes for pdb etc).
                newsi = os.open(os.devnull, os.O_RDWR)
                os.dup2(newsi, sys.stdin.fileno())

                subunit_client = TestProtocolClient(stream)
                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
                # as per default in parent code
                subunit_client.buffer = True
                subunit_result = AutoTimingTestResultDecorator(subunit_client)
                process_suite.run(subunit_result)
                # A different pid means we are a process forked by the tests themselves
                if ourpid != os.getpid():
                    os._exit(0)
                if newbuilddir:
                    removebuilddir(newbuilddir)
                    # Already removed; the error path must not try again
                    newbuilddir = None
                # Flush inside the try block so a failing flush is reported
                # below instead of escaping from the child.
                stream.flush()
            except:
                # Don't do anything with process children
                if ourpid != os.getpid():
                    os._exit(1)
                # Try and report traceback on stream, but exit with error
                # even if stream couldn't be created or something else
                # goes wrong.  The traceback is formatted to a string and
                # written in one go to avoid interleaving lines from
                # multiple failing children.
                # os._exit() sits in a finally clause so that no failure
                # while reporting or cleaning up can let this child carry
                # on into the caller's code.
                try:
                    try:
                        stream.write(traceback.format_exc().encode('utf-8'))
                        stream.flush()
                    except:
                        sys.stderr.write(traceback.format_exc())
                    if newbuilddir:
                        removebuilddir(newbuilddir)
                finally:
                    os._exit(1)
            os._exit(0)
        else:
            os.close(c2pwrite)
            # Default buffering here as well, for the same binary-mode reason
            stream = os.fdopen(c2pread, 'rb')
            test = ProtocolTestCase(stream)
            result.append((test, numtests))
    return result, totaltests
241
def partition_tests(suite, count):
    """Split the tests of *suite* into at most *count* lists.

    Tests are grouped by "<module>.<class name>" so that tests of one class
    stay together, while classes of the same module may end up in different
    lists.  The groups are dealt out to the lists in turn, in the order in
    which they were first seen; lists that receive nothing are left out of
    the returned list.
    """
    by_class = {}
    for case in iterate_tests(suite):
        key = case.__module__ + "." + case.__class__.__name__
        by_class.setdefault(key, []).append(case)

    # Hand the groups out to the buckets one after the other
    buckets = [[] for _ in range(count)]
    for bucket, cases in zip(cycle(buckets), by_class.values()):
        bucket.extend(cases)

    # Buckets that stayed empty are of no use to the caller
    return list(filter(None, buckets))
259
260