xref: /openbmc/openbmc/poky/scripts/patchtest (revision 8460358c3d24c71d9d38fd126c745854a6301564)
1#!/usr/bin/env python3
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4#
5# patchtest: execute all unittest test cases discovered for a single patch
6#
7# Copyright (C) 2016 Intel Corporation
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import json
13import logging
14import os
15import sys
16import traceback
17import unittest
18
19# Include current path so test cases can see it
20sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
21
22# Include patchtest library
23sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '../meta/lib/patchtest'))
24
25from patchtest_parser import PatchtestParser
26from repo import PatchTestRepo
27
# Module-wide "patchtest" logger. Messages are emitted bare (no level or
# logger-name prefix) through a StreamHandler; the threshold starts at INFO
# and is lowered to DEBUG at startup when PatchtestParser.debug is set.
logger = logging.getLogger("patchtest")
logger.setLevel(logging.INFO)

loggerhandler = logging.StreamHandler()
loggerhandler.setFormatter(logging.Formatter(fmt="%(message)s"))
logger.addHandler(loggerhandler)

# Module-level shorthands for the two most used levels.
info, error = logger.info, logger.error
35
def getResult(patch, mergepatch, logfile=None):
    """Build the unittest result class used for one patchtest run.

    unittest instantiates the result class itself, so the run parameters are
    captured by closure instead of being passed to a constructor.

    Args:
        patch: Patch handed to PatchTestRepo as its "patch" argument.
        mergepatch: When true, the repo's merge() is called before the tests
            run (post-merge run); otherwise the repo is left unmerged.
        logfile: Optional path. Every PASS/FAIL/SKIP line is appended to it
            in addition to being printed.

    Returns:
        The PatchTestResult class (not an instance).
    """

    class PatchTestResult(unittest.TextTestResult):
        """ Patchtest TextTestResult """
        # NOTE(review): unittest.TestResult.__init__ assigns shouldStop on the
        # instance, so this class-level value appears to have no effect - confirm.
        shouldStop  = True
        longMessage = False

        success     = 'PASS'
        fail        = 'FAIL'
        skip        = 'SKIP'

        # Class-level defaults so the flags can be read even if startTestRun()
        # has not run; startTestRun() resets them for each run.
        repo_error   = False
        test_error   = False
        test_failure = False

        @staticmethod
        def _describe(test):
            """Derive the human-readable description from the test method name."""
            description = test.id().split('.')[-1].replace('_', ' ')
            # Applied in this order on purpose: it matches the original chain.
            for old, new in (
                ("cve", "CVE"),
                ("signed off by", "Signed-off-by"),
                ("upstream status", "Upstream-Status"),
                ("non auh", "non-AUH"),
                ("presence format", "presence"),
            ):
                description = description.replace(old, new)
            return description

        @staticmethod
        def _issue(payload):
            """Return the "issue" field of a JSON payload, or the raw text.

            Falling back to the raw text keeps a failure or skip whose message
            is not the expected JSON object from raising inside the result
            handler and aborting the whole run.
            """
            text = str(payload)
            try:
                return json.loads(text)["issue"]
            except (ValueError, KeyError, TypeError):
                return text

        @staticmethod
        def _report(line):
            """Print one result line and append it to the logfile, if any."""
            print(line)
            if logfile:
                with open(logfile, "a") as f:
                    f.write(line + "\n")

        def startTestRun(self):
            """Create the test repo and, for post-merge runs, merge the patch.

            The repo is also published as PatchtestParser.repo. If it cannot
            be created, the traceback is logged, repo_error is set and the
            run is asked to stop.
            """
            # let's create the repo already, it can be used later on
            repoargs = {
                "repodir": PatchtestParser.repodir,
                "commit": PatchtestParser.basecommit,
                "branch": PatchtestParser.basebranch,
                "patch": patch,
            }

            self.repo_error    = False
            self.test_error    = False
            self.test_failure  = False

            try:
                self.repo = PatchtestParser.repo = PatchTestRepo(**repoargs)
            except Exception:
                # format_exc() returns the traceback text; print_exc() returns
                # None, which made the log line read just "None".
                logger.error(traceback.format_exc())
                self.repo_error = True
                self.stop()
                return

            if mergepatch:
                self.repo.merge()

        def addError(self, test, err):
            """Record that a test raised and log the traceback carried by err."""
            self.test_error = True
            (ty, va, trace) = err
            # No exception is being handled here, so the traceback has to be
            # built from the exc_info tuple that unittest passes in.
            logger.error(''.join(traceback.format_exception(ty, va, trace)))

        def addFailure(self, test, err):
            """Record a failure and report 'FAIL: <description>: <issue> (<id>)'."""
            self.test_failure = True
            self._report('{}: {}: {} ({})'.format(
                self.fail, self._describe(test), self._issue(err[1]), test.id()))

        def addSuccess(self, test):
            """Report 'PASS: <description> (<id>)'."""
            self._report('{}: {} ({})'.format(
                self.success, self._describe(test), test.id()))

        def addSkip(self, test, reason):
            """Report 'SKIP: <description>: <issue> (<id>)'."""
            self._report('{}: {}: {} ({})'.format(
                self.skip, self._describe(test), self._issue(reason), test.id()))

        def stopTestRun(self):
            """Clean the repo, unless it was never created."""
            # in case there was an error on repo object creation, just return
            if self.repo_error:
                return

            self.repo.clean()

    return PatchTestResult
