from time import sleep

from oeqa.core.decorator.data import skipIfNotInDataVar
from oeqa.core.decorator.depends import OETestDepends
from oeqa.runtime.case import OERuntimeTestCase


class FvpDevicesTest(OERuntimeTestCase):
    """Runtime checks for devices exposed by the target.

    Each test is skipped unless its device name is listed in the
    TEST_FVP_DEVICES data variable, and depends on the SSH test having
    passed, because every check is performed by running shell commands
    on the target.
    """

    # Seconds to wait between two attempts when SSH reports status 255.
    # Kept as a class attribute so sub-classes can tune it.
    SSH_RETRY_DELAY = 30

    def run_cmd(self, cmd, check=True, retry=3):
        """
        A wrapper around self.target.run, which:
          * Fails the test on command failure by default
          * Allows the "run" behavior to be overridden in sub-classes
          * Has a retry mechanism when SSH returns 255

        cmd is the shell command to run on the target, check selects
        whether a non-zero status fails the test, and retry is the maximum
        number of attempts.

        Returns a (status, output) tuple. When no attempt is made
        (retry <= 0) the status is 255 and the output is empty.
        """
        status = 255
        output = ""
        # The loop is retrying the self.target.run() which uses SSH only when
        # the SSH return code is 255, which might be an issue with
        # "Connection refused" because the port isn't open yet
        while status == 255 and retry > 0:
            status, output = self.target.run(cmd)
            retry -= 1
            # Give the system time to settle before the next attempt, but do
            # not waste the delay once there are no attempts left
            if status == 255 and retry > 0:
                sleep(self.SSH_RETRY_DELAY)

        if status and check:
            self.fail("Command '%s' returned non-zero exit "
                      "status %d:\n%s" % (cmd, status, output))

        return (status, output)

    def check_devices(self, cls, min_count, search_drivers):
        """Assert that enough devices of a sysfs class exist with a driver.

        cls is the directory name under /sys/class, min_count the minimum
        number of symlinks expected directly inside it, and search_drivers
        an iterable of driver names of which at least one must be bound to
        at least one of the devices found.
        """
        # Find all the devices of the specified class.
        # -maxdepth is placed before the tests: GNU find prints a warning
        # when a global option follows a test, and any such text ending up
        # in the captured output would be counted as devices below.
        cmd = f'find "/sys/class/{cls}" -maxdepth 1 -type l'
        _, output = self.run_cmd(cmd)

        devices = output.split()
        self.assertGreaterEqual(len(devices),
                                min_count,
                                msg='Device count is lower than expected')

        # Assert that at least one of the devices uses at least one of the
        # drivers
        drivers = set()
        for device in devices:
            cmd = f'basename "$(readlink "{device}/device/driver")"'
            _, output = self.run_cmd(cmd)
            drivers.update(output.split())

        self.assertTrue(drivers & set(search_drivers),
                        msg='No device uses either of the drivers: ' +
                        str(search_drivers))

    def check_rng(self, hw_random, dev):
        """Fail the test unless the file hw_random contains a match for dev.

        The match is done by grep on the target, so a missing file or a
        missing entry both yield a non-zero status and fail the test.
        """
        cmd = f'cat {hw_random} | grep {dev}'
        self.run_cmd(cmd)

    def set_cpu(self, cpu_num, flag):
        """Write flag to the "online" sysfs node of CPU cpu_num.

        The write is allowed to fail (the hotplug test relies on some
        writes being rejected), so success is determined by reading the
        node back.

        Returns True when the value read back equals flag.
        """
        online_node = f'/sys/devices/system/cpu/cpu{cpu_num}/online'
        # Issue echo command
        self.run_cmd(
            f'echo "{flag}" > "{online_node}"',
            check=False,
        )
        _, output = self.run_cmd(
            f'cat "{online_node}"'
        )

        # Ignore surrounding whitespace such as a trailing newline
        return output.strip() == flag

    def enable_cpu(self, cpu_num):
        """Bring CPU cpu_num online; return True if it reads back as 1."""
        return self.set_cpu(cpu_num, "1")

    def disable_cpu(self, cpu_num):
        """Take CPU cpu_num offline; return True if it reads back as 0."""
        return self.set_cpu(cpu_num, "0")

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'cpu_hotplug',
                        'cpu_hotplug not included in BSP tests')
    def test_cpu_hotplug(self):
        """Check that cores can be taken offline and online again.

        The expected number of CPUs is TEST_CPU_HOTPLUG_NUM_CPUS when set,
        otherwise the number of cpu@* nodes found in the device tree.
        All CPUs are re-enabled at the end, whatever the outcome.
        """
        # -maxdepth goes before -name so that GNU find has no reason to
        # emit a warning that could end up in the text parsed as a number
        _, cpus = self.run_cmd('find /sys/firmware/devicetree/base/cpus/'
                               ' -maxdepth 1 -name "cpu@*" | wc -l')

        try:
            count_cpus = int(cpus)
        except ValueError:
            self.fail(f"Expected number of CPUs, but found this:\n{cpus}")

        self.num_cpus = int(self.td.get('TEST_CPU_HOTPLUG_NUM_CPUS',
                                        count_cpus))
        try:
            # Test that all cores are online
            _, cpus = self.run_cmd('grep -c "processor" /proc/cpuinfo')
            self.assertEqual(int(cpus), self.num_cpus)
            # Don't try to disable here the only cpu present in the system.
            if self.num_cpus > 1:
                # Test that we can stop each core individually
                for i in range(self.num_cpus):
                    self.assertTrue(self.disable_cpu(i))
                    self.assertTrue(self.enable_cpu(i))

                # Test that we cannot disable all cores
                for i in range(self.num_cpus - 1):
                    self.assertTrue(self.disable_cpu(i))
                # Disabling last core should trigger an error
                self.assertFalse(self.disable_cpu(self.num_cpus - 1))
        finally:
            # Ensure all CPUs are re-enabled
            for i in range(self.num_cpus):
                self.enable_cpu(i)

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'rtc',
                        'rtc device not included in BSP tests')
    def test_rtc(self):
        """Check that an rtc-pl031 device exists and hwclock succeeds."""
        self.check_devices("rtc", 1, ["rtc-pl031"])
        self.run_cmd('hwclock')

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'watchdog',
                        'watchdog device not included in BSP tests')
    def test_watchdog(self):
        """Check that a watchdog using sp805-wdt or sbsa-gwdt exists."""
        self.check_devices("watchdog", 1, ["sp805-wdt", "sbsa-gwdt"])

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'networking',
                        'networking device not included in BSP tests')
    def test_networking(self):
        """Check network devices and an outbound HTTPS download."""
        self.check_devices("net", 2, ["virtio_net", "vif"])

        # Check that outbound network connections work
        self.run_cmd('wget -O /dev/null "https://www.arm.com"')

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    @skipIfNotInDataVar('TEST_FVP_DEVICES', 'virtiorng',
                        'virtiorng device not included in BSP tests')
    def test_virtiorng(self):
        """Check virtio_rng.0 is available, current, and readable."""
        self.check_rng('/sys/devices/virtual/misc/hw_random/rng_available',
                       'virtio_rng.0')
        self.check_rng('/sys/devices/virtual/misc/hw_random/rng_current',
                       'virtio_rng.0')

        self.run_cmd('hexdump -n 32 /dev/hwrng')