xref: /openbmc/qemu/tests/functional/qemu_test/uncompress.py (revision 7ae004869aff46fc3195d280b25dc9b94a447be7)
1# SPDX-License-Identifier: GPL-2.0-or-later
2#
3# Utilities for python-based QEMU tests
4#
5# Copyright 2024 Red Hat, Inc.
6#
7# Authors:
8#  Thomas Huth <thuth@redhat.com>
9
10import gzip
11import lzma
12import os
13import stat
14import shutil
15from urllib.parse import urlparse
16from subprocess import run, CalledProcessError
17
18from .asset import Asset
19
20
21def gzip_uncompress(gz_path, output_path):
22    if os.path.exists(output_path):
23        return
24    with gzip.open(gz_path, 'rb') as gz_in:
25        try:
26            with open(output_path, 'wb') as raw_out:
27                shutil.copyfileobj(gz_in, raw_out)
28        except:
29            os.remove(output_path)
30            raise
31
32def lzma_uncompress(xz_path, output_path):
33    if os.path.exists(output_path):
34        return
35    with lzma.open(xz_path, 'rb') as lzma_in:
36        try:
37            with open(output_path, 'wb') as raw_out:
38                shutil.copyfileobj(lzma_in, raw_out)
39        except:
40            os.remove(output_path)
41            raise
42
43
44def zstd_uncompress(zstd_path, output_path):
45    if os.path.exists(output_path):
46        return
47
48    try:
49        run(['zstd', "-f", "-d", zstd_path,
50             "-o", output_path], capture_output=True, check=True)
51    except CalledProcessError as e:
52        os.remove(output_path)
53        raise Exception(
54            f"Unable to decompress zstd file {zstd_path} with {e}") from e
55
56    # zstd copies source archive permissions for the output
57    # file, so must make this writable for QEMU
58    os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
59
60
61def uncompress(compressed, uncompressed, format=None):
62    '''
63    @params compressed: filename, Asset, or file-like object to uncompress
64    @params uncompressed: filename to uncompress into
65    @params format: optional compression format (gzip, lzma)
66
67    Uncompresses @compressed into @uncompressed
68
69    If @format is None, heuristics will be applied to guess the
70    format from the filename or Asset URL. @format must be non-None
71    if @uncompressed is a file-like object.
72
73    Returns the fully qualified path to the uncompessed file
74    '''
75    if format is None:
76        format = guess_uncompress_format(compressed)
77
78    if format == "xz":
79        lzma_uncompress(str(compressed), uncompressed)
80    elif format == "gz":
81        gzip_uncompress(str(compressed), uncompressed)
82    elif format == "zstd":
83        zstd_uncompress(str(compressed), uncompressed)
84    else:
85        raise Exception(f"Unknown compression format {format}")
86
87def guess_uncompress_format(compressed):
88    '''
89    @params compressed: filename, Asset, or file-like object to guess
90
91    Guess the format of @compressed, raising an exception if
92    no format can be determined
93    '''
94    if isinstance(compressed, Asset):
95        compressed = urlparse(compressed.url).path
96    elif not isinstance(compressed, str):
97        raise Exception(f"Unable to guess compression cformat for {compressed}")
98
99    (_name, ext) = os.path.splitext(compressed)
100    if ext == ".xz":
101        return "xz"
102    elif ext == ".gz":
103        return "gz"
104    elif ext in [".zstd", ".zst"]:
105        return 'zstd'
106    else:
107        raise Exception(f"Unknown compression format for {compressed}")
108