# Test utilities for fetching & caching assets
#
# Copyright 2024 Red Hat, Inc.
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import hashlib
import logging
import os
import subprocess
import sys
import unittest
import urllib.request
from time import sleep
from pathlib import Path
from shutil import copyfileobj


# Instances of this class must be declared as class level variables
# starting with a name "ASSET_". This enables the pre-caching logic
# to easily find all referenced assets and download them prior to
# execution of the tests.
class Asset:
    """A downloadable test asset identified by its URL and content hash.

    The asset is stored in a local cache directory under a file named
    after its hash.  The cache lives in "$QEMU_TEST_CACHE_DIR/download"
    when that environment variable is set, otherwise in
    "~/.cache/qemu/download".
    """

    def __init__(self, url, hashsum):
        """Record the asset location and compute its cache path.

        url: location the asset is downloaded from
        hashsum: hex digest of the expected content; its length selects
                 the algorithm (64 chars = SHA-256, 128 chars = SHA-512)
        """
        self.url = url
        self.hash = hashsum
        cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR')
        if cache_dir_env:
            self.cache_dir = Path(cache_dir_env, "download")
        else:
            self.cache_dir = Path(Path("~").expanduser(),
                                  ".cache", "qemu", "download")
        self.cache_file = Path(self.cache_dir, hashsum)
        self.log = logging.getLogger('qemu-test')

    def __repr__(self):
        return "Asset: url=%s hash=%s cache=%s" % (
            self.url, self.hash, self.cache_file)

    @staticmethod
    def _discard(path):
        """Delete 'path', tolerating the case where it is already gone."""
        try:
            path.unlink()
        except FileNotFoundError:
            # Either the file was never created or somebody else already
            # cleaned it up; in both cases there is nothing left to do.
            pass

    def _check(self, cache_file):
        """Return True if the content of 'cache_file' matches self.hash.

        A hash of None disables the check.  Raises Exception if the
        length of the hash matches neither SHA-256 nor SHA-512.
        """
        if self.hash is None:
            return True
        if len(self.hash) == 64:
            hasher = hashlib.sha256()
        elif len(self.hash) == 128:
            hasher = hashlib.sha512()
        else:
            raise Exception("unknown hash type")

        # Hash in-process and in chunks: no dependency on external
        # sha256sum/sha512sum binaries and no need to hold a large
        # image in memory at once.
        with open(cache_file, "rb") as src:
            for chunk in iter(lambda: src.read(1 << 20), b""):
                hasher.update(chunk)
        return self.hash == hasher.hexdigest()

    def valid(self):
        """Return True if the cache file exists and passes the hash check."""
        return self.cache_file.exists() and self._check(self.cache_file)

    def _wait_for_other_download(self, tmp_cache_file):
        """Wait for a download of this asset that somebody else started.

        Returns True once the temporary file has vanished and the final
        cache file exists, False if the temporary file has not changed
        its size for more than 90 seconds (i.e. it looks stale).

        Raises the OSError from stat() if the temporary file vanished
        without the cache file showing up, and TimeoutError if the other
        download is still making progress after 600 seconds.
        """
        # Another thread already seems to download the asset, so wait until
        # it is done, while also checking the size to see whether it is stuck
        try:
            current_size = tmp_cache_file.stat().st_size
        except OSError:
            if os.path.exists(self.cache_file):
                return True
            raise
        waittime = lastchange = 600
        while waittime > 0:
            sleep(1)
            waittime -= 1
            try:
                new_size = tmp_cache_file.stat().st_size
            except OSError:
                if os.path.exists(self.cache_file):
                    return True
                raise
            if new_size != current_size:
                # Still growing: remember when we last saw progress
                lastchange = waittime
                current_size = new_size
            elif lastchange - waittime > 90:
                return False

        self.log.debug("Time out while waiting for %s!", tmp_cache_file)
        # Raise explicitly instead of using a bare "raise", which only
        # works by accident when called from within an except handler.
        raise TimeoutError("Time out while waiting for %s" % tmp_cache_file)

    def fetch(self):
        """Make sure the asset is in the cache and return its path.

        Returns the path of the cached file as a string.  A valid cached
        copy is used as-is; otherwise the asset is downloaded to a
        temporary ".download" file, verified and then moved into place.

        Raises Exception if downloads are disabled through the
        QEMU_TEST_NO_DOWNLOAD environment variable, if all download
        attempts were blocked by stale temporary files, or if the
        downloaded data does not match the expected hash.  Errors from
        the download itself are logged and re-raised.
        """
        if not self.cache_dir.exists():
            self.cache_dir.mkdir(parents=True, exist_ok=True)

        if self.valid():
            self.log.debug("Using cached asset %s for %s",
                           self.cache_file, self.url)
            return str(self.cache_file)

        if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False):
            raise Exception("Asset cache is invalid and downloads disabled")

        self.log.info("Downloading %s to %s...", self.url, self.cache_file)
        tmp_cache_file = self.cache_file.with_suffix(".download")

        for _ in range(3):
            try:
                # Mode "x" makes the creation of the temporary file act
                # as a lock between concurrent downloaders.
                with tmp_cache_file.open("xb") as dst:
                    with urllib.request.urlopen(self.url) as resp:
                        copyfileobj(resp, dst)
                break
            except FileExistsError:
                self.log.debug("%s already exists, "
                               "waiting for other thread to finish...",
                               tmp_cache_file)
                if self._wait_for_other_download(tmp_cache_file):
                    return str(self.cache_file)
                self.log.debug("%s seems to be stale, "
                               "deleting and retrying download...",
                               tmp_cache_file)
                self._discard(tmp_cache_file)
                continue
            except Exception as e:
                self.log.error("Unable to download %s: %s", self.url, e)
                # The temporary file might not exist if opening it was
                # what failed; do not let that mask the real error.
                self._discard(tmp_cache_file)
                raise
        else:
            # Every attempt ran into a stale temporary file, so there is
            # nothing that could be verified or moved into place.
            raise Exception("Retries exceeded while downloading %s" %
                            self.url)

        try:
            # Set these just for informational purposes
            os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
                        self.url.encode('utf8'))
            os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash",
                        self.hash.encode('utf8'))
        except Exception as e:
            self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e)

        if not self._check(tmp_cache_file):
            self._discard(tmp_cache_file)
            raise Exception("Hash of %s does not match %s" %
                            (self.url, self.hash))
        tmp_cache_file.replace(self.cache_file)

        self.log.info("Cached %s at %s", self.url, self.cache_file)
        return str(self.cache_file)

    @staticmethod
    def precache_test(test):
        """Fetch every "ASSET_*" class attribute of the class of 'test'.

        While running, the 'qemu-test' logger is switched to DEBUG level
        and mirrored to stdout; the stdout handler is removed again
        afterwards, even if fetching an asset fails.
        """
        log = logging.getLogger('qemu-test')
        log.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        log.addHandler(handler)
        try:
            for name, asset in vars(test.__class__).items():
                if name.startswith("ASSET_") and isinstance(asset, Asset):
                    log.info("Attempting to cache '%s'", asset)
                    asset.fetch()
        finally:
            log.removeHandler(handler)

    @staticmethod
    def precache_suite(suite):
        """Recursively pre-cache the assets of all tests in 'suite'."""
        for test in suite:
            if isinstance(test, unittest.TestSuite):
                Asset.precache_suite(test)
            elif isinstance(test, unittest.TestCase):
                Asset.precache_test(test)

    @staticmethod
    def precache_suites(path, cacheTstamp):
        """Load the tests named by 'path' and pre-cache their assets.

        path: test name handed to unittest's loadTestsFromNames()
        cacheTstamp: file that is opened for writing (created or
                     truncated) before the pre-caching starts
        """
        loader = unittest.loader.defaultTestLoader
        tests = loader.loadTestsFromNames([path], None)

        with open(cacheTstamp, "w"):
            Asset.precache_suite(tests)