1from oeqa.runtime.case import OERuntimeTestCase
2from oeqa.core.decorator.data import skipIfNotInDataVar
3from oeqa.core.decorator.depends import OETestDepends
4
5
6class FvpDevicesTest(OERuntimeTestCase):
7    def run_cmd(self, cmd, check=True):
8        """
9        A wrapper around self.target.run, which:
10          * Fails the test on command failure by default
11          * Allows the "run" behavior to be overridden in sub-classes
12        """
13        (status, output) = self.target.run(cmd)
14        if status and check:
15            self.fail("Command '%s' returned non-zero exit "
16                      "status %d:\n%s" % (cmd, status, output))
17
18        return (status, output)
19
20    def check_devices(self, cls, min_count, search_drivers):
21        # Find all the devices of the specified class
22        cmd = f'find "/sys/class/{cls}" -type l -maxdepth 1'
23        _, output = self.run_cmd(cmd)
24
25        devices = output.split()
26        self.assertGreaterEqual(len(devices),
27                                min_count,
28                                msg='Device count is lower than expected')
29
30        # Assert that at least one of the devices uses at least one of the
31        # drivers
32        drivers = set()
33        for device in devices:
34            cmd = f'basename "$(readlink "{device}/device/driver")"'
35            _, output = self.run_cmd(cmd)
36            drivers.update(output.split())
37
38        self.assertTrue(drivers & set(search_drivers),
39                      msg='No device uses either of the drivers: ' +
40                        str(search_drivers))
41
42    def check_rng(self, hw_random, dev):
43        cmd = f'cat {hw_random} | grep {dev}'
44        self.run_cmd(cmd)
45
46    def set_cpu(self, cpu_num, flag):
47        # Issue echo command
48        self.run_cmd(
49            f'echo "{flag}" > "/sys/devices/system/cpu/cpu{cpu_num}/online"',
50            check = False,
51        )
52        _, output = self.run_cmd(
53            f'cat "/sys/devices/system/cpu/cpu{cpu_num}/online"'
54        )
55
56        return output == flag
57
58    def enable_cpu(self, cpu_num):
59        return self.set_cpu(cpu_num, "1")
60
61    def disable_cpu(self, cpu_num):
62        return self.set_cpu(cpu_num, "0")
63
64    @OETestDepends(['ssh.SSHTest.test_ssh'])
65    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'cpu_hotplug',
66                        'cpu_hotplug not included in BSP tests')
67    def test_cpu_hotplug(self):
68        _, cpus = self.run_cmd('find /sys/firmware/devicetree/base/cpus/'
69                               ' -name "cpu@*" -maxdepth 1 | wc -l')
70
71        try:
72            count_cpus = int(cpus)
73        except ValueError:
74            self.fail(f"Expected number of CPUs, but found this:\n{cpus}")
75
76        self.num_cpus = int(self.td.get('TEST_CPU_HOTPLUG_NUM_CPUS',
77                                        count_cpus))
78        try:
79            # Test that all cores are online
80            _, cpus = self.run_cmd('grep -c "processor" /proc/cpuinfo')
81            self.assertEqual(int(cpus), self.num_cpus)
82            # Don't try to disable here the only cpu present in the system.
83            if self.num_cpus > 1:
84                # Test that we can stop each core individually
85                for i in range(self.num_cpus):
86                    self.assertTrue(self.disable_cpu(i))
87                    self.assertTrue(self.enable_cpu(i))
88
89            # Test that we cannot disable all cores
90            for i in range(self.num_cpus - 1):
91                self.assertTrue(self.disable_cpu(i))
92            # Disabling last core should trigger an error
93            self.assertFalse(self.disable_cpu(self.num_cpus - 1))
94        finally:
95            # Ensure all CPUs are re-enabled
96            for i in range(self.num_cpus):
97                self.enable_cpu(i)
98
99    @OETestDepends(['ssh.SSHTest.test_ssh'])
100    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'rtc',
101                        'rtc device not included in BSP tests')
102    def test_rtc(self):
103        self.check_devices("rtc", 1, ["rtc-pl031"])
104        self.run_cmd('hwclock')
105
106    @OETestDepends(['ssh.SSHTest.test_ssh'])
107    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'watchdog',
108                        'watchdog device not included in BSP tests')
109    def test_watchdog(self):
110        self.check_devices("watchdog", 1, ["sp805-wdt", "sbsa-gwdt"])
111
112    @OETestDepends(['ssh.SSHTest.test_ssh'])
113    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'networking',
114                        'networking device not included in BSP tests')
115    def test_networking(self):
116        self.check_devices("net", 2, ["virtio_net", "vif"])
117
118        # Check that outbound network connections work
119        self.run_cmd('wget -O /dev/null "https://www.arm.com"')
120
121    @OETestDepends(['ssh.SSHTest.test_ssh'])
122    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'virtiorng',
123                        'virtiorng device not included in BSP tests')
124    def test_virtiorng(self):
125        self.check_rng('/sys/devices/virtual/misc/hw_random/rng_available',
126                       'virtio_rng.0')
127        self.check_rng('/sys/devices/virtual/misc/hw_random/rng_current',
128                       'virtio_rng.0')
129
130        self.run_cmd('hexdump -n 32 /dev/hwrng')
131