124
def _runner(resultklass, prefix=None):
    """Discover and run the test methods whose names start with *prefix*.

    Tests are discovered under PatchtestParser.testdir using
    PatchtestParser.pattern and PatchtestParser.topdir.

    Args:
        resultklass: unittest result class, as returned by getResult().
        prefix: Test method prefix given to the loader; the loader's own
            default is kept when this is falsy.

    Returns:
        2 if discovery finds no test cases; 1 if the run raised, a test
        failed or errored, or the result reports a repo setup error;
        0 otherwise.
    """
    # load test with the corresponding prefix
    loader = unittest.TestLoader()
    if prefix:
        loader.testMethodPrefix = prefix

    # create the suite with discovered tests and the corresponding runner
    suite = loader.discover(
        start_dir=PatchtestParser.testdir,
        pattern=PatchtestParser.pattern,
        top_level_dir=PatchtestParser.topdir,
    )

    # if there are no test cases, just quit
    if not suite.countTestCases():
        return 2

    runner = unittest.TextTestRunner(resultclass=resultklass, verbosity=0)

    try:
        result = runner.run(suite)
    except Exception:
        # Only ordinary errors are turned into a status code, so
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        # format_exc() returns the traceback text; print_exc() returns None.
        logger.error(traceback.format_exc())
        logger.error('patchtest: something went wrong')
        return 1

    # A repo that could not be set up means no test ran; that must not be
    # reported as success. getattr keeps result classes without the flag working.
    if result.test_failure or result.test_error or getattr(result, 'repo_error', False):
        return 1

    return 0
154
def run(patch, logfile=None):
    """Run the pre-merge tests, then the post-merge tests, for one patch.

    Args:
        patch: Patch to test; forwarded to getResult().
        logfile: Optional result log path; forwarded to getResult().

    Returns:
        The pre-merge status if it is non-zero, else the post-merge status.
    """
    # Let unittest install its control-c handler before anything runs.
    unittest.installHandler()

    # (mergepatch, method prefix) per phase: 'pretest' methods run against the
    # unmerged repo, 'test' methods run after the patch has been merged.
    phases = ((False, 'pretest'), (True, 'test'))

    statuses = []
    for merge_first, method_prefix in phases:
        resultklass = getResult(patch, merge_first, logfile)
        statuses.append(_runner(resultklass, method_prefix))

    pre_status, post_status = statuses
    print_result_message(pre_status, post_status)
    return pre_status or post_status
