1#
2# Copyright (C) 2016 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7import os
8import sys
9
10from oeqa.core.context import OETestContext, OETestContextExecutor
11from oeqa.core.target.ssh import OESSHTarget
12from oeqa.core.target.qemu import OEQemuTarget
13from oeqa.utils.dump import HostDumper
14
15from oeqa.runtime.loader import OERuntimeTestLoader
16
class OERuntimeTestContext(OETestContext):
    """Test context used by the runtime test component.

    On top of what the base context stores, an instance keeps the target
    object, the host dumper, the set of packages of the image and the
    extract directory, plus a small table of commands (``target_cmds``)
    chosen according to the packages present in the image.
    """
    loaderClass = OERuntimeTestLoader
    # "files" directory located next to this module.
    runtime_files_dir = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "files")

    def __init__(self, td, logger, target,
                 host_dumper, image_packages, extract_dir):
        """Store the runtime-specific objects and build ``target_cmds``.

        ``td`` and ``logger`` are forwarded unchanged to the base class;
        the remaining arguments are kept as attributes of the same name.
        """
        super().__init__(td, logger)

        self.target = target
        self.image_packages = image_packages
        self.host_dumper = host_dumper
        self.extract_dir = extract_dir
        self._set_target_cmds()

    def _set_target_cmds(self):
        """Populate ``self.target_cmds`` based on ``self.image_packages``.

        The 'ps' entry is plain ``ps`` unless 'procps' is among the image
        packages, in which case ``-ef`` is appended.
        """
        self.target_cmds = {'ps': 'ps'}

        if 'procps' in self.image_packages:
            self.target_cmds['ps'] += ' -ef'
