1# Base class to be used by all test cases defined in the suite
2#
3# Copyright (C) 2016 Intel Corporation
4#
5# SPDX-License-Identifier: GPL-2.0-only
6
7import unittest
8import logging
9import json
10import unidiff
11from data import PatchTestInput
12import mailbox
13import collections
14import sys
15import os
16import re
17
18sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'pyparsing'))
19
20logger = logging.getLogger('patchtest')
21debug=logger.debug
22info=logger.info
23warn=logger.warn
24error=logger.error
25
26Commit = collections.namedtuple('Commit', ['author', 'subject', 'commit_message', 'shortlog', 'payload'])
27
28class PatchtestOEError(Exception):
29    """Exception for handling patchtest-oe errors"""
30    def __init__(self, message, exitcode=1):
31        super().__init__(message)
32        self.exitcode = exitcode
33
34class Base(unittest.TestCase):
35    # if unit test fails, fail message will throw at least the following JSON: {"id": <testid>}
36
37    endcommit_messages_regex = re.compile(r'\(From \w+-\w+ rev:|(?<!\S)Signed-off-by|(?<!\S)---\n')
38    patchmetadata_regex   = re.compile(r'-{3} \S+|\+{3} \S+|@{2} -\d+,\d+ \+\d+,\d+ @{2} \S+')
39
40
41    @staticmethod
42    def msg_to_commit(msg):
43        payload = msg.get_payload()
44        return Commit(subject=msg['subject'].replace('\n', ' ').replace('  ', ' '),
45                      author=msg.get('From'),
46                      shortlog=Base.shortlog(msg['subject']),
47                      commit_message=Base.commit_message(payload),
48                      payload=payload)
49
50    @staticmethod
51    def commit_message(payload):
52        commit_message = payload.__str__()
53        match = Base.endcommit_messages_regex.search(payload)
54        if match:
55            commit_message = payload[:match.start()]
56        return commit_message
57
58    @staticmethod
59    def shortlog(shlog):
60        # remove possible prefix (between brackets) before colon
61        start = shlog.find(']', 0, shlog.find(':'))
62        # remove also newlines and spaces at both sides
63        return shlog[start + 1:].replace('\n', '').strip()
64
65    @classmethod
66    def setUpClass(cls):
67
68        # General objects: mailbox.mbox and patchset
69        cls.mbox = mailbox.mbox(PatchTestInput.repo.patch)
70
71        # Patch may be malformed, so try parsing it
72        cls.unidiff_parse_error = ''
73        cls.patchset = None
74        try:
75            cls.patchset = unidiff.PatchSet.from_filename(PatchTestInput.repo.patch, encoding=u'UTF-8')
76        except unidiff.UnidiffParseError as upe:
77            cls.patchset = []
78            cls.unidiff_parse_error = str(upe)
79
80        # Easy to iterate list of commits
81        cls.commits = []
82        for msg in cls.mbox:
83            if msg['subject'] and msg.get_payload():
84                cls.commits.append(Base.msg_to_commit(msg))
85
86        cls.setUpClassLocal()
87
88    @classmethod
89    def tearDownClass(cls):
90        cls.tearDownClassLocal()
91
92    @classmethod
93    def setUpClassLocal(cls):
94        pass
95
96    @classmethod
97    def tearDownClassLocal(cls):
98        pass
99
100    def fail(self, issue, fix=None, commit=None, data=None):
101        """ Convert to a JSON string failure data"""
102        value = {'id': self.id(),
103                 'issue': issue}
104
105        if fix:
106            value['fix'] = fix
107        if commit:
108            value['commit'] = {'subject': commit.subject,
109                               'shortlog': commit.shortlog}
110
111        # extend return value with other useful info
112        if data:
113            value['data'] = data
114
115        return super(Base, self).fail(json.dumps(value))
116
117    def skip(self, issue, data=None):
118        """ Convert the skip string to JSON"""
119        value = {'id': self.id(),
120                 'issue': issue}
121
122        # extend return value with other useful info
123        if data:
124            value['data'] = data
125
126        return super(Base, self).skipTest(json.dumps(value))
127
128    def shortid(self):
129        return self.id().split('.')[-1]
130
131    def __str__(self):
132        return json.dumps({'id': self.id()})
133
134class Metadata(Base):
135    @classmethod
136    def setUpClassLocal(cls):
137        cls.tinfoil = cls.setup_tinfoil()
138
139        # get info about added/modified/remove recipes
140        cls.added, cls.modified, cls.removed = cls.get_metadata_stats(cls.patchset)
141
142    @classmethod
143    def tearDownClassLocal(cls):
144        cls.tinfoil.shutdown()
145
146    @classmethod
147    def setup_tinfoil(cls, config_only=False):
148        """Initialize tinfoil api from bitbake"""
149
150        # import relevant libraries
151        try:
152            scripts_path = os.path.join(PatchTestInput.repodir, 'scripts', 'lib')
153            if scripts_path not in sys.path:
154                sys.path.insert(0, scripts_path)
155            import scriptpath
156            scriptpath.add_bitbake_lib_path()
157            import bb.tinfoil
158        except ImportError:
159            raise PatchtestOEError('Could not import tinfoil module')
160
161        orig_cwd = os.path.abspath(os.curdir)
162
163        # Load tinfoil
164        tinfoil = None
165        try:
166            builddir = os.environ.get('BUILDDIR')
167            if not builddir:
168                logger.warn('Bitbake environment not loaded?')
169                return tinfoil
170            os.chdir(builddir)
171            tinfoil = bb.tinfoil.Tinfoil()
172            tinfoil.prepare(config_only=config_only)
173        except bb.tinfoil.TinfoilUIException as te:
174            if tinfoil:
175                tinfoil.shutdown()
176            raise PatchtestOEError('Could not prepare properly tinfoil (TinfoilUIException)')
177        except Exception as e:
178            if tinfoil:
179                tinfoil.shutdown()
180            raise e
181        finally:
182            os.chdir(orig_cwd)
183
184        return tinfoil
185
186    @classmethod
187    def get_metadata_stats(cls, patchset):
188        """Get lists of added, modified and removed metadata files"""
189
190        def find_pn(data, path):
191            """Find the PN from data"""
192            pn = None
193            pn_native = None
194            for _path, _pn in data:
195                if path in _path:
196                    if 'native' in _pn:
197                        # store the native PN but look for the non-native one first
198                        pn_native = _pn
199                    else:
200                        pn = _pn
201                        break
202            else:
203                # sent the native PN if found previously
204                if pn_native:
205                    return pn_native
206
207                # on renames (usually upgrades), we need to check (FILE) base names
208                # because the unidiff library does not provided the new filename, just the modified one
209                # and tinfoil datastore, once the patch is merged, will contain the new filename
210                path_basename = path.split('_')[0]
211                for _path, _pn in data:
212                    _path_basename = _path.split('_')[0]
213                    if path_basename == _path_basename:
214                        pn = _pn
215            return pn
216
217        if not cls.tinfoil:
218            cls.tinfoil = cls.setup_tinfoil()
219
220        added_paths, modified_paths, removed_paths = [], [], []
221        added, modified, removed = [], [], []
222
223        # get metadata filename additions, modification and removals
224        for patch in patchset:
225            if patch.path.endswith('.bb') or patch.path.endswith('.bbappend') or patch.path.endswith('.inc'):
226                if patch.is_added_file:
227                    added_paths.append(os.path.join(os.path.abspath(PatchTestInput.repodir), patch.path))
228                elif patch.is_modified_file:
229                    modified_paths.append(os.path.join(os.path.abspath(PatchTestInput.repodir), patch.path))
230                elif patch.is_removed_file:
231                    removed_paths.append(os.path.join(os.path.abspath(PatchTestInput.repodir), patch.path))
232
233        data = cls.tinfoil.cooker.recipecaches[''].pkg_fn.items()
234
235        added = [find_pn(data,path) for path in added_paths]
236        modified = [find_pn(data,path) for path in modified_paths]
237        removed = [find_pn(data,path) for path in removed_paths]
238
239        return [a for a in added if a], [m for m in modified if m], [r for r in removed if r]
240