170
def print_result_message(preresult, postresult):
    """Print the closing summary for one patch, framed by separator lines.

    Exactly one message is logged:
      * both results are 2: no test cases were found (error);
      * either result is 1: at least one failure or error (error);
      * anything else: all patchtests passed (info).

    Args:
        preresult: Status of the pre-merge run (0, 1 or 2).
        postresult: Status of the post-merge run (0, 1 or 2).
    """
    print("----------------------------------------------------------------------\n")
    if preresult == 2 and postresult == 2:
        logger.error(
            "patchtest: No test cases found - did you specify the correct suite directory?"
        )
    # elif, not if: otherwise the "no test cases" case fell through to the
    # else branch below and also claimed that all patchtests passed.
    elif preresult == 1 or postresult == 1:
        logger.error(
            "WARNING: patchtest: At least one patchtest caused a failure or an error - please check https://wiki.yoctoproject.org/wiki/Patchtest for further guidance"
        )
    else:
        logger.info("OK: patchtest: All patchtests passed")
    print("----------------------------------------------------------------------\n")
184
def main():
    """Run patchtest on PatchtestParser.patch_path.

    patch_path is either a single patch file or a directory whose entries
    are tested in sorted order. When PatchtestParser.log_results is set,
    results for each patch are appended to "<patch>.testresult".

    Returns:
        1 if the target repo has uncommitted changes or a patch file is
        empty; None otherwise.
    """
    # Local import: subprocess is not among the file-level imports.
    import subprocess

    patch_path = PatchtestParser.patch_path
    log_results = PatchtestParser.log_results

    # Run git directly in the repo (no shell), so a repodir containing spaces
    # or shell metacharacters works. LC_ALL=C keeps git's messages in English,
    # which the phrase matching below relies on.
    try:
        git_status = subprocess.run(
            ["git", "status"],
            cwd=PatchtestParser.repodir,
            stdout=subprocess.PIPE,
            universal_newlines=True,
            env=dict(os.environ, LC_ALL="C"),
        ).stdout
    except OSError:
        # Best effort, as before: if git cannot be run here the check is
        # skipped and problems surface when the test repo is created.
        git_status = ""

    status_matches = ("Changes not staged for commit", "Changes to be committed")
    if any(match in git_status for match in status_matches):
        logger.error("patchtest: there are uncommitted changes in the target repo that would be overwritten. Please commit or restore them before running patchtest")
        return 1

    if os.path.isdir(patch_path):
        # Result logs are written next to the patches (see below); skip them
        # so re-running on the same directory does not test them as patches.
        patch_list = [
            os.path.join(patch_path, filename)
            for filename in sorted(os.listdir(patch_path))
            if not filename.endswith(".testresult")
        ]
    else:
        patch_list = [patch_path]

    for patch in patch_list:
        if os.path.getsize(patch) == 0:
            logger.error('patchtest: patch is empty')
            return 1

        logger.info('Testing patch %s', patch)

        log_path = None
        if log_results:
            log_path = patch + ".testresult"
            with open(log_path, "a") as f:
                f.write("Patchtest results for patch '%s':\n\n" % patch)

        # NOTE(review): run()'s status is not propagated, so the script exits 0
        # even when tests fail. Left as in the original - confirm that callers
        # rely on the printed results rather than the exit status.
        run(patch, log_path)
223
if __name__ == '__main__':
    # Parse the command line; the options end up as attributes of PatchtestParser.
    PatchtestParser.set_namespace()

    # Debug output was requested: lower the logger threshold.
    if PatchtestParser.debug:
        logger.setLevel(logging.DEBUG)

    # If topdir is not defined, default it to testdir.
    if not PatchtestParser.topdir:
        PatchtestParser.topdir = PatchtestParser.testdir

    try:
        exit_status = main()
    except Exception:
        # Show at most five traceback entries and exit with a failure status.
        traceback.print_exc(5)
        exit_status = 1

    sys.exit(exit_status)
245