# Copyright (C) 2014 Intel Corporation
#
# SPDX-License-Identifier: MIT
#
# This module adds support to testimage.bbclass to deploy images and run
# tests using a "controller image" - this is a "known good" image that is
# installed onto the device as part of initial setup and will be booted into
# with no interaction; we can then use it to deploy the image to be tested
# to a second partition before running the tests.
#
# For an example controller image, see core-image-testcontroller
# (meta/recipes-extended/images/core-image-testcontroller.bb)

import os
import bb
import traceback
import time
import subprocess

import oeqa.targetcontrol
import oeqa.utils.sshcontrol as sshcontrol
import oeqa.utils.commands as commands
from oeqa.utils import CommandError

from abc import ABCMeta, abstractmethod


class ControllerImageHardwareTarget(oeqa.targetcontrol.BaseTarget, metaclass=ABCMeta):
    """Abstract target that deploys the image under test from a controller image.

    The target is reached over ssh at TEST_TARGET_IP. deploy() makes sure the
    device answers as a controller image (power cycling it if needed) and then
    delegates the actual installation to _deploy(); start() delegates booting
    the freshly deployed image to _start(). Subclasses must implement both.
    """

    supported_image_fstypes = ['tar.gz', 'tar.bz2']

    def __init__(self, d):
        """Read the target configuration from the datastore *d*.

        Calls bb.fatal() when TEST_TARGET_IP is unset, when the host address
        cannot be determined, or when the rootfs or kernel file is missing
        from DEPLOY_DIR_IMAGE.
        """
        super(ControllerImageHardwareTarget, self).__init__(d)

        # target ip
        addr = d.getVar("TEST_TARGET_IP") or bb.fatal('Please set TEST_TARGET_IP with the IP address of the machine you want to run the tests on.')
        self.ip = addr.split(":")[0]
        try:
            self.port = addr.split(":")[1]
        except IndexError:
            self.port = None
        bb.note("Target IP: %s" % self.ip)
        self.server_ip = d.getVar("TEST_SERVER_IP")
        if not self.server_ip:
            try:
                # check_output() returns bytes; decode before doing any text
                # processing, otherwise the split below raises TypeError.
                route_output = subprocess.check_output(['ip', 'route', 'get', self.ip]).decode("utf-8")
                self.server_ip = self._parse_route_source(route_output)
            except Exception as e:
                bb.fatal("Failed to determine the host IP address (alternatively you can set TEST_SERVER_IP with the IP address of this machine): %s" % e)
        bb.note("Server IP: %s" % self.server_ip)

        # test rootfs + kernel
        self.image_fstype = self.get_image_fstype(d)
        self.rootfs = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), d.getVar("IMAGE_LINK_NAME") + '.' + self.image_fstype)
        self.kernel = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), d.getVar("KERNEL_IMAGETYPE", False) + '-' + d.getVar('MACHINE', False) + '.bin')
        if not os.path.isfile(self.rootfs):
            # we could've checked that IMAGE_FSTYPES contains tar.gz but the config for running testimage might not be
            # the same as the config with which the image was build, ie
            # you bitbake core-image-sato with IMAGE_FSTYPES += "tar.gz"
            # and your autobuilder overwrites the config, adds the test bits and runs bitbake core-image-sato -c testimage
            bb.fatal(("No rootfs found. Did you build the image ?\nIf yes, did you build it with IMAGE_FSTYPES += \"tar.gz\" ? "
                      " \nExpected path: %s") % self.rootfs)
        if not os.path.isfile(self.kernel):
            bb.fatal("No kernel found. Expected path: %s" % self.kernel)

        # controller ssh connection
        self.controller = None
        # if the user knows what they are doing, then by all means...
        self.user_cmds = d.getVar("TEST_DEPLOY_CMDS")
        self.deploy_cmds = None

        # this is the name of the command that controls the power for a board
        # e.g: TEST_POWERCONTROL_CMD = "/home/user/myscripts/powercontrol.py ${MACHINE} what-ever-other-args-the-script-wants"
        # the command should take as the last argument "off" and "on" and "cycle" (off, on)
        self.powercontrol_cmd = d.getVar("TEST_POWERCONTROL_CMD") or None
        self.powercontrol_args = d.getVar("TEST_POWERCONTROL_EXTRA_ARGS", False) or ""

        self.serialcontrol_cmd = d.getVar("TEST_SERIALCONTROL_CMD") or None
        self.serialcontrol_args = d.getVar("TEST_SERIALCONTROL_EXTRA_ARGS", False) or ""

        # NOTE(review): this aliases os.environ rather than copying it, so the
        # BB_ORIGENV values merged below also land in this process'
        # environment. Left as-is because other code may rely on that -
        # confirm before switching to os.environ.copy().
        self.origenv = os.environ
        if self.powercontrol_cmd or self.serialcontrol_cmd:
            # the external script for controlling power might use ssh
            # ssh + keys means we need the original user env
            bborigenv = d.getVar("BB_ORIGENV", False) or {}
            for key in bborigenv:
                val = bborigenv.getVar(key)
                if val is not None:
                    self.origenv[key] = str(val)

        # fold the optional extra arguments into the command strings once
        if self.powercontrol_cmd:
            if self.powercontrol_args:
                self.powercontrol_cmd = "%s %s" % (self.powercontrol_cmd, self.powercontrol_args)
        if self.serialcontrol_cmd:
            if self.serialcontrol_args:
                self.serialcontrol_cmd = "%s %s" % (self.serialcontrol_cmd, self.serialcontrol_args)

    @staticmethod
    def _parse_route_source(route_output):
        """Return the local source address found in `ip route get` output.

        Only the first line of *route_output* (a str) is examined. The token
        following "src" is returned; newer iproute2 releases print further
        fields (such as "uid N") after it, so the last token is not reliable.
        When no "src" keyword is present the last token is returned.

        Raises IndexError when the output is empty or blank.
        """
        tokens = route_output.splitlines()[0].split()
        if "src" in tokens:
            src_index = tokens.index("src")
            if src_index + 1 < len(tokens):
                return tokens[src_index + 1]
        return tokens[-1]

    def power_ctl(self, msg):
        """Run the power control command with *msg* as its last argument.

        Does nothing when no TEST_POWERCONTROL_CMD was configured. A
        CommandError from the command is reported through bb.fatal().
        """
        if self.powercontrol_cmd:
            cmd = "%s %s" % (self.powercontrol_cmd, msg)
            try:
                commands.runCmd(cmd, assert_error=False, start_new_session=True, env=self.origenv)
            except CommandError as e:
                bb.fatal(str(e))

    def power_cycle(self, conn):
        """Restart the target through the ssh connection *conn*.

        With a power control command: shut the target down, wait 10 seconds
        and cycle the power. Without one: ask the target to reboot itself and
        log an error if that request fails.
        """
        if self.powercontrol_cmd:
            # be nice, don't just cut power
            conn.run("shutdown -h now")
            time.sleep(10)
            self.power_ctl("cycle")
        else:
            status, output = conn.run("sync; { sleep 1; reboot; } > /dev/null &")
            if status != 0:
                bb.error("Failed rebooting target and no power control command defined. You need to manually reset the device.\n%s" % output)

    def _wait_until_booted(self):
        ''' Waits until the target device has booted (if we have just power cycled it) '''
        # Subclasses with better methods of determining boot can override this
        time.sleep(120)

    def deploy(self):
        """Connect to the controller image and install the image under test.

        If the target does not answer as a controller image it is power
        cycled once and given time to boot. Calls bb.fatal() when the
        controller image still cannot be reached, or when _deploy() raises.
        """
        # base class just sets the ssh log file for us
        super(ControllerImageHardwareTarget, self).deploy()
        self.controller = sshcontrol.SSHControl(ip=self.ip, logfile=self.sshlog, timeout=600, port=self.port)
        status, output = self.controller.run("cat /etc/controllerimage")
        if status != 0:
            # We're not booted into the controller image, so try rebooting
            bb.plain("%s - booting into the controller image" % self.pn)
            self.power_ctl("cycle")
            self._wait_until_booted()

        bb.plain("%s - deploying image on target" % self.pn)
        status, output = self.controller.run("cat /etc/controllerimage")
        if status != 0:
            bb.fatal("No ssh connectivity or target isn't running a controller image.\n%s" % output)
        # user supplied commands (TEST_DEPLOY_CMDS) replace the built-in ones
        if self.user_cmds:
            self.deploy_cmds = self.user_cmds.split("\n")
        try:
            self._deploy()
        except Exception as e:
            bb.fatal("Failed deploying test image: %s" % e)

    @abstractmethod
    def _deploy(self):
        """Install the image under test; implemented by subclasses."""
        pass

    def start(self, extra_bootparams=None):
        """Boot the deployed image and open the ssh connection used by the tests.

        *extra_bootparams* is accepted but not used by this implementation.
        """
        bb.plain("%s - boot test image on target" % self.pn)
        self._start()
        # set the ssh object for the target/test image
        self.connection = sshcontrol.SSHControl(self.ip, logfile=self.sshlog, port=self.port)
        bb.plain("%s - start running tests" % self.pn)

    @abstractmethod
    def _start(self):
        """Boot the target into the image under test; implemented by subclasses."""
        pass

    def stop(self):
        """Reboot or power cycle the target using the controller connection."""
        bb.plain("%s - reboot/powercycle target" % self.pn)
        self.power_cycle(self.controller)


