1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: GPL-2.0-or-later 4# 5# Modified for use in OE by Richard Purdie, 2018 6# 7# Modified by: Corey Goldberg, 2013 8# License: GPLv2+ 9# 10# Original code from: 11# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) 12# Copyright (C) 2005-2011 Canonical Ltd 13# License: GPLv2+ 14 15import os 16import sys 17import traceback 18import unittest 19import subprocess 20import testtools 21import threading 22import time 23import io 24import json 25import subunit 26 27from queue import Queue 28from itertools import cycle 29from subunit import ProtocolTestCase, TestProtocolClient 30from subunit.test_results import AutoTimingTestResultDecorator 31from testtools import ThreadsafeForwardingResult, iterate_tests 32from testtools.content import Content 33from testtools.content_type import ContentType 34from oeqa.utils.commands import get_test_layer 35 36import bb.utils 37import oe.path 38 39_all__ = [ 40 'ConcurrentTestSuite', 41 'fork_for_tests', 42 'partition_tests', 43] 44 45# 46# Patch the version from testtools to allow access to _test_start and allow 47# computation of timing information and threading progress 48# 49class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): 50 51 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): 52 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) 53 self.threadnum = threadnum 54 self.totalinprocess = totalinprocess 55 self.totaltests = totaltests 56 57 def _add_result_with_semaphore(self, method, test, *args, **kwargs): 58 self.semaphore.acquire() 59 try: 60 if self._test_start: 61 self.result.starttime[test.id()] = self._test_start.timestamp() 62 self.result.threadprogress[self.threadnum].append(test.id()) 63 totalprogress = sum(len(x) for x in self.result.threadprogress.values()) 64 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( 65 self.threadnum, 66 len(self.result.threadprogress[self.threadnum]), 67 self.totalinprocess, 68 totalprogress, 69 self.totaltests, 70 "{0:.2f}".format(time.time()-self._test_start.timestamp()), 71 test.id()) 72 finally: 73 self.semaphore.release() 74 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) 75 76class ProxyTestResult: 77 # a very basic TestResult proxy, in order to modify add* calls 78 def __init__(self, target): 79 self.result = target 80 81 def _addResult(self, method, test, *args, exception = False, **kwargs): 82 return method(test, *args, **kwargs) 83 84 def addError(self, test, err = None, **kwargs): 85 self._addResult(self.result.addError, test, err, exception = True, **kwargs) 86 87 def addFailure(self, test, err = None, **kwargs): 88 self._addResult(self.result.addFailure, test, err, exception = True, **kwargs) 89 90 def addSuccess(self, test, **kwargs): 91 self._addResult(self.result.addSuccess, test, **kwargs) 92 93 def addExpectedFailure(self, test, err = None, **kwargs): 94 self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs) 95 96 def addUnexpectedSuccess(self, test, **kwargs): 97 self._addResult(self.result.addUnexpectedSuccess, test, **kwargs) 98 99 def __getattr__(self, attr): 100 return getattr(self.result, attr) 101 102class ExtraResultsDecoderTestResult(ProxyTestResult): 103 def _addResult(self, method, test, *args, exception = False, **kwargs): 104 if "details" in kwargs and "extraresults" in kwargs["details"]: 105 if isinstance(kwargs["details"]["extraresults"], Content): 106 kwargs = kwargs.copy() 107 kwargs["details"] = kwargs["details"].copy() 108 extraresults = kwargs["details"]["extraresults"] 109 data = bytearray() 110 for b in extraresults.iter_bytes(): 111 data += b 112 extraresults = json.loads(data.decode()) 113 kwargs["details"]["extraresults"] = extraresults 114 return method(test, *args, **kwargs) 115 116class ExtraResultsEncoderTestResult(ProxyTestResult): 117 def _addResult(self, method, test, *args, exception = False, **kwargs): 118 if hasattr(test, "extraresults"): 119 extras = lambda : [json.dumps(test.extraresults).encode()] 120 kwargs = kwargs.copy() 121 if "details" not in kwargs: 122 kwargs["details"] = {} 123 else: 124 kwargs["details"] = kwargs["details"].copy() 125 kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras) 126 # if using details, need to encode any exceptions into the details obj, 127 # testtools does not handle "err" and "details" together. 128 if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None): 129 kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test) 130 args = [] 131 return method(test, *args, **kwargs) 132 133# 134# We have to patch subunit since it doesn't understand how to handle addError 135# outside of a running test case. This can happen if classSetUp() fails 136# for a class of tests. This unfortunately has horrible internal knowledge. 137# 138def outSideTestaddError(self, offset, line): 139 """An 'error:' directive has been read.""" 140 test_name = line[offset:-1].decode('utf8') 141 self.parser._current_test = subunit.RemotedTestCase(test_name) 142 self.parser.current_test_description = test_name 143 self.parser._state = self.parser._reading_error_details 144 self.parser._reading_error_details.set_simple() 145 self.parser.subunitLineReceived(line) 146 147subunit._OutSideTest.addError = outSideTestaddError 148 149 150# 151# A dummy structure to add to io.StringIO so that the .buffer object 152# is available and accepts writes. This allows unittest with buffer=True 153# to interact ok with subunit which wants to access sys.stdout.buffer. 154# 155class dummybuf(object): 156 def __init__(self, parent): 157 self.p = parent 158 def write(self, data): 159 self.p.write(data.decode("utf-8")) 160 161# 162# Taken from testtools.ConncurrencyTestSuite but modified for OE use 163# 164class ConcurrentTestSuite(unittest.TestSuite): 165 166 def __init__(self, suite, processes): 167 super(ConcurrentTestSuite, self).__init__([suite]) 168 self.processes = processes 169 170 def run(self, result): 171 tests, totaltests = fork_for_tests(self.processes, self) 172 try: 173 threads = {} 174 queue = Queue() 175 semaphore = threading.Semaphore(1) 176 result.threadprogress = {} 177 for i, (test, testnum) in enumerate(tests): 178 result.threadprogress[i] = [] 179 process_result = BBThreadsafeForwardingResult( 180 ExtraResultsDecoderTestResult(result), 181 semaphore, i, testnum, totaltests) 182 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output 183 # as per default in parent code 184 process_result.buffer = True 185 # We have to add a buffer object to stdout to keep subunit happy 186 process_result._stderr_buffer = io.StringIO() 187 process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) 188 process_result._stdout_buffer = io.StringIO() 189 process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) 190 reader_thread = threading.Thread( 191 target=self._run_test, args=(test, process_result, queue)) 192 threads[test] = reader_thread, process_result 193 reader_thread.start() 194 while threads: 195 finished_test = queue.get() 196 threads[finished_test][0].join() 197 del threads[finished_test] 198 except: 199 for thread, process_result in threads.values(): 200 process_result.stop() 201 raise 202 finally: 203 for test in tests: 204 test[0]._stream.close() 205 206 def _run_test(self, test, process_result, queue): 207 try: 208 try: 209 test.run(process_result) 210 except Exception: 211 # The run logic itself failed 212 case = testtools.ErrorHolder( 213 "broken-runner", 214 error=sys.exc_info()) 215 case.run(process_result) 216 finally: 217 queue.put(test) 218 219def removebuilddir(d): 220 delay = 5 221 while delay and os.path.exists(d + "/bitbake.lock"): 222 time.sleep(1) 223 delay = delay - 1 224 bb.utils.prunedir(d, ionice=True) 225 226def fork_for_tests(concurrency_num, suite): 227 result = [] 228 if 'BUILDDIR' in os.environ: 229 selftestdir = get_test_layer() 230 231 test_blocks = partition_tests(suite, concurrency_num) 232 # Clear the tests from the original suite so it doesn't keep them alive 233 suite._tests[:] = [] 234 totaltests = sum(len(x) for x in test_blocks) 235 for process_tests in test_blocks: 236 numtests = len(process_tests) 237 process_suite = unittest.TestSuite(process_tests) 238 # Also clear each split list so new suite has only reference 239 process_tests[:] = [] 240 c2pread, c2pwrite = os.pipe() 241 # Clear buffers before fork to avoid duplicate output 242 sys.stdout.flush() 243 sys.stderr.flush() 244 pid = os.fork() 245 if pid == 0: 246 ourpid = os.getpid() 247 try: 248 newbuilddir = None 249 stream = os.fdopen(c2pwrite, 'wb', 1) 250 os.close(c2pread) 251 252 # Create a new separate BUILDDIR for each group of tests 253 if 'BUILDDIR' in os.environ: 254 builddir = os.environ['BUILDDIR'] 255 newbuilddir = builddir + "-st-" + str(ourpid) 256 newselftestdir = newbuilddir + "/meta-selftest" 257 258 bb.utils.mkdirhier(newbuilddir) 259 oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") 260 oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") 261 oe.path.copytree(selftestdir, newselftestdir) 262 263 for e in os.environ: 264 if builddir in os.environ[e]: 265 os.environ[e] = os.environ[e].replace(builddir, newbuilddir) 266 267 subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) 268 269 # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow 270 subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) 271 272 os.chdir(newbuilddir) 273 274 for t in process_suite: 275 if not hasattr(t, "tc"): 276 continue 277 cp = t.tc.config_paths 278 for p in cp: 279 if selftestdir in cp[p] and newselftestdir not in cp[p]: 280 cp[p] = cp[p].replace(selftestdir, newselftestdir) 281 if builddir in cp[p] and newbuilddir not in cp[p]: 282 cp[p] = cp[p].replace(builddir, newbuilddir) 283 284 # Leave stderr and stdout open so we can see test noise 285 # Close stdin so that the child goes away if it decides to 286 # read from stdin (otherwise its a roulette to see what 287 # child actually gets keystrokes for pdb etc). 288 newsi = os.open(os.devnull, os.O_RDWR) 289 os.dup2(newsi, sys.stdin.fileno()) 290 291 subunit_client = TestProtocolClient(stream) 292 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output 293 # as per default in parent code 294 subunit_client.buffer = True 295 subunit_result = AutoTimingTestResultDecorator(subunit_client) 296 process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) 297 if ourpid != os.getpid(): 298 os._exit(0) 299 if newbuilddir: 300 removebuilddir(newbuilddir) 301 except: 302 # Don't do anything with process children 303 if ourpid != os.getpid(): 304 os._exit(1) 305 # Try and report traceback on stream, but exit with error 306 # even if stream couldn't be created or something else 307 # goes wrong. The traceback is formatted to a string and 308 # written in one go to avoid interleaving lines from 309 # multiple failing children. 310 try: 311 stream.write(traceback.format_exc().encode('utf-8')) 312 except: 313 sys.stderr.write(traceback.format_exc()) 314 finally: 315 if newbuilddir: 316 removebuilddir(newbuilddir) 317 stream.flush() 318 os._exit(1) 319 stream.flush() 320 os._exit(0) 321 else: 322 os.close(c2pwrite) 323 stream = os.fdopen(c2pread, 'rb', 1) 324 test = ProtocolTestCase(stream) 325 result.append((test, numtests)) 326 return result, totaltests 327 328def partition_tests(suite, count): 329 # Keep tests from the same class together but allow tests from modules 330 # to go to different processes to aid parallelisation. 331 modules = {} 332 for test in iterate_tests(suite): 333 m = test.__module__ + "." + test.__class__.__name__ 334 if m not in modules: 335 modules[m] = [] 336 modules[m].append(test) 337 338 # Simply divide the test blocks between the available processes 339 partitions = [list() for _ in range(count)] 340 for partition, m in zip(cycle(partitions), modules): 341 partition.extend(modules[m]) 342 343 # No point in empty threads so drop them 344 return [p for p in partitions if p] 345 346