1#!/usr/bin/env python3 2# 3# Modified for use in OE by Richard Purdie, 2018 4# 5# Modified by: Corey Goldberg, 2013 6# License: GPLv2+ 7# 8# Original code from: 9# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) 10# Copyright (C) 2005-2011 Canonical Ltd 11# License: GPLv2+ 12 13import os 14import sys 15import traceback 16import unittest 17import subprocess 18import testtools 19import threading 20import time 21import io 22 23from queue import Queue 24from itertools import cycle 25from subunit import ProtocolTestCase, TestProtocolClient 26from subunit.test_results import AutoTimingTestResultDecorator 27from testtools import ThreadsafeForwardingResult, iterate_tests 28 29import bb.utils 30import oe.path 31 32_all__ = [ 33 'ConcurrentTestSuite', 34 'fork_for_tests', 35 'partition_tests', 36] 37 38# 39# Patch the version from testtools to allow access to _test_start and allow 40# computation of timing information and threading progress 41# 42class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): 43 44 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): 45 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) 46 self.threadnum = threadnum 47 self.totalinprocess = totalinprocess 48 self.totaltests = totaltests 49 50 def _add_result_with_semaphore(self, method, test, *args, **kwargs): 51 self.semaphore.acquire() 52 try: 53 self.result.starttime[test.id()] = self._test_start.timestamp() 54 self.result.threadprogress[self.threadnum].append(test.id()) 55 totalprogress = sum(len(x) for x in self.result.threadprogress.values()) 56 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( 57 self.threadnum, 58 len(self.result.threadprogress[self.threadnum]), 59 self.totalinprocess, 60 totalprogress, 61 self.totaltests, 62 "{0:.2f}".format(time.time()-self._test_start.timestamp()), 63 test.id()) 64 finally: 65 self.semaphore.release() 66 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) 67 68# 69# A dummy structure to add to io.StringIO so that the .buffer object 70# is available and accepts writes. This allows unittest with buffer=True 71# to interact ok with subunit which wants to access sys.stdout.buffer. 72# 73class dummybuf(object): 74 def __init__(self, parent): 75 self.p = parent 76 def write(self, data): 77 self.p.write(data.decode("utf-8")) 78 79# 80# Taken from testtools.ConncurrencyTestSuite but modified for OE use 81# 82class ConcurrentTestSuite(unittest.TestSuite): 83 84 def __init__(self, suite, processes): 85 super(ConcurrentTestSuite, self).__init__([suite]) 86 self.processes = processes 87 88 def run(self, result): 89 tests, totaltests = fork_for_tests(self.processes, self) 90 try: 91 threads = {} 92 queue = Queue() 93 semaphore = threading.Semaphore(1) 94 result.threadprogress = {} 95 for i, (test, testnum) in enumerate(tests): 96 result.threadprogress[i] = [] 97 process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests) 98 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output 99 # as per default in parent code 100 process_result.buffer = True 101 # We have to add a buffer object to stdout to keep subunit happy 102 process_result._stderr_buffer = io.StringIO() 103 process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) 104 process_result._stdout_buffer = io.StringIO() 105 process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) 106 reader_thread = threading.Thread( 107 target=self._run_test, args=(test, process_result, queue)) 108 threads[test] = reader_thread, process_result 109 reader_thread.start() 110 while threads: 111 finished_test = queue.get() 112 threads[finished_test][0].join() 113 del threads[finished_test] 114 except: 115 for thread, process_result in threads.values(): 116 process_result.stop() 117 raise 118 finally: 119 for test in tests: 120 test[0]._stream.close() 121 122 def _run_test(self, test, process_result, queue): 123 try: 124 try: 125 test.run(process_result) 126 except Exception: 127 # The run logic itself failed 128 case = testtools.ErrorHolder( 129 "broken-runner", 130 error=sys.exc_info()) 131 case.run(process_result) 132 finally: 133 queue.put(test) 134 135def removebuilddir(d): 136 delay = 5 137 while delay and os.path.exists(d + "/bitbake.lock"): 138 time.sleep(1) 139 delay = delay - 1 140 bb.utils.prunedir(d) 141 142def fork_for_tests(concurrency_num, suite): 143 result = [] 144 test_blocks = partition_tests(suite, concurrency_num) 145 # Clear the tests from the original suite so it doesn't keep them alive 146 suite._tests[:] = [] 147 totaltests = sum(len(x) for x in test_blocks) 148 for process_tests in test_blocks: 149 numtests = len(process_tests) 150 process_suite = unittest.TestSuite(process_tests) 151 # Also clear each split list so new suite has only reference 152 process_tests[:] = [] 153 c2pread, c2pwrite = os.pipe() 154 # Clear buffers before fork to avoid duplicate output 155 sys.stdout.flush() 156 sys.stderr.flush() 157 pid = os.fork() 158 if pid == 0: 159 ourpid = os.getpid() 160 try: 161 newbuilddir = None 162 stream = os.fdopen(c2pwrite, 'wb', 1) 163 os.close(c2pread) 164 165 # Create a new separate BUILDDIR for each group of tests 166 if 'BUILDDIR' in os.environ: 167 builddir = os.environ['BUILDDIR'] 168 newbuilddir = builddir + "-st-" + str(ourpid) 169 selftestdir = os.path.abspath(builddir + "/../meta-selftest") 170 newselftestdir = newbuilddir + "/meta-selftest" 171 172 bb.utils.mkdirhier(newbuilddir) 173 oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") 174 oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") 175 oe.path.copytree(selftestdir, newselftestdir) 176 177 for e in os.environ: 178 if builddir in os.environ[e]: 179 os.environ[e] = os.environ[e].replace(builddir, newbuilddir) 180 181 subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) 182 183 # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow 184 subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) 185 186 os.chdir(newbuilddir) 187 188 for t in process_suite: 189 if not hasattr(t, "tc"): 190 continue 191 cp = t.tc.config_paths 192 for p in cp: 193 if selftestdir in cp[p] and newselftestdir not in cp[p]: 194 cp[p] = cp[p].replace(selftestdir, newselftestdir) 195 if builddir in cp[p] and newbuilddir not in cp[p]: 196 cp[p] = cp[p].replace(builddir, newbuilddir) 197 198 # Leave stderr and stdout open so we can see test noise 199 # Close stdin so that the child goes away if it decides to 200 # read from stdin (otherwise its a roulette to see what 201 # child actually gets keystrokes for pdb etc). 202 newsi = os.open(os.devnull, os.O_RDWR) 203 os.dup2(newsi, sys.stdin.fileno()) 204 205 subunit_client = TestProtocolClient(stream) 206 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output 207 # as per default in parent code 208 subunit_client.buffer = True 209 subunit_result = AutoTimingTestResultDecorator(subunit_client) 210 process_suite.run(subunit_result) 211 if ourpid != os.getpid(): 212 os._exit(0) 213 if newbuilddir: 214 removebuilddir(newbuilddir) 215 except: 216 # Don't do anything with process children 217 if ourpid != os.getpid(): 218 os._exit(1) 219 # Try and report traceback on stream, but exit with error 220 # even if stream couldn't be created or something else 221 # goes wrong. The traceback is formatted to a string and 222 # written in one go to avoid interleaving lines from 223 # multiple failing children. 224 try: 225 stream.write(traceback.format_exc().encode('utf-8')) 226 except: 227 sys.stderr.write(traceback.format_exc()) 228 finally: 229 if newbuilddir: 230 removebuilddir(newbuilddir) 231 stream.flush() 232 os._exit(1) 233 stream.flush() 234 os._exit(0) 235 else: 236 os.close(c2pwrite) 237 stream = os.fdopen(c2pread, 'rb', 1) 238 test = ProtocolTestCase(stream) 239 result.append((test, numtests)) 240 return result, totaltests 241 242def partition_tests(suite, count): 243 # Keep tests from the same class together but allow tests from modules 244 # to go to different processes to aid parallelisation. 245 modules = {} 246 for test in iterate_tests(suite): 247 m = test.__module__ + "." + test.__class__.__name__ 248 if m not in modules: 249 modules[m] = [] 250 modules[m].append(test) 251 252 # Simply divide the test blocks between the available processes 253 partitions = [list() for _ in range(count)] 254 for partition, m in zip(cycle(partitions), modules): 255 partition.extend(modules[m]) 256 257 # No point in empty threads so drop them 258 return [p for p in partitions if p] 259 260