# Test utilities for fetching & caching assets # # Copyright 2024 Red Hat, Inc. # # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. import hashlib import logging import os import stat import subprocess import sys import unittest import urllib.request from time import sleep from pathlib import Path from shutil import copyfileobj # Instances of this class must be declared as class level variables # starting with a name "ASSET_". This enables the pre-caching logic # to easily find all referenced assets and download them prior to # execution of the tests. class Asset: def __init__(self, url, hashsum): self.url = url self.hash = hashsum cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR') if cache_dir_env: self.cache_dir = Path(cache_dir_env, "download") else: self.cache_dir = Path(Path("~").expanduser(), ".cache", "qemu", "download") self.cache_file = Path(self.cache_dir, hashsum) self.log = logging.getLogger('qemu-test') def __repr__(self): return "Asset: url=%s hash=%s cache=%s" % ( self.url, self.hash, self.cache_file) def _check(self, cache_file): if self.hash is None: return True if len(self.hash) == 64: hl = hashlib.sha256() elif len(self.hash) == 128: hl = hashlib.sha512() else: raise Exception("unknown hash type") # Calculate the hash of the file: with open(cache_file, 'rb') as file: while True: chunk = file.read(1 << 20) if not chunk: break hl.update(chunk) return self.hash == hl.hexdigest() def valid(self): return self.cache_file.exists() and self._check(self.cache_file) def _wait_for_other_download(self, tmp_cache_file): # Another thread already seems to download the asset, so wait until # it is done, while also checking the size to see whether it is stuck try: current_size = tmp_cache_file.stat().st_size new_size = current_size except: if os.path.exists(self.cache_file): return True raise waittime = lastchange = 600 while waittime > 0: sleep(1) waittime -= 1 try: new_size = tmp_cache_file.stat().st_size except: if os.path.exists(self.cache_file): return True raise if new_size != current_size: lastchange = waittime current_size = new_size elif lastchange - waittime > 90: return False self.log.debug("Time out while waiting for %s!", tmp_cache_file) raise def fetch(self): if not self.cache_dir.exists(): self.cache_dir.mkdir(parents=True, exist_ok=True) if self.valid(): self.log.debug("Using cached asset %s for %s", self.cache_file, self.url) return str(self.cache_file) if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False): raise Exception("Asset cache is invalid and downloads disabled") self.log.info("Downloading %s to %s...", self.url, self.cache_file) tmp_cache_file = self.cache_file.with_suffix(".download") for retries in range(3): try: with tmp_cache_file.open("xb") as dst: with urllib.request.urlopen(self.url) as resp: copyfileobj(resp, dst) break except FileExistsError: self.log.debug("%s already exists, " "waiting for other thread to finish...", tmp_cache_file) if self._wait_for_other_download(tmp_cache_file): return str(self.cache_file) self.log.debug("%s seems to be stale, " "deleting and retrying download...", tmp_cache_file) tmp_cache_file.unlink() continue except Exception as e: self.log.error("Unable to download %s: %s", self.url, e) tmp_cache_file.unlink() raise try: # Set these just for informational purposes os.setxattr(str(tmp_cache_file), "user.qemu-asset-url", self.url.encode('utf8')) os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash", self.hash.encode('utf8')) except Exception as e: self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e) pass if not self._check(tmp_cache_file): tmp_cache_file.unlink() raise Exception("Hash of %s does not match %s" % (self.url, self.hash)) tmp_cache_file.replace(self.cache_file) # Remove write perms to stop tests accidentally modifying them os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IRGRP) self.log.info("Cached %s at %s" % (self.url, self.cache_file)) return str(self.cache_file) def precache_test(test): log = logging.getLogger('qemu-test') log.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) log.addHandler(handler) for name, asset in vars(test.__class__).items(): if name.startswith("ASSET_") and type(asset) == Asset: log.info("Attempting to cache '%s'" % asset) asset.fetch() log.removeHandler(handler) def precache_suite(suite): for test in suite: if isinstance(test, unittest.TestSuite): Asset.precache_suite(test) elif isinstance(test, unittest.TestCase): Asset.precache_test(test) def precache_suites(path, cacheTstamp): loader = unittest.loader.defaultTestLoader tests = loader.loadTestsFromNames([path], None) with open(cacheTstamp, "w") as fh: Asset.precache_suite(tests)