# Test utilities for fetching & caching assets
#
# Copyright 2024 Red Hat, Inc.
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import hashlib
import logging
import os
import stat
import sys
import unittest
import urllib.request
from time import sleep
from pathlib import Path
from shutil import copyfileobj
from urllib.error import HTTPError, URLError


def _remove_if_present(path):
    """Delete *path*, tolerating the case where it does not exist.

    Used on cleanup paths so that a missing temporary file never masks
    the error that is actually being reported.
    """
    try:
        path.unlink()
    except FileNotFoundError:
        pass


class AssetError(Exception):
    """Error raised when an asset cannot be validated or downloaded.

    Attributes:
        url: URL of the asset the error refers to.
        msg: human readable description of the failure.
        transient: True when the failure is considered temporary;
            Asset.precache_test() logs and skips such errors instead
            of propagating them.
    """

    def __init__(self, asset, msg, transient=False):
        super().__init__(msg)
        self.url = asset.url
        self.msg = msg
        self.transient = transient

    def __str__(self):
        return "%s: %s" % (self.url, self.msg)


# Instances of this class must be declared as class level variables
# starting with a name "ASSET_". This enables the pre-caching logic
# to easily find all referenced assets and download them prior to
# execution of the tests.
class Asset:
    """A downloadable file identified by its URL and expected hash.

    The file is cached as "<cache dir>/download/<hashsum>", where the
    cache dir is taken from the QEMU_TEST_CACHE_DIR environment variable
    or defaults to "~/.cache/qemu".
    """

    def __init__(self, url, hashsum):
        """Record the asset location and derive its cache file path.

        url: location the asset is downloaded from.
        hashsum: expected hex digest; 64 characters selects SHA-256 and
            128 characters selects SHA-512 (see _check()).
        """
        self.url = url
        self.hash = hashsum
        cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR')
        if cache_dir_env:
            self.cache_dir = Path(cache_dir_env, "download")
        else:
            self.cache_dir = Path(Path("~").expanduser(),
                                  ".cache", "qemu", "download")
        self.cache_file = Path(self.cache_dir, hashsum)
        self.log = logging.getLogger('qemu-test')

    def __repr__(self):
        return "Asset: url=%s hash=%s cache=%s" % (
            self.url, self.hash, self.cache_file)

    def __str__(self):
        return str(self.cache_file)

    def _check(self, cache_file):
        """Return True if the content of cache_file matches self.hash.

        A hash of None disables verification. Raises AssetError when the
        length of the hash matches neither SHA-256 nor SHA-512.
        """
        if self.hash is None:
            return True
        if len(self.hash) == 64:
            hl = hashlib.sha256()
        elif len(self.hash) == 128:
            hl = hashlib.sha512()
        else:
            raise AssetError(self, "unknown hash type")

        # Calculate the hash of the file in 1 MiB chunks:
        with open(cache_file, 'rb') as file:
            for chunk in iter(lambda: file.read(1 << 20), b""):
                hl.update(chunk)

        return self.hash == hl.hexdigest()

    def valid(self):
        """Return True if the cache file exists and passes the hash check."""
        return self.cache_file.exists() and self._check(self.cache_file)

    def fetchable(self):
        """Return True unless QEMU_TEST_NO_DOWNLOAD is set (non-empty)."""
        return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False)

    def available(self):
        """Return True if the asset is cached already or may be downloaded."""
        return self.valid() or self.fetchable()

    def _wait_for_other_download(self, tmp_cache_file):
        """Wait for a download that somebody else has in progress.

        The size of tmp_cache_file is polled once per second to see
        whether the other download is still making progress.

        Returns True when the temporary file vanished and the final cache
        file exists, False when the size did not change for more than 90
        seconds (the download looks stuck). Raises AssetError (transient)
        after 600 polls without either outcome, and re-raises the stat()
        error when the temporary file vanished without a cache file
        appearing.
        """
        try:
            current_size = tmp_cache_file.stat().st_size
        except OSError:
            if os.path.exists(self.cache_file):
                return True
            raise
        waittime = lastchange = 600
        while waittime > 0:
            sleep(1)
            waittime -= 1
            try:
                new_size = tmp_cache_file.stat().st_size
            except OSError:
                if os.path.exists(self.cache_file):
                    return True
                raise
            if new_size != current_size:
                lastchange = waittime
                current_size = new_size
            elif lastchange - waittime > 90:
                return False

        self.log.debug("Time out while waiting for %s!", tmp_cache_file)
        # An explicit exception is required here: a bare "raise" would
        # only re-raise whatever exception the caller happens to handle.
        raise AssetError(self, "Timed out waiting for download of %s" %
                         tmp_cache_file, transient=True)

    def fetch(self):
        """Return the path of the cached asset, downloading it if needed.

        The download goes to "<cache file>.download" first and is only
        moved into place once size and hash have been verified; the final
        file has its write permissions removed.

        Raises AssetError when downloads are disabled and the cache is
        invalid, when the download fails, or when the hash does not
        match. Errors with transient=True are URL errors, exhausted
        retries and timeouts waiting for a concurrent download.
        """
        if not self.cache_dir.exists():
            self.cache_dir.mkdir(parents=True, exist_ok=True)

        if self.valid():
            self.log.debug("Using cached asset %s for %s",
                           self.cache_file, self.url)
            return str(self.cache_file)

        if not self.fetchable():
            raise AssetError(self,
                             "Asset cache is invalid and downloads disabled")

        self.log.info("Downloading %s to %s...", self.url, self.cache_file)
        tmp_cache_file = self.cache_file.with_suffix(".download")

        for retries in range(3):
            try:
                # Mode "x" makes the open fail if the file exists, which
                # is how a concurrent download is detected.
                with tmp_cache_file.open("xb") as dst:
                    with urllib.request.urlopen(self.url) as resp:
                        copyfileobj(resp, dst)
                        # "headers" is available on every response type
                        # returned by urlopen, not just on HTTP ones.
                        length_hdr = resp.headers.get("Content-Length")

                # Verify downloaded file size against length metadata, if
                # available.
                if length_hdr is not None:
                    length = int(length_hdr)
                    fsize = tmp_cache_file.stat().st_size
                    if fsize != length:
                        self.log.error("Unable to download %s: "
                                       "connection closed before "
                                       "transfer complete (%d/%d)",
                                       self.url, fsize, length)
                        _remove_if_present(tmp_cache_file)
                        continue
                break
            except FileExistsError:
                self.log.debug("%s already exists, "
                               "waiting for other thread to finish...",
                               tmp_cache_file)
                if self._wait_for_other_download(tmp_cache_file):
                    return str(self.cache_file)
                self.log.debug("%s seems to be stale, "
                               "deleting and retrying download...",
                               tmp_cache_file)
                _remove_if_present(tmp_cache_file)
                continue
            except HTTPError as e:
                _remove_if_present(tmp_cache_file)
                self.log.error("Unable to download %s: HTTP error %d",
                               self.url, e.code)
                # Treat 404 as fatal, since it is highly likely to
                # indicate a broken test rather than a transient
                # server or networking problem
                if e.code == 404:
                    raise AssetError(self, "Unable to download: "
                                     "HTTP error %d" % e.code)
                continue
            except URLError as e:
                # This is typically a network/service level error, e.g.
                # <urlopen error [Errno 110] Connection timed out>
                _remove_if_present(tmp_cache_file)
                self.log.error("Unable to download %s: URL error %s",
                               self.url, e.reason)
                raise AssetError(self, "Unable to download: URL error %s" %
                                 e.reason, transient=True)
            except Exception as e:
                # The temporary file may not even have been created (e.g.
                # when opening it failed), so removal must not fail.
                _remove_if_present(tmp_cache_file)
                raise AssetError(self, "Unable to download: %s" % e)

        if not os.path.exists(tmp_cache_file):
            raise AssetError(self, "Download retries exceeded", transient=True)

        try:
            # Set these just for informational purposes
            os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
                        self.url.encode('utf8'))
            os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash",
                        self.hash.encode('utf8'))
        except Exception as e:
            # Best effort only: failing to set the attributes is not fatal
            self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e)

        if not self._check(tmp_cache_file):
            _remove_if_present(tmp_cache_file)
            raise AssetError(self, "Hash does not match %s" % self.hash)
        tmp_cache_file.replace(self.cache_file)
        # Remove write perms to stop tests accidentally modifying them
        os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IRGRP)

        self.log.info("Cached %s at %s", self.url, self.cache_file)
        return str(self.cache_file)

    @staticmethod
    def precache_test(test):
        """Fetch every Asset declared as an ASSET_* attribute of the
        class of test.

        Progress is logged to stdout for the duration of the call.
        Transient AssetErrors are logged and skipped, all other errors
        are propagated to the caller.
        """
        log = logging.getLogger('qemu-test')
        log.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        log.addHandler(handler)
        try:
            for name, asset in vars(test.__class__).items():
                if not (name.startswith("ASSET_") and
                        isinstance(asset, Asset)):
                    continue
                log.info("Attempting to cache '%s'", asset)
                try:
                    asset.fetch()
                except AssetError as e:
                    if not e.transient:
                        raise
                    log.error("%s: skipping asset precache", e)
        finally:
            # Always detach the handler, otherwise it would be leaked
            # whenever a fatal error is propagated.
            log.removeHandler(handler)

    @staticmethod
    def precache_suite(suite):
        """Recursively precache the assets of all test cases in suite."""
        for test in suite:
            if isinstance(test, unittest.TestSuite):
                Asset.precache_suite(test)
            elif isinstance(test, unittest.TestCase):
                Asset.precache_test(test)

    @staticmethod
    def precache_suites(path, cacheTstamp):
        """Load the tests named by path and precache their assets.

        The file cacheTstamp is created (or truncated) before the
        precaching starts.
        """
        loader = unittest.loader.defaultTestLoader
        tests = loader.loadTestsFromNames([path], None)

        with open(cacheTstamp, "w"):
            Asset.precache_suite(tests)