1# Copyright (C) 2014 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5# This module adds support to testimage.bbclass to deploy images and run
6# tests using a "controller image" - this is a "known good" image that is
7# installed onto the device as part of initial setup and will be booted into
8# with no interaction; we can then use it to deploy the image to be tested
9# to a second partition before running the tests.
10#
11# For an example controller image, see core-image-testcontroller
12# (meta/recipes-extended/images/core-image-testcontroller.bb)
13
14import os
15import bb
16import traceback
17import time
18import subprocess
19
20import oeqa.targetcontrol
21import oeqa.utils.sshcontrol as sshcontrol
22import oeqa.utils.commands as commands
23from oeqa.utils import CommandError
24
25from abc import ABCMeta, abstractmethod
26
27class ControllerImageHardwareTarget(oeqa.targetcontrol.BaseTarget, metaclass=ABCMeta):
28
29    supported_image_fstypes = ['tar.gz', 'tar.bz2']
30
31    def __init__(self, d):
32        super(ControllerImageHardwareTarget, self).__init__(d)
33
34        # target ip
35        addr = d.getVar("TEST_TARGET_IP") or bb.fatal('Please set TEST_TARGET_IP with the IP address of the machine you want to run the tests on.')
36        self.ip = addr.split(":")[0]
37        try:
38            self.port = addr.split(":")[1]
39        except IndexError:
40            self.port = None
41        bb.note("Target IP: %s" % self.ip)
42        self.server_ip = d.getVar("TEST_SERVER_IP")
43        if not self.server_ip:
44            try:
45                self.server_ip = subprocess.check_output(['ip', 'route', 'get', self.ip ]).split("\n")[0].split()[-1]
46            except Exception as e:
47                bb.fatal("Failed to determine the host IP address (alternatively you can set TEST_SERVER_IP with the IP address of this machine): %s" % e)
48        bb.note("Server IP: %s" % self.server_ip)
49
50        # test rootfs + kernel
51        self.image_fstype = self.get_image_fstype(d)
52        self.rootfs = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), d.getVar("IMAGE_LINK_NAME") + '.' + self.image_fstype)
53        self.kernel = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), d.getVar("KERNEL_IMAGETYPE", False) + '-' + d.getVar('MACHINE', False) + '.bin')
54        if not os.path.isfile(self.rootfs):
55            # we could've checked that IMAGE_FSTYPES contains tar.gz but the config for running testimage might not be
56            # the same as the config with which the image was build, ie
57            # you bitbake core-image-sato with IMAGE_FSTYPES += "tar.gz"
58            # and your autobuilder overwrites the config, adds the test bits and runs bitbake core-image-sato -c testimage
59            bb.fatal("No rootfs found. Did you build the image ?\nIf yes, did you build it with IMAGE_FSTYPES += \"tar.gz\" ? \
60                      \nExpected path: %s" % self.rootfs)
61        if not os.path.isfile(self.kernel):
62            bb.fatal("No kernel found. Expected path: %s" % self.kernel)
63
64        # controller ssh connection
65        self.controller = None
66        # if the user knows what they are doing, then by all means...
67        self.user_cmds = d.getVar("TEST_DEPLOY_CMDS")
68        self.deploy_cmds = None
69
70        # this is the name of the command that controls the power for a board
71        # e.g: TEST_POWERCONTROL_CMD = "/home/user/myscripts/powercontrol.py ${MACHINE} what-ever-other-args-the-script-wants"
72        # the command should take as the last argument "off" and "on" and "cycle" (off, on)
73        self.powercontrol_cmd = d.getVar("TEST_POWERCONTROL_CMD") or None
74        self.powercontrol_args = d.getVar("TEST_POWERCONTROL_EXTRA_ARGS", False) or ""
75
76        self.serialcontrol_cmd = d.getVar("TEST_SERIALCONTROL_CMD") or None
77        self.serialcontrol_args = d.getVar("TEST_SERIALCONTROL_EXTRA_ARGS", False) or ""
78
79        self.origenv = os.environ
80        if self.powercontrol_cmd or self.serialcontrol_cmd:
81            # the external script for controlling power might use ssh
82            # ssh + keys means we need the original user env
83            bborigenv = d.getVar("BB_ORIGENV", False) or {}
84            for key in bborigenv:
85                val = bborigenv.getVar(key)
86                if val is not None:
87                    self.origenv[key] = str(val)
88
89        if self.powercontrol_cmd:
90            if self.powercontrol_args:
91                self.powercontrol_cmd = "%s %s" % (self.powercontrol_cmd, self.powercontrol_args)
92        if self.serialcontrol_cmd:
93            if self.serialcontrol_args:
94                self.serialcontrol_cmd = "%s %s" % (self.serialcontrol_cmd, self.serialcontrol_args)
95
96    def power_ctl(self, msg):
97        if self.powercontrol_cmd:
98            cmd = "%s %s" % (self.powercontrol_cmd, msg)
99            try:
100                commands.runCmd(cmd, assert_error=False, start_new_session=True, env=self.origenv)
101            except CommandError as e:
102                bb.fatal(str(e))
103
104    def power_cycle(self, conn):
105        if self.powercontrol_cmd:
106            # be nice, don't just cut power
107            conn.run("shutdown -h now")
108            time.sleep(10)
109            self.power_ctl("cycle")
110        else:
111            status, output = conn.run("sync; { sleep 1; reboot; } > /dev/null &")
112            if status != 0:
113                bb.error("Failed rebooting target and no power control command defined. You need to manually reset the device.\n%s" % output)
114
115    def _wait_until_booted(self):
116        ''' Waits until the target device has booted (if we have just power cycled it) '''
117        # Subclasses with better methods of determining boot can override this
118        time.sleep(120)
119
120    def deploy(self):
121        # base class just sets the ssh log file for us
122        super(ControllerImageHardwareTarget, self).deploy()
123        self.controller = sshcontrol.SSHControl(ip=self.ip, logfile=self.sshlog, timeout=600, port=self.port)
124        status, output = self.controller.run("cat /etc/controllerimage")
125        if status != 0:
126            # We're not booted into the controller image, so try rebooting
127            bb.plain("%s - booting into the controller image" % self.pn)
128            self.power_ctl("cycle")
129            self._wait_until_booted()
130
131        bb.plain("%s - deploying image on target" % self.pn)
132        status, output = self.controller.run("cat /etc/controllerimage")
133        if status != 0:
134            bb.fatal("No ssh connectivity or target isn't running a controller image.\n%s" % output)
135        if self.user_cmds:
136            self.deploy_cmds = self.user_cmds.split("\n")
137        try:
138            self._deploy()
139        except Exception as e:
140            bb.fatal("Failed deploying test image: %s" % e)
141
142    @abstractmethod
143    def _deploy(self):
144        pass
145
146    def start(self, extra_bootparams=None):
147        bb.plain("%s - boot test image on target" % self.pn)
148        self._start()
149        # set the ssh object for the target/test image
150        self.connection = sshcontrol.SSHControl(self.ip, logfile=self.sshlog, port=self.port)
151        bb.plain("%s - start running tests" % self.pn)
152
153    @abstractmethod
154    def _start(self):
155        pass
156
157    def stop(self):
158        bb.plain("%s - reboot/powercycle target" % self.pn)
159        self.power_cycle(self.controller)
160
161
162class SystemdbootTarget(ControllerImageHardwareTarget):
163
164    def __init__(self, d):
165        super(SystemdbootTarget, self).__init__(d)
166        # this the value we need to set in the LoaderEntryOneShot EFI variable
167        # so the system boots the 'test' bootloader label and not the default
168        # The first four bytes are EFI bits, and the rest is an utf-16le string
169        # (EFI vars values need to be utf-16)
170        # $ echo -en "test\0" | iconv -f ascii -t utf-16le | hexdump -C
171        # 00000000  74 00 65 00 73 00 74 00  00 00                    |t.e.s.t...|
172        self.efivarvalue = r'\x07\x00\x00\x00\x74\x00\x65\x00\x73\x00\x74\x00\x00\x00'
173        self.deploy_cmds = [
174                'mount -L boot /boot',
175                'mkdir -p /mnt/testrootfs',
176                'mount -L testrootfs /mnt/testrootfs',
177                'modprobe efivarfs',
178                'mount -t efivarfs efivarfs /sys/firmware/efi/efivars',
179                'cp ~/test-kernel /boot',
180                'rm -rf /mnt/testrootfs/*',
181                'tar xvf ~/test-rootfs.%s -C /mnt/testrootfs' % self.image_fstype,
182                'printf "%s" > /sys/firmware/efi/efivars/LoaderEntryOneShot-4a67b082-0a4c-41cf-b6c7-440b29bb8c4f' % self.efivarvalue
183                ]
184
185    def _deploy(self):
186        # make sure these aren't mounted
187        self.controller.run("umount /boot; umount /mnt/testrootfs; umount /sys/firmware/efi/efivars;")
188        # from now on, every deploy cmd should return 0
189        # else an exception will be thrown by sshcontrol
190        self.controller.ignore_status = False
191        self.controller.copy_to(self.rootfs, "~/test-rootfs." + self.image_fstype)
192        self.controller.copy_to(self.kernel, "~/test-kernel")
193        for cmd in self.deploy_cmds:
194            self.controller.run(cmd)
195
196    def _start(self, params=None):
197        self.power_cycle(self.controller)
198        # there are better ways than a timeout but this should work for now
199        time.sleep(120)
200