38
class OERuntimeTestContextExecutor(OETestContextExecutor):
    """Executor for the 'runtime' test component.

    Registers the runtime-specific command-line options and converts the
    parsed arguments into the keyword arguments stored under
    ``self.tc_kwargs['init']`` (target, host dumper, image packages and
    extract directory).
    """
    _context_class = OERuntimeTestContext

    name = 'runtime'
    help = 'runtime test component'
    description = 'executes runtime tests over targets'

    # Test cases default to the "cases" directory next to this module.
    default_cases = os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases')
    default_data = None
    default_test_data = 'data/testdata.json'
    default_tests = ''
    default_json_result_dir = '%s-results' % name

    default_target_type = 'simpleremote'
    default_manifest = 'data/manifest'
    default_server_ip = '192.168.7.1'
    default_target_ip = '192.168.7.2'
    default_extract_dir = 'packages/extracted'

    def register_commands(self, logger, subparsers):
        """Register the base options, then add the 'runtime options' group.

        The group is added to ``self.parser`` after the base class
        ``register_commands`` has run.
        """
        super().register_commands(logger, subparsers)

        runtime_group = self.parser.add_argument_group('runtime options')

        runtime_group.add_argument('--target-type', action='store',
                default=self.default_target_type, choices=['simpleremote', 'qemu'],
                help="Target type of device under test, default: %s" \
                % self.default_target_type)
        runtime_group.add_argument('--target-ip', action='store',
                default=self.default_target_ip,
                help="IP address of device under test, default: %s" \
                % self.default_target_ip)
        # The default must be the server address: the help text below
        # advertises default_server_ip, not the target address.
        runtime_group.add_argument('--server-ip', action='store',
                default=self.default_server_ip,
                help="IP address of the test server, default: %s" \
                % self.default_server_ip)

        runtime_group.add_argument('--host-dumper-dir', action='store',
                help="Directory where host status is dumped, if tests fails")

        runtime_group.add_argument('--packages-manifest', action='store',
                default=self.default_manifest,
                help="Package manifest of the image under test, default: %s" \
                % self.default_manifest)

        runtime_group.add_argument('--extract-dir', action='store',
                default=self.default_extract_dir,
                help='Directory where extracted packages reside, default: %s' \
                % self.default_extract_dir)

        runtime_group.add_argument('--qemu-boot', action='store',
                help="Qemu boot configuration, only needed when target_type is QEMU.")

    @staticmethod
    def getTarget(target_type, logger, target_ip, server_ip, **kwargs):
        """Create and return the target object for ``target_type``.

        ``target_ip`` and ``server_ip`` may be given as ``host:port``.
        When the value splits on ':' into exactly two parts, the host part
        replaces the address and the port is forwarded through ``kwargs``
        ('port' as a string for the target, 'server_port' as an int for
        the server). Values with any other number of ':' are passed on
        untouched.

        'simpleremote' yields an OESSHTarget and 'qemu' an OEQemuTarget;
        any other value is looked up as a controller through
        getControllerModule(), which raises AttributeError when no module
        provides it.
        """
        target = None

        if target_ip:
            target_ip_port = target_ip.split(':')
            if len(target_ip_port) == 2:
                target_ip = target_ip_port[0]
                kwargs['port'] = target_ip_port[1]

        if server_ip:
            server_ip_port = server_ip.split(':')
            if len(server_ip_port) == 2:
                server_ip = server_ip_port[0]
                kwargs['server_port'] = int(server_ip_port[1])

        if target_type == 'simpleremote':
            target = OESSHTarget(logger, target_ip, server_ip, **kwargs)
        elif target_type == 'qemu':
            # The qemu target is built without target_ip.
            target = OEQemuTarget(logger, server_ip, **kwargs)
        else:
            # XXX: This code uses the old naming convention for controllers
            # and targets; the idea is to leave just targets, as the
            # controller most of the time was just a wrapper.
            # XXX: This code tries to import modules from the
            # lib/oeqa/controllers directory and treat them as controllers;
            # it would be less error prone to use introspection to load
            # such modules.
            # XXX: Don't base your targets on this code, it will be
            # refactored in the near future.
            # Custom target module loading
            controller = OERuntimeTestContextExecutor.getControllerModule(target_type)
            target = controller(logger, target_ip, server_ip, **kwargs)

        return target

    @staticmethod
    def getControllerModule(target):
        """Return the controller named ``target`` from oeqa.controllers.

        Searches every oeqa.controllers module directory found through
        sys.path.

        Raises AttributeError if no module provides ``target`` and
        ImportError if one of the modules can not be imported.
        """
        controllerslist = OERuntimeTestContextExecutor._getControllerModulenames()
        controller = OERuntimeTestContextExecutor._loadControllerFromName(target, controllerslist)
        return controller

    @staticmethod
    def _getControllerModulenames():
        """Return the names of all modules found in oeqa/controllers dirs.

        Every distinct entry of sys.path is checked for an
        'oeqa/controllers' sub-directory. Each '*.py' file in it whose
        name does not start with '_' or '.#' contributes one
        'oeqa.controllers.<name>' entry.

        Raises OSError if a controllers directory lacks __init__.py and
        RuntimeError if the same module name is found twice.
        """
        controllerslist = []

        def add_controller_list(path):
            if not os.path.exists(os.path.join(path, '__init__.py')):
                raise OSError('Controllers directory %s exists but is missing __init__.py' % path)
            for f in sorted(os.listdir(path)):
                if not f.endswith('.py') or f.startswith(('_', '.#')):
                    continue
                module = 'oeqa.controllers.' + f[:-3]
                if module in controllerslist:
                    raise RuntimeError("Duplicate controller module found for %s. Layers should create unique controller module names" % module)
                controllerslist.append(module)

        # sys.path can contain duplicate paths, but because of the logic in
        # add_controller_list a duplicate would be reported as a duplicate
        # module and abort testimage. Remove duplicates (keeping the
        # original order) using an intermediate dictionary to ensure this
        # doesn't happen.
        for p in dict.fromkeys(sys.path):
            controllerpath = os.path.join(p, 'oeqa', 'controllers')
            if os.path.exists(controllerpath):
                add_controller_list(controllerpath)
        return controllerslist

    @staticmethod
    def _loadControllerFromName(target, modulenames):
        """Return the first ``target`` attribute found in ``modulenames``.

        The modules are tried in the given order; a module yielding a
        falsy value is skipped.

        Raises AttributeError if none of the modules provides ``target``
        and ImportError if one of the modules can not be imported.
        """
        for name in modulenames:
            obj = OERuntimeTestContextExecutor._loadControllerFromModule(target, name)
            if obj:
                return obj
        raise AttributeError("Unable to load {0} from available modules: {1}".format(target, str(modulenames)))

    @staticmethod
    def _loadControllerFromModule(target, modulename):
        """Import ``modulename`` and return its ``target`` attribute or None.

        None is returned when the module has no such attribute. Note that
        an AttributeError raised while the module itself is being imported
        is swallowed the same way; other import errors propagate.
        """
        import importlib
        try:
            module = importlib.import_module(modulename)
            return getattr(module, target)
        except AttributeError:
            return None

    @staticmethod
    def readPackagesManifest(manifest):
        """Return the set of package names listed in ``manifest``.

        Blank lines and lines starting with '#' are ignored; for every
        other line the first whitespace-separated field is taken.

        Raises OSError if ``manifest`` is empty/None or does not exist.
        """
        if not manifest or not os.path.exists(manifest):
            raise OSError("Manifest file not exists: %s" % manifest)

        image_packages = set()
        with open(manifest, 'r') as f:
            # Iterate the file lazily instead of loading every line first.
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    image_packages.add(line.split()[0])

        return image_packages

    @staticmethod
    def getHostDumper(cmds, directory):
        """Return a HostDumper built from ``cmds`` and ``directory``."""
        return HostDumper(cmds, directory)

    def _process_args(self, logger, args):
        """Fill ``self.tc_kwargs['init']`` from the parsed arguments.

        Raises TypeError when no packages manifest was given. The base
        class ``_process_args`` runs first; afterwards the 'target',
        'host_dumper', 'image_packages' and 'extract_dir' entries are set.
        """
        if not args.packages_manifest:
            raise TypeError('Manifest file not provided')

        super()._process_args(logger, args)

        target_kwargs = {}
        target_kwargs['qemuboot'] = args.qemu_boot

        # NOTE(review): the target is created with logger=None and the host
        # dumper with cmds=None although ``logger`` is available here --
        # confirm this is intended.
        self.tc_kwargs['init']['target'] = \
                OERuntimeTestContextExecutor.getTarget(args.target_type,
                        None, args.target_ip, args.server_ip, **target_kwargs)
        self.tc_kwargs['init']['host_dumper'] = \
                OERuntimeTestContextExecutor.getHostDumper(None,
                        args.host_dumper_dir)
        self.tc_kwargs['init']['image_packages'] = \
                OERuntimeTestContextExecutor.readPackagesManifest(
                        args.packages_manifest)
        self.tc_kwargs['init']['extract_dir'] = args.extract_dir
225
# Module-level name bound to this component's executor class.
# NOTE(review): presumably looked up by the code that discovers test
# components -- confirm against the caller.
_executor_class = OERuntimeTestContextExecutor
227