1# 2# Copyright (C) 2013 Intel Corporation 3# 4# SPDX-License-Identifier: MIT 5# 6 7# Main unittest module used by testimage.bbclass 8# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime. 9 10# It also has some helper functions and it's responsible for actually starting the tests 11 12import os, re, sys 13import unittest 14import inspect 15import subprocess 16import signal 17import shutil 18import functools 19try: 20 import bb 21except ImportError: 22 pass 23import logging 24 25import oeqa.runtime 26# Exported test doesn't require sdkext 27try: 28 import oeqa.sdkext 29except ImportError: 30 pass 31from oeqa.utils.decorators import LogResults, gettag 32 33logger = logging.getLogger("BitBake") 34 35def getVar(obj): 36 #extend form dict, if a variable didn't exists, need find it in testcase 37 class VarDict(dict): 38 def __getitem__(self, key): 39 return gettag(obj, key) 40 return VarDict() 41 42def checkTags(tc, tagexp): 43 return eval(tagexp, None, getVar(tc)) 44 45def filterByTagExp(testsuite, tagexp): 46 if not tagexp: 47 return testsuite 48 caseList = [] 49 for each in testsuite: 50 if not isinstance(each, unittest.BaseTestSuite): 51 if checkTags(each, tagexp): 52 caseList.append(each) 53 else: 54 caseList.append(filterByTagExp(each, tagexp)) 55 return testsuite.__class__(caseList) 56 57@LogResults 58class oeTest(unittest.TestCase): 59 60 longMessage = True 61 62 @classmethod 63 def hasPackage(self, pkg): 64 """ 65 True if the full package name exists in the manifest, False otherwise. 66 """ 67 return pkg in oeTest.tc.pkgmanifest 68 69 @classmethod 70 def hasPackageMatch(self, match): 71 """ 72 True if match exists in the manifest as a regular expression substring, 73 False otherwise. 74 """ 75 for s in oeTest.tc.pkgmanifest: 76 if re.match(match, s): 77 return True 78 return False 79 80 @classmethod 81 def hasFeature(self,feature): 82 if feature in oeTest.tc.imagefeatures or \ 83 feature in oeTest.tc.distrofeatures: 84 return True 85 else: 86 return False 87 88class oeRuntimeTest(oeTest): 89 def __init__(self, methodName='runTest'): 90 self.target = oeRuntimeTest.tc.target 91 super(oeRuntimeTest, self).__init__(methodName) 92 93 def setUp(self): 94 # Install packages in the DUT 95 self.tc.install_uninstall_packages(self.id()) 96 97 # Check if test needs to run 98 if self.tc.sigterm: 99 self.fail("Got SIGTERM") 100 elif (type(self.target).__name__ == "QemuTarget"): 101 self.assertTrue(self.target.check(), msg = "Qemu not running?") 102 103 self.setUpLocal() 104 105 # a setup method before tests but after the class instantiation 106 def setUpLocal(self): 107 pass 108 109 def tearDown(self): 110 # Uninstall packages in the DUT 111 self.tc.install_uninstall_packages(self.id(), False) 112 self.tearDownLocal() 113 114 # Method to be run after tearDown and implemented by child classes 115 def tearDownLocal(self): 116 pass 117 118def getmodule(pos=2): 119 # stack returns a list of tuples containg frame information 120 # First element of the list the is current frame, caller is 1 121 frameinfo = inspect.stack()[pos] 122 modname = inspect.getmodulename(frameinfo[1]) 123 #modname = inspect.getmodule(frameinfo[0]).__name__ 124 return modname 125 126def skipModule(reason, pos=2): 127 modname = getmodule(pos) 128 if modname not in oeTest.tc.testsrequired: 129 raise unittest.SkipTest("%s: %s" % (modname, reason)) 130 else: 131 raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \ 132 "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \ 133 "\nor the image really doesn't have the required feature/package when it should." % (modname, reason)) 134 135def skipModuleIf(cond, reason): 136 137 if cond: 138 skipModule(reason, 3) 139 140def skipModuleUnless(cond, reason): 141 142 if not cond: 143 skipModule(reason, 3) 144 145_buffer_logger = "" 146def custom_verbose(msg, *args, **kwargs): 147 global _buffer_logger 148 if msg[-1] != "\n": 149 _buffer_logger += msg 150 else: 151 _buffer_logger += msg 152 try: 153 bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs) 154 except NameError: 155 logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs) 156 _buffer_logger = "" 157 158class TestContext(object): 159 def __init__(self, d, exported=False): 160 self.d = d 161 162 self.testsuites = self._get_test_suites() 163 164 if exported: 165 path = [os.path.dirname(os.path.abspath(__file__))] 166 extrapath = "" 167 else: 168 path = d.getVar("BBPATH").split(':') 169 extrapath = "lib/oeqa" 170 171 self.testslist = self._get_tests_list(path, extrapath) 172 self.testsrequired = self._get_test_suites_required() 173 174 self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files") 175 self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files") 176 self.imagefeatures = d.getVar("IMAGE_FEATURES").split() 177 self.distrofeatures = d.getVar("DISTRO_FEATURES").split() 178 179 # get testcase list from specified file 180 # if path is a relative path, then relative to build/conf/ 181 def _read_testlist(self, fpath, builddir): 182 if not os.path.isabs(fpath): 183 fpath = os.path.join(builddir, "conf", fpath) 184 if not os.path.exists(fpath): 185 bb.fatal("No such manifest file: ", fpath) 186 tcs = [] 187 for line in open(fpath).readlines(): 188 line = line.strip() 189 if line and not line.startswith("#"): 190 tcs.append(line) 191 return " ".join(tcs) 192 193 # return test list by type also filter if TEST_SUITES is specified 194 def _get_tests_list(self, bbpath, extrapath): 195 testslist = [] 196 197 type = self._get_test_namespace() 198 199 # This relies on lib/ under each directory in BBPATH being added to sys.path 200 # (as done by default in base.bbclass) 201 for testname in self.testsuites: 202 if testname != "auto": 203 if testname.startswith("oeqa."): 204 testslist.append(testname) 205 continue 206 found = False 207 for p in bbpath: 208 if os.path.exists(os.path.join(p, extrapath, type, testname + ".py")): 209 testslist.append("oeqa." + type + "." + testname) 210 found = True 211 break 212 elif os.path.exists(os.path.join(p, extrapath, type, testname.split(".")[0] + ".py")): 213 testslist.append("oeqa." + type + "." + testname) 214 found = True 215 break 216 if not found: 217 bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname) 218 219 if "auto" in self.testsuites: 220 def add_auto_list(path): 221 files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')]) 222 for f in files: 223 module = 'oeqa.' + type + '.' + f[:-3] 224 if module not in testslist: 225 testslist.append(module) 226 227 for p in bbpath: 228 testpath = os.path.join(p, 'lib', 'oeqa', type) 229 bb.debug(2, 'Searching for tests in %s' % testpath) 230 if os.path.exists(testpath): 231 add_auto_list(testpath) 232 233 return testslist 234 235 def getTestModules(self): 236 """ 237 Returns all the test modules in the testlist. 238 """ 239 240 import pkgutil 241 242 modules = [] 243 for test in self.testslist: 244 if re.search(r"\w+\.\w+\.test_\S+", test): 245 test = '.'.join(t.split('.')[:3]) 246 module = pkgutil.get_loader(test) 247 modules.append(module) 248 249 return modules 250 251 def getModulefromID(self, test_id): 252 """ 253 Returns the test module based on a test id. 254 """ 255 256 module_name = ".".join(test_id.split(".")[:3]) 257 modules = self.getTestModules() 258 for module in modules: 259 if module.name == module_name: 260 return module 261 262 return None 263 264 def getTests(self, test): 265 '''Return all individual tests executed when running the suite.''' 266 # Unfortunately unittest does not have an API for this, so we have 267 # to rely on implementation details. This only needs to work 268 # for TestSuite containing TestCase. 269 method = getattr(test, '_testMethodName', None) 270 if method: 271 # leaf case: a TestCase 272 yield test 273 else: 274 # Look into TestSuite. 275 tests = getattr(test, '_tests', []) 276 for t1 in tests: 277 for t2 in self.getTests(t1): 278 yield t2 279 280 def loadTests(self): 281 setattr(oeTest, "tc", self) 282 283 testloader = unittest.TestLoader() 284 testloader.sortTestMethodsUsing = None 285 suites = [testloader.loadTestsFromName(name) for name in self.testslist] 286 suites = filterByTagExp(suites, getattr(self, "tagexp", None)) 287 288 # Determine dependencies between suites by looking for @skipUnlessPassed 289 # method annotations. Suite A depends on suite B if any method in A 290 # depends on a method on B. 291 for suite in suites: 292 suite.dependencies = [] 293 suite.depth = 0 294 for test in self.getTests(suite): 295 methodname = getattr(test, '_testMethodName', None) 296 if methodname: 297 method = getattr(test, methodname) 298 depends_on = getattr(method, '_depends_on', None) 299 if depends_on: 300 for dep_suite in suites: 301 if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]: 302 if dep_suite not in suite.dependencies and \ 303 dep_suite is not suite: 304 suite.dependencies.append(dep_suite) 305 break 306 else: 307 logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." % 308 (test, depends_on)) 309 310 # Use brute-force topological sort to determine ordering. Sort by 311 # depth (higher depth = must run later), with original ordering to 312 # break ties. 313 def set_suite_depth(suite): 314 for dep in suite.dependencies: 315 new_depth = set_suite_depth(dep) + 1 316 if new_depth > suite.depth: 317 suite.depth = new_depth 318 return suite.depth 319 320 for index, suite in enumerate(suites): 321 set_suite_depth(suite) 322 suite.index = index 323 324 def cmp(a, b): 325 return (a > b) - (a < b) 326 327 def cmpfunc(a, b): 328 return cmp((a.depth, a.index), (b.depth, b.index)) 329 330 suites.sort(key=functools.cmp_to_key(cmpfunc)) 331 332 self.suite = testloader.suiteClass(suites) 333 334 return self.suite 335 336 def runTests(self): 337 logger.info("Test modules %s" % self.testslist) 338 if hasattr(self, "tagexp") and self.tagexp: 339 logger.info("Filter test cases by tags: %s" % self.tagexp) 340 logger.info("Found %s tests" % self.suite.countTestCases()) 341 runner = unittest.TextTestRunner(verbosity=2) 342 if 'bb' in sys.modules: 343 runner.stream.write = custom_verbose 344 345 return runner.run(self.suite) 346 347class RuntimeTestContext(TestContext): 348 def __init__(self, d, target, exported=False): 349 super(RuntimeTestContext, self).__init__(d, exported) 350 351 self.target = target 352 353 self.pkgmanifest = {} 354 manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), 355 d.getVar("IMAGE_LINK_NAME") + ".manifest") 356 nomanifest = d.getVar("IMAGE_NO_MANIFEST") 357 if nomanifest is None or nomanifest != "1": 358 try: 359 with open(manifest) as f: 360 for line in f: 361 (pkg, arch, version) = line.strip().split() 362 self.pkgmanifest[pkg] = (version, arch) 363 except IOError as e: 364 bb.fatal("No package manifest file found. Did you build the image?\n%s" % e) 365 366 def _get_test_namespace(self): 367 return "runtime" 368 369 def _get_test_suites(self): 370 testsuites = [] 371 372 manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split() 373 if manifests: 374 for manifest in manifests: 375 testsuites.extend(self._read_testlist(manifest, 376 self.d.getVar("TOPDIR")).split()) 377 378 else: 379 testsuites = self.d.getVar("TEST_SUITES").split() 380 381 return testsuites 382 383 def _get_test_suites_required(self): 384 return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"] 385 386 def extract_packages(self): 387 """ 388 Find packages that will be needed during runtime. 389 """ 390 391 modules = self.getTestModules() 392 bbpaths = self.d.getVar("BBPATH").split(":") 393 394 shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR")) 395 shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR")) 396 for module in modules: 397 json_file = self._getJsonFile(module) 398 if json_file: 399 needed_packages = self._getNeededPackages(json_file) 400 self._perform_package_extraction(needed_packages) 401 402 def _perform_package_extraction(self, needed_packages): 403 """ 404 Extract packages that will be needed during runtime. 405 """ 406 407 import oe.path 408 409 extracted_path = self.d.getVar("TEST_EXTRACTED_DIR") 410 packaged_path = self.d.getVar("TEST_PACKAGED_DIR") 411 412 for key,value in needed_packages.items(): 413 packages = () 414 if isinstance(value, dict): 415 packages = (value, ) 416 elif isinstance(value, list): 417 packages = value 418 else: 419 bb.fatal("Failed to process needed packages for %s; " 420 "Value must be a dict or list" % key) 421 422 for package in packages: 423 pkg = package["pkg"] 424 rm = package.get("rm", False) 425 extract = package.get("extract", True) 426 if extract: 427 dst_dir = os.path.join(extracted_path, pkg) 428 else: 429 dst_dir = os.path.join(packaged_path) 430 431 # Extract package and copy it to TEST_EXTRACTED_DIR 432 pkg_dir = self._extract_in_tmpdir(pkg) 433 if extract: 434 435 # Same package used for more than one test, 436 # don't need to extract again. 437 if os.path.exists(dst_dir): 438 continue 439 oe.path.copytree(pkg_dir, dst_dir) 440 shutil.rmtree(pkg_dir) 441 442 # Copy package to TEST_PACKAGED_DIR 443 else: 444 self._copy_package(pkg) 445 446 def _getJsonFile(self, module): 447 """ 448 Returns the path of the JSON file for a module, empty if doesn't exitst. 449 """ 450 451 module_file = module.path 452 json_file = "%s.json" % module_file.rsplit(".", 1)[0] 453 if os.path.isfile(module_file) and os.path.isfile(json_file): 454 return json_file 455 else: 456 return "" 457 458 def _getNeededPackages(self, json_file, test=None): 459 """ 460 Returns a dict with needed packages based on a JSON file. 461 462 463 If a test is specified it will return the dict just for that test. 464 """ 465 466 import json 467 468 needed_packages = {} 469 470 with open(json_file) as f: 471 test_packages = json.load(f) 472 for key,value in test_packages.items(): 473 needed_packages[key] = value 474 475 if test: 476 if test in needed_packages: 477 needed_packages = needed_packages[test] 478 else: 479 needed_packages = {} 480 481 return needed_packages 482 483 def _extract_in_tmpdir(self, pkg): 484 """" 485 Returns path to a temp directory where the package was 486 extracted without dependencies. 487 """ 488 489 from oeqa.utils.package_manager import get_package_manager 490 491 pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) 492 pm = get_package_manager(self.d, pkg_path) 493 extract_dir = pm.extract(pkg) 494 shutil.rmtree(pkg_path) 495 496 return extract_dir 497 498 def _copy_package(self, pkg): 499 """ 500 Copy the RPM, DEB or IPK package to dst_dir 501 """ 502 503 from oeqa.utils.package_manager import get_package_manager 504 505 pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) 506 dst_dir = self.d.getVar("TEST_PACKAGED_DIR") 507 pm = get_package_manager(self.d, pkg_path) 508 pkg_info = pm.package_info(pkg) 509 file_path = pkg_info[pkg]["filepath"] 510 shutil.copy2(file_path, dst_dir) 511 shutil.rmtree(pkg_path) 512 513 def install_uninstall_packages(self, test_id, pkg_dir, install): 514 """ 515 Check if the test requires a package and Install/Uninstall it in the DUT 516 """ 517 518 test = test_id.split(".")[4] 519 module = self.getModulefromID(test_id) 520 json = self._getJsonFile(module) 521 if json: 522 needed_packages = self._getNeededPackages(json, test) 523 if needed_packages: 524 self._install_uninstall_packages(needed_packages, pkg_dir, install) 525 526 def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True): 527 """ 528 Install/Uninstall packages in the DUT without using a package manager 529 """ 530 531 if isinstance(needed_packages, dict): 532 packages = [needed_packages] 533 elif isinstance(needed_packages, list): 534 packages = needed_packages 535 536 for package in packages: 537 pkg = package["pkg"] 538 rm = package.get("rm", False) 539 extract = package.get("extract", True) 540 src_dir = os.path.join(pkg_dir, pkg) 541 542 # Install package 543 if install and extract: 544 self.target.connection.copy_dir_to(src_dir, "/") 545 546 # Uninstall package 547 elif not install and rm: 548 self.target.connection.delete_dir_structure(src_dir, "/") 549 550class ImageTestContext(RuntimeTestContext): 551 def __init__(self, d, target, host_dumper): 552 super(ImageTestContext, self).__init__(d, target) 553 554 self.tagexp = d.getVar("TEST_SUITES_TAGS") 555 556 self.host_dumper = host_dumper 557 558 self.sigterm = False 559 self.origsigtermhandler = signal.getsignal(signal.SIGTERM) 560 signal.signal(signal.SIGTERM, self._sigterm_exception) 561 562 def _sigterm_exception(self, signum, stackframe): 563 bb.warn("TestImage received SIGTERM, shutting down...") 564 self.sigterm = True 565 self.target.stop() 566 567 def install_uninstall_packages(self, test_id, install=True): 568 """ 569 Check if the test requires a package and Install/Uninstall it in the DUT 570 """ 571 572 pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR") 573 super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) 574 575class ExportTestContext(RuntimeTestContext): 576 def __init__(self, d, target, exported=False, parsedArgs={}): 577 """ 578 This class is used when exporting tests and when are executed outside OE environment. 579 580 parsedArgs can contain the following: 581 - tag: Filter test by tag. 582 """ 583 super(ExportTestContext, self).__init__(d, target, exported) 584 585 tag = parsedArgs.get("tag", None) 586 self.tagexp = tag if tag != None else d.getVar("TEST_SUITES_TAGS") 587 588 self.sigterm = None 589 590 def install_uninstall_packages(self, test_id, install=True): 591 """ 592 Check if the test requires a package and Install/Uninstall it in the DUT 593 """ 594 595 export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) 596 extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR") 597 pkg_dir = os.path.join(export_dir, extracted_dir) 598 super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) 599