xref: /openbmc/openbmc/poky/meta/lib/oeqa/oetest.py (revision 73bd93f1)
1#
2# Copyright (C) 2013 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7# Main unittest module used by testimage.bbclass
8# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime.
9
10# It also has some helper functions and it's responsible for actually starting the tests
11
12import os, re, sys
13import unittest
14import inspect
15import subprocess
16import signal
17import shutil
18import functools
19try:
20    import bb
21except ImportError:
22    pass
23import logging
24
25import oeqa.runtime
26# Exported test doesn't require sdkext
27try:
28    import oeqa.sdkext
29except ImportError:
30    pass
31from oeqa.utils.decorators import LogResults, gettag
32
33logger = logging.getLogger("BitBake")
34
35def getVar(obj):
36    #extend form dict, if a variable didn't exists, need find it in testcase
37    class VarDict(dict):
38        def __getitem__(self, key):
39            return gettag(obj, key)
40    return VarDict()
41
42def checkTags(tc, tagexp):
43    return eval(tagexp, None, getVar(tc))
44
45def filterByTagExp(testsuite, tagexp):
46    if not tagexp:
47        return testsuite
48    caseList = []
49    for each in testsuite:
50        if not isinstance(each, unittest.BaseTestSuite):
51            if checkTags(each, tagexp):
52                caseList.append(each)
53        else:
54            caseList.append(filterByTagExp(each, tagexp))
55    return testsuite.__class__(caseList)
56
57@LogResults
58class oeTest(unittest.TestCase):
59
60    longMessage = True
61
62    @classmethod
63    def hasPackage(self, pkg):
64        """
65        True if the full package name exists in the manifest, False otherwise.
66        """
67        return pkg in oeTest.tc.pkgmanifest
68
69    @classmethod
70    def hasPackageMatch(self, match):
71        """
72        True if match exists in the manifest as a regular expression substring,
73        False otherwise.
74        """
75        for s in oeTest.tc.pkgmanifest:
76            if re.match(match, s):
77                return True
78        return False
79
80    @classmethod
81    def hasFeature(self,feature):
82        if feature in oeTest.tc.imagefeatures or \
83                feature in oeTest.tc.distrofeatures:
84            return True
85        else:
86            return False
87
88class oeRuntimeTest(oeTest):
89    def __init__(self, methodName='runTest'):
90        self.target = oeRuntimeTest.tc.target
91        super(oeRuntimeTest, self).__init__(methodName)
92
93    def setUp(self):
94        # Install packages in the DUT
95        self.tc.install_uninstall_packages(self.id())
96
97        # Check if test needs to run
98        if self.tc.sigterm:
99            self.fail("Got SIGTERM")
100        elif (type(self.target).__name__ == "QemuTarget"):
101            self.assertTrue(self.target.check(), msg = "Qemu not running?")
102
103        self.setUpLocal()
104
105    # a setup method before tests but after the class instantiation
106    def setUpLocal(self):
107        pass
108
109    def tearDown(self):
110        # Uninstall packages in the DUT
111        self.tc.install_uninstall_packages(self.id(), False)
112        self.tearDownLocal()
113
114    # Method to be run after tearDown and implemented by child classes
115    def tearDownLocal(self):
116        pass
117
118def getmodule(pos=2):
119    # stack returns a list of tuples containg frame information
120    # First element of the list the is current frame, caller is 1
121    frameinfo = inspect.stack()[pos]
122    modname = inspect.getmodulename(frameinfo[1])
123    #modname = inspect.getmodule(frameinfo[0]).__name__
124    return modname
125
126def skipModule(reason, pos=2):
127    modname = getmodule(pos)
128    if modname not in oeTest.tc.testsrequired:
129        raise unittest.SkipTest("%s: %s" % (modname, reason))
130    else:
131        raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \
132                "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \
133                "\nor the image really doesn't have the required feature/package when it should." % (modname, reason))
134
135def skipModuleIf(cond, reason):
136
137    if cond:
138        skipModule(reason, 3)
139
140def skipModuleUnless(cond, reason):
141
142    if not cond:
143        skipModule(reason, 3)
144
145_buffer_logger = ""
146def custom_verbose(msg, *args, **kwargs):
147    global _buffer_logger
148    if msg[-1] != "\n":
149        _buffer_logger += msg
150    else:
151        _buffer_logger += msg
152        try:
153            bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs)
154        except NameError:
155            logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs)
156        _buffer_logger = ""
157
158class TestContext(object):
159    def __init__(self, d, exported=False):
160        self.d = d
161
162        self.testsuites = self._get_test_suites()
163
164        if exported:
165            path = [os.path.dirname(os.path.abspath(__file__))]
166            extrapath = ""
167        else:
168            path = d.getVar("BBPATH").split(':')
169            extrapath = "lib/oeqa"
170
171        self.testslist = self._get_tests_list(path, extrapath)
172        self.testsrequired = self._get_test_suites_required()
173
174        self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files")
175        self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
176        self.imagefeatures = d.getVar("IMAGE_FEATURES").split()
177        self.distrofeatures = d.getVar("DISTRO_FEATURES").split()
178
179    # get testcase list from specified file
180    # if path is a relative path, then relative to build/conf/
181    def _read_testlist(self, fpath, builddir):
182        if not os.path.isabs(fpath):
183            fpath = os.path.join(builddir, "conf", fpath)
184        if not os.path.exists(fpath):
185            bb.fatal("No such manifest file: ", fpath)
186        tcs = []
187        for line in open(fpath).readlines():
188            line = line.strip()
189            if line and not line.startswith("#"):
190                tcs.append(line)
191        return " ".join(tcs)
192
193    # return test list by type also filter if TEST_SUITES is specified
194    def _get_tests_list(self, bbpath, extrapath):
195        testslist = []
196
197        type = self._get_test_namespace()
198
199        # This relies on lib/ under each directory in BBPATH being added to sys.path
200        # (as done by default in base.bbclass)
201        for testname in self.testsuites:
202            if testname != "auto":
203                if testname.startswith("oeqa."):
204                    testslist.append(testname)
205                    continue
206                found = False
207                for p in bbpath:
208                    if os.path.exists(os.path.join(p, extrapath, type, testname + ".py")):
209                        testslist.append("oeqa." + type + "." + testname)
210                        found = True
211                        break
212                    elif os.path.exists(os.path.join(p, extrapath, type, testname.split(".")[0] + ".py")):
213                        testslist.append("oeqa." + type + "." + testname)
214                        found = True
215                        break
216                if not found:
217                    bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname)
218
219        if "auto" in self.testsuites:
220            def add_auto_list(path):
221                files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')])
222                for f in files:
223                    module = 'oeqa.' + type + '.' + f[:-3]
224                    if module not in testslist:
225                        testslist.append(module)
226
227            for p in bbpath:
228                testpath = os.path.join(p, 'lib', 'oeqa', type)
229                bb.debug(2, 'Searching for tests in %s' % testpath)
230                if os.path.exists(testpath):
231                    add_auto_list(testpath)
232
233        return testslist
234
235    def getTestModules(self):
236        """
237        Returns all the test modules in the testlist.
238        """
239
240        import pkgutil
241
242        modules = []
243        for test in self.testslist:
244            if re.search(r"\w+\.\w+\.test_\S+", test):
245                test = '.'.join(t.split('.')[:3])
246            module = pkgutil.get_loader(test)
247            modules.append(module)
248
249        return modules
250
251    def getModulefromID(self, test_id):
252        """
253        Returns the test module based on a test id.
254        """
255
256        module_name = ".".join(test_id.split(".")[:3])
257        modules = self.getTestModules()
258        for module in modules:
259            if module.name == module_name:
260                return module
261
262        return None
263
264    def getTests(self, test):
265        '''Return all individual tests executed when running the suite.'''
266        # Unfortunately unittest does not have an API for this, so we have
267        # to rely on implementation details. This only needs to work
268        # for TestSuite containing TestCase.
269        method = getattr(test, '_testMethodName', None)
270        if method:
271            # leaf case: a TestCase
272            yield test
273        else:
274            # Look into TestSuite.
275            tests = getattr(test, '_tests', [])
276            for t1 in tests:
277                for t2 in self.getTests(t1):
278                    yield t2
279
280    def loadTests(self):
281        setattr(oeTest, "tc", self)
282
283        testloader = unittest.TestLoader()
284        testloader.sortTestMethodsUsing = None
285        suites = [testloader.loadTestsFromName(name) for name in self.testslist]
286        suites = filterByTagExp(suites, getattr(self, "tagexp", None))
287
288        # Determine dependencies between suites by looking for @skipUnlessPassed
289        # method annotations. Suite A depends on suite B if any method in A
290        # depends on a method on B.
291        for suite in suites:
292            suite.dependencies = []
293            suite.depth = 0
294            for test in self.getTests(suite):
295                methodname = getattr(test, '_testMethodName', None)
296                if methodname:
297                    method = getattr(test, methodname)
298                    depends_on = getattr(method, '_depends_on', None)
299                    if depends_on:
300                        for dep_suite in suites:
301                            if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]:
302                                if dep_suite not in suite.dependencies and \
303                                   dep_suite is not suite:
304                                    suite.dependencies.append(dep_suite)
305                                break
306                        else:
307                            logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
308                                    (test, depends_on))
309
310        # Use brute-force topological sort to determine ordering. Sort by
311        # depth (higher depth = must run later), with original ordering to
312        # break ties.
313        def set_suite_depth(suite):
314            for dep in suite.dependencies:
315                new_depth = set_suite_depth(dep) + 1
316                if new_depth > suite.depth:
317                    suite.depth = new_depth
318            return suite.depth
319
320        for index, suite in enumerate(suites):
321            set_suite_depth(suite)
322            suite.index = index
323
324        def cmp(a, b):
325            return (a > b) - (a < b)
326
327        def cmpfunc(a, b):
328            return cmp((a.depth, a.index), (b.depth, b.index))
329
330        suites.sort(key=functools.cmp_to_key(cmpfunc))
331
332        self.suite = testloader.suiteClass(suites)
333
334        return self.suite
335
336    def runTests(self):
337        logger.info("Test modules  %s" % self.testslist)
338        if hasattr(self, "tagexp") and self.tagexp:
339            logger.info("Filter test cases by tags: %s" % self.tagexp)
340        logger.info("Found %s tests" % self.suite.countTestCases())
341        runner = unittest.TextTestRunner(verbosity=2)
342        if 'bb' in sys.modules:
343            runner.stream.write = custom_verbose
344
345        return runner.run(self.suite)
346
347class RuntimeTestContext(TestContext):
348    def __init__(self, d, target, exported=False):
349        super(RuntimeTestContext, self).__init__(d, exported)
350
351        self.target = target
352
353        self.pkgmanifest = {}
354        manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"),
355                d.getVar("IMAGE_LINK_NAME") + ".manifest")
356        nomanifest = d.getVar("IMAGE_NO_MANIFEST")
357        if nomanifest is None or nomanifest != "1":
358            try:
359                with open(manifest) as f:
360                    for line in f:
361                        (pkg, arch, version) = line.strip().split()
362                        self.pkgmanifest[pkg] = (version, arch)
363            except IOError as e:
364                bb.fatal("No package manifest file found. Did you build the image?\n%s" % e)
365
366    def _get_test_namespace(self):
367        return "runtime"
368
369    def _get_test_suites(self):
370        testsuites = []
371
372        manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split()
373        if manifests:
374            for manifest in manifests:
375                testsuites.extend(self._read_testlist(manifest,
376                                  self.d.getVar("TOPDIR")).split())
377
378        else:
379            testsuites = self.d.getVar("TEST_SUITES").split()
380
381        return testsuites
382
383    def _get_test_suites_required(self):
384        return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"]
385
386    def extract_packages(self):
387        """
388        Find packages that will be needed during runtime.
389        """
390
391        modules = self.getTestModules()
392        bbpaths = self.d.getVar("BBPATH").split(":")
393
394        shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR"))
395        shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR"))
396        for module in modules:
397            json_file = self._getJsonFile(module)
398            if json_file:
399                needed_packages = self._getNeededPackages(json_file)
400                self._perform_package_extraction(needed_packages)
401
402    def _perform_package_extraction(self, needed_packages):
403        """
404        Extract packages that will be needed during runtime.
405        """
406
407        import oe.path
408
409        extracted_path = self.d.getVar("TEST_EXTRACTED_DIR")
410        packaged_path = self.d.getVar("TEST_PACKAGED_DIR")
411
412        for key,value in needed_packages.items():
413            packages = ()
414            if isinstance(value, dict):
415                packages = (value, )
416            elif isinstance(value, list):
417                packages = value
418            else:
419                bb.fatal("Failed to process needed packages for %s; "
420                         "Value must be a dict or list" % key)
421
422            for package in packages:
423                pkg = package["pkg"]
424                rm = package.get("rm", False)
425                extract = package.get("extract", True)
426                if extract:
427                    dst_dir = os.path.join(extracted_path, pkg)
428                else:
429                    dst_dir = os.path.join(packaged_path)
430
431                # Extract package and copy it to TEST_EXTRACTED_DIR
432                pkg_dir = self._extract_in_tmpdir(pkg)
433                if extract:
434
435                    # Same package used for more than one test,
436                    # don't need to extract again.
437                    if os.path.exists(dst_dir):
438                        continue
439                    oe.path.copytree(pkg_dir, dst_dir)
440                    shutil.rmtree(pkg_dir)
441
442                # Copy package to TEST_PACKAGED_DIR
443                else:
444                    self._copy_package(pkg)
445
446    def _getJsonFile(self, module):
447        """
448        Returns the path of the JSON file for a module, empty if doesn't exitst.
449        """
450
451        module_file = module.path
452        json_file = "%s.json" % module_file.rsplit(".", 1)[0]
453        if os.path.isfile(module_file) and os.path.isfile(json_file):
454            return json_file
455        else:
456            return ""
457
458    def _getNeededPackages(self, json_file, test=None):
459        """
460        Returns a dict with needed packages based on a JSON file.
461
462
463        If a test is specified it will return the dict just for that test.
464        """
465
466        import json
467
468        needed_packages = {}
469
470        with open(json_file) as f:
471            test_packages = json.load(f)
472        for key,value in test_packages.items():
473            needed_packages[key] = value
474
475        if test:
476            if test in needed_packages:
477                needed_packages = needed_packages[test]
478            else:
479                needed_packages = {}
480
481        return needed_packages
482
483    def _extract_in_tmpdir(self, pkg):
484        """"
485        Returns path to a temp directory where the package was
486        extracted without dependencies.
487        """
488
489        from oeqa.utils.package_manager import get_package_manager
490
491        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
492        pm = get_package_manager(self.d, pkg_path)
493        extract_dir = pm.extract(pkg)
494        shutil.rmtree(pkg_path)
495
496        return extract_dir
497
498    def _copy_package(self, pkg):
499        """
500        Copy the RPM, DEB or IPK package to dst_dir
501        """
502
503        from oeqa.utils.package_manager import get_package_manager
504
505        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
506        dst_dir = self.d.getVar("TEST_PACKAGED_DIR")
507        pm = get_package_manager(self.d, pkg_path)
508        pkg_info = pm.package_info(pkg)
509        file_path = pkg_info[pkg]["filepath"]
510        shutil.copy2(file_path, dst_dir)
511        shutil.rmtree(pkg_path)
512
513    def install_uninstall_packages(self, test_id, pkg_dir, install):
514        """
515        Check if the test requires a package and Install/Uninstall it in the DUT
516        """
517
518        test = test_id.split(".")[4]
519        module = self.getModulefromID(test_id)
520        json = self._getJsonFile(module)
521        if json:
522            needed_packages = self._getNeededPackages(json, test)
523            if needed_packages:
524                self._install_uninstall_packages(needed_packages, pkg_dir, install)
525
526    def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True):
527        """
528        Install/Uninstall packages in the DUT without using a package manager
529        """
530
531        if isinstance(needed_packages, dict):
532            packages = [needed_packages]
533        elif isinstance(needed_packages, list):
534            packages = needed_packages
535
536        for package in packages:
537            pkg = package["pkg"]
538            rm = package.get("rm", False)
539            extract = package.get("extract", True)
540            src_dir = os.path.join(pkg_dir, pkg)
541
542            # Install package
543            if install and extract:
544                self.target.connection.copy_dir_to(src_dir, "/")
545
546            # Uninstall package
547            elif not install and rm:
548                self.target.connection.delete_dir_structure(src_dir, "/")
549
550class ImageTestContext(RuntimeTestContext):
551    def __init__(self, d, target, host_dumper):
552        super(ImageTestContext, self).__init__(d, target)
553
554        self.tagexp = d.getVar("TEST_SUITES_TAGS")
555
556        self.host_dumper = host_dumper
557
558        self.sigterm = False
559        self.origsigtermhandler = signal.getsignal(signal.SIGTERM)
560        signal.signal(signal.SIGTERM, self._sigterm_exception)
561
562    def _sigterm_exception(self, signum, stackframe):
563        bb.warn("TestImage received SIGTERM, shutting down...")
564        self.sigterm = True
565        self.target.stop()
566
567    def install_uninstall_packages(self, test_id, install=True):
568        """
569        Check if the test requires a package and Install/Uninstall it in the DUT
570        """
571
572        pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR")
573        super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)
574
575class ExportTestContext(RuntimeTestContext):
576    def __init__(self, d, target, exported=False, parsedArgs={}):
577        """
578        This class is used when exporting tests and when are executed outside OE environment.
579
580        parsedArgs can contain the following:
581            - tag:      Filter test by tag.
582        """
583        super(ExportTestContext, self).__init__(d, target, exported)
584
585        tag = parsedArgs.get("tag", None)
586        self.tagexp = tag if tag != None else d.getVar("TEST_SUITES_TAGS")
587
588        self.sigterm = None
589
590    def install_uninstall_packages(self, test_id, install=True):
591        """
592        Check if the test requires a package and Install/Uninstall it in the DUT
593        """
594
595        export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
596        extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR")
597        pkg_dir = os.path.join(export_dir, extracted_dir)
598        super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)
599