1# Class for actually running tests. 2# 3# Copyright (c) 2020-2021 Virtuozzo International GmbH 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import os 20from pathlib import Path 21import datetime 22import time 23import difflib 24import subprocess 25import contextlib 26import json 27import termios 28import sys 29from contextlib import contextmanager 30from typing import List, Optional, Iterator, Any, Sequence, Dict, \ 31 ContextManager 32 33from testenv import TestEnv 34 35 36def silent_unlink(path: Path) -> None: 37 try: 38 path.unlink() 39 except OSError: 40 pass 41 42 43def file_diff(file1: str, file2: str) -> List[str]: 44 with open(file1, encoding="utf-8") as f1, \ 45 open(file2, encoding="utf-8") as f2: 46 # We want to ignore spaces at line ends. There are a lot of mess about 47 # it in iotests. 48 # TODO: fix all tests to not produce extra spaces, fix all .out files 49 # and use strict diff here! 50 seq1 = [line.rstrip() for line in f1] 51 seq2 = [line.rstrip() for line in f2] 52 res = [line.rstrip() 53 for line in difflib.unified_diff(seq1, seq2, file1, file2)] 54 return res 55 56 57# We want to save current tty settings during test run, 58# since an aborting qemu call may leave things screwed up. 59@contextmanager 60def savetty() -> Iterator[None]: 61 isterm = sys.stdin.isatty() 62 if isterm: 63 fd = sys.stdin.fileno() 64 attr = termios.tcgetattr(fd) 65 66 try: 67 yield 68 finally: 69 if isterm: 70 termios.tcsetattr(fd, termios.TCSADRAIN, attr) 71 72 73class LastElapsedTime(ContextManager['LastElapsedTime']): 74 """ Cache for elapsed time for tests, to show it during new test run 75 76 It is safe to use get() at any time. To use update(), you must either 77 use it inside with-block or use save() after update(). 78 """ 79 def __init__(self, cache_file: str, env: TestEnv) -> None: 80 self.env = env 81 self.cache_file = cache_file 82 self.cache: Dict[str, Dict[str, Dict[str, float]]] 83 84 try: 85 with open(cache_file, encoding="utf-8") as f: 86 self.cache = json.load(f) 87 except (OSError, ValueError): 88 self.cache = {} 89 90 def get(self, test: str, 91 default: Optional[float] = None) -> Optional[float]: 92 if test not in self.cache: 93 return default 94 95 if self.env.imgproto not in self.cache[test]: 96 return default 97 98 return self.cache[test][self.env.imgproto].get(self.env.imgfmt, 99 default) 100 101 def update(self, test: str, elapsed: float) -> None: 102 d = self.cache.setdefault(test, {}) 103 d.setdefault(self.env.imgproto, {})[self.env.imgfmt] = elapsed 104 105 def save(self) -> None: 106 with open(self.cache_file, 'w', encoding="utf-8") as f: 107 json.dump(self.cache, f) 108 109 def __enter__(self) -> 'LastElapsedTime': 110 return self 111 112 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: 113 self.save() 114 115 116class TestResult: 117 def __init__(self, status: str, description: str = '', 118 elapsed: Optional[float] = None, diff: Sequence[str] = (), 119 casenotrun: str = '', interrupted: bool = False) -> None: 120 self.status = status 121 self.description = description 122 self.elapsed = elapsed 123 self.diff = diff 124 self.casenotrun = casenotrun 125 self.interrupted = interrupted 126 127 128class TestRunner(ContextManager['TestRunner']): 129 def __init__(self, env: TestEnv, makecheck: bool = False, 130 color: str = 'auto') -> None: 131 self.env = env 132 self.test_run_env = self.env.get_env() 133 self.makecheck = makecheck 134 self.last_elapsed = LastElapsedTime('.last-elapsed-cache', env) 135 136 assert color in ('auto', 'on', 'off') 137 self.color = (color == 'on') or (color == 'auto' and 138 sys.stdout.isatty()) 139 140 self._stack: contextlib.ExitStack 141 142 def __enter__(self) -> 'TestRunner': 143 self._stack = contextlib.ExitStack() 144 self._stack.enter_context(self.env) 145 self._stack.enter_context(self.last_elapsed) 146 self._stack.enter_context(savetty()) 147 return self 148 149 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: 150 self._stack.close() 151 152 def test_print_one_line(self, test: str, starttime: str, 153 endtime: Optional[str] = None, status: str = '...', 154 lasttime: Optional[float] = None, 155 thistime: Optional[float] = None, 156 description: str = '', 157 test_field_width: Optional[int] = None, 158 end: str = '\n') -> None: 159 """ Print short test info before/after test run """ 160 test = os.path.basename(test) 161 162 if test_field_width is None: 163 test_field_width = 8 164 165 if self.makecheck and status != '...': 166 if status and status != 'pass': 167 status = f' [{status}]' 168 else: 169 status = '' 170 171 print(f' TEST iotest-{self.env.imgfmt}: {test}{status}') 172 return 173 174 if lasttime: 175 lasttime_s = f' (last: {lasttime:.1f}s)' 176 else: 177 lasttime_s = '' 178 if thistime: 179 thistime_s = f'{thistime:.1f}s' 180 else: 181 thistime_s = '...' 182 183 if endtime: 184 endtime = f'[{endtime}]' 185 else: 186 endtime = '' 187 188 if self.color: 189 if status == 'pass': 190 col = '\033[32m' 191 elif status == 'fail': 192 col = '\033[1m\033[31m' 193 elif status == 'not run': 194 col = '\033[33m' 195 else: 196 col = '' 197 198 col_end = '\033[0m' 199 else: 200 col = '' 201 col_end = '' 202 203 print(f'{test:{test_field_width}} {col}{status:10}{col_end} ' 204 f'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} ' 205 f'{description}', end=end) 206 207 def find_reference(self, test: str) -> str: 208 if self.env.cachemode == 'none': 209 ref = f'{test}.out.nocache' 210 if os.path.isfile(ref): 211 return ref 212 213 ref = f'{test}.out.{self.env.imgfmt}' 214 if os.path.isfile(ref): 215 return ref 216 217 ref = f'{test}.{self.env.qemu_default_machine}.out' 218 if os.path.isfile(ref): 219 return ref 220 221 return f'{test}.out' 222 223 def do_run_test(self, test: str) -> TestResult: 224 f_test = Path(test) 225 f_bad = Path(f_test.name + '.out.bad') 226 f_notrun = Path(f_test.name + '.notrun') 227 f_casenotrun = Path(f_test.name + '.casenotrun') 228 f_reference = Path(self.find_reference(test)) 229 230 if not f_test.exists(): 231 return TestResult(status='fail', 232 description=f'No such test file: {f_test}') 233 234 if not os.access(str(f_test), os.X_OK): 235 sys.exit(f'Not executable: {f_test}') 236 237 if not f_reference.exists(): 238 return TestResult(status='not run', 239 description='No qualified output ' 240 f'(expected {f_reference})') 241 242 for p in (f_bad, f_notrun, f_casenotrun): 243 silent_unlink(p) 244 245 args = [str(f_test.resolve())] 246 if self.env.debug: 247 args.append('-d') 248 249 with f_test.open(encoding="utf-8") as f: 250 try: 251 if f.readline().rstrip() == '#!/usr/bin/env python3': 252 args.insert(0, self.env.python) 253 except UnicodeDecodeError: # binary test? for future. 254 pass 255 256 env = os.environ.copy() 257 env.update(self.test_run_env) 258 259 t0 = time.time() 260 with f_bad.open('w', encoding="utf-8") as f: 261 proc = subprocess.Popen(args, cwd=str(f_test.parent), env=env, 262 stdout=f, stderr=subprocess.STDOUT) 263 try: 264 proc.wait() 265 except KeyboardInterrupt: 266 proc.terminate() 267 proc.wait() 268 return TestResult(status='not run', 269 description='Interrupted by user', 270 interrupted=True) 271 ret = proc.returncode 272 273 elapsed = round(time.time() - t0, 1) 274 275 if ret != 0: 276 return TestResult(status='fail', elapsed=elapsed, 277 description=f'failed, exit status {ret}', 278 diff=file_diff(str(f_reference), str(f_bad))) 279 280 if f_notrun.exists(): 281 return TestResult(status='not run', 282 description=f_notrun.read_text().strip()) 283 284 casenotrun = '' 285 if f_casenotrun.exists(): 286 casenotrun = f_casenotrun.read_text() 287 288 diff = file_diff(str(f_reference), str(f_bad)) 289 if diff: 290 return TestResult(status='fail', elapsed=elapsed, 291 description=f'output mismatch (see {f_bad})', 292 diff=diff, casenotrun=casenotrun) 293 else: 294 f_bad.unlink() 295 self.last_elapsed.update(test, elapsed) 296 return TestResult(status='pass', elapsed=elapsed, 297 casenotrun=casenotrun) 298 299 def run_test(self, test: str, 300 test_field_width: Optional[int] = None) -> TestResult: 301 last_el = self.last_elapsed.get(test) 302 start = datetime.datetime.now().strftime('%H:%M:%S') 303 304 if not self.makecheck: 305 self.test_print_one_line(test=test, starttime=start, 306 lasttime=last_el, end='\r', 307 test_field_width=test_field_width) 308 309 res = self.do_run_test(test) 310 311 end = datetime.datetime.now().strftime('%H:%M:%S') 312 self.test_print_one_line(test=test, status=res.status, 313 starttime=start, endtime=end, 314 lasttime=last_el, thistime=res.elapsed, 315 description=res.description, 316 test_field_width=test_field_width) 317 318 if res.casenotrun: 319 print(res.casenotrun) 320 321 return res 322 323 def run_tests(self, tests: List[str]) -> bool: 324 n_run = 0 325 failed = [] 326 notrun = [] 327 casenotrun = [] 328 329 if not self.makecheck: 330 self.env.print_env() 331 print() 332 333 test_field_width = max(len(os.path.basename(t)) for t in tests) + 2 334 335 for t in tests: 336 name = os.path.basename(t) 337 res = self.run_test(t, test_field_width=test_field_width) 338 339 assert res.status in ('pass', 'fail', 'not run') 340 341 if res.casenotrun: 342 casenotrun.append(t) 343 344 if res.status != 'not run': 345 n_run += 1 346 347 if res.status == 'fail': 348 failed.append(name) 349 if self.makecheck: 350 self.env.print_env() 351 if res.diff: 352 print('\n'.join(res.diff)) 353 elif res.status == 'not run': 354 notrun.append(name) 355 356 if res.interrupted: 357 break 358 359 if notrun: 360 print('Not run:', ' '.join(notrun)) 361 362 if casenotrun: 363 print('Some cases not run in:', ' '.join(casenotrun)) 364 365 if failed: 366 print('Failures:', ' '.join(failed)) 367 print(f'Failed {len(failed)} of {n_run} iotests') 368 return False 369 else: 370 print(f'Passed all {n_run} iotests') 371 return True 372