class SystemdbootTarget(ControllerImageHardwareTarget):
    """Controller image target for devices that boot through systemd-boot.

    The image under test is unpacked onto the partition labelled
    "testrootfs" and selected for the next boot via the LoaderEntryOneShot
    EFI variable.
    """

    def __init__(self, d):
        """Set up the default deploy command list for a systemd-boot device."""
        super(SystemdbootTarget, self).__init__(d)
        # this the value we need to set in the LoaderEntryOneShot EFI variable
        # so the system boots the 'test' bootloader label and not the default
        # The first four bytes are EFI bits, and the rest is an utf-16le string
        # (EFI vars values need to be utf-16)
        # $ echo -en "test\0" | iconv -f ascii -t utf-16le | hexdump -C
        # 00000000  74 00 65 00 73 00 74 00  00 00  |t.e.s.t...|
        self.efivarvalue = r'\x07\x00\x00\x00\x74\x00\x65\x00\x73\x00\x74\x00\x00\x00'
        self.deploy_cmds = [
            'mount -L boot /boot',
            'mkdir -p /mnt/testrootfs',
            'mount -L testrootfs /mnt/testrootfs',
            'modprobe efivarfs',
            'mount -t efivarfs efivarfs /sys/firmware/efi/efivars',
            'cp ~/test-kernel /boot',
            'rm -rf /mnt/testrootfs/*',
            'tar xvf ~/test-rootfs.%s -C /mnt/testrootfs' % self.image_fstype,
            'printf "%s" > /sys/firmware/efi/efivars/LoaderEntryOneShot-4a67b082-0a4c-41cf-b6c7-440b29bb8c4f' % self.efivarvalue
        ]

    def _deploy(self):
        """Copy the rootfs and kernel to the controller and run the deploy commands."""
        # make sure these aren't mounted
        self.controller.run("umount /boot; umount /mnt/testrootfs; umount /sys/firmware/efi/efivars;")
        # from now on, every deploy cmd should return 0
        # else an exception will be thrown by sshcontrol
        self.controller.ignore_status = False
        self.controller.copy_to(self.rootfs, "~/test-rootfs." + self.image_fstype)
        self.controller.copy_to(self.kernel, "~/test-kernel")
        for cmd in self.deploy_cmds:
            self.controller.run(cmd)

    def _start(self, params=None):
        """Restart the target and wait for the image under test to boot.

        *params* is accepted but not used.
        """
        self.power_cycle(self.controller)
        # there are better ways than a timeout but this should work for now;
        # go through the shared hook so subclasses that override
        # _wait_until_booted() get their boot detection here as well
        self._wait_until_booted()