xref: /openbmc/qemu/tests/qemu-iotests/qcow2_format.py (revision 458e056257e67254546e58158f3f74ce040c7ca1)
1# Library for manipulations with qcow2 image
2#
3# Copyright (c) 2020 Virtuozzo International GmbH.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import struct
20import string
21
22
class Qcow2Field:

    """Thin wrapper around a raw field value that controls how it is printed.

    Subclasses override __str__ to render the stored value differently.
    """

    def __init__(self, value):
        # Keep the raw value untouched; only its presentation is customised.
        self.value = value

    def __str__(self):
        return '{!s}'.format(self.value)
30
31
class Flags64(Qcow2Field):

    """Field rendered as the list of bit positions (0..63) that are set."""

    def __str__(self):
        set_positions = [pos for pos in range(64) if (self.value >> pos) & 1]
        return str(set_positions)
40
41
class Enum(Qcow2Field):

    """Field rendered as its hex value followed by a symbolic name.

    Subclasses provide a ``mapping`` dict from value to name; values missing
    from the mapping are shown as "<unknown>".
    """

    def __str__(self):
        hex_repr = format(self.value, '#x')
        label = self.mapping.get(self.value, "<unknown>")
        return hex_repr + ' (' + str(label) + ')'
46
47
class Qcow2StructMeta(type):

    """Metaclass that derives a struct format string from ``fields``.

    A class that defines ``fields`` in its own body gets a ``fmt`` attribute:
    a big-endian ('>') struct format built from the c-type of every field.
    """

    # Mapping from c types to python struct format
    ctypes = {'u8': 'B', 'u16': 'H', 'u32': 'I', 'u64': 'Q'}

    def __init__(cls, name, bases, attrs):
        # Only classes that declare their own 'fields' get a (new) format.
        if 'fields' not in attrs:
            return

        codes = [cls.ctypes[field[0]] for field in cls.fields]
        cls.fmt = '>' + ''.join(codes)
61
62
class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Subclasses define a ``fields`` class variable: a sequence of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format: either a format string applied with .format() when
          dumping, or a callable (for example a Qcow2Field subclass) that is
          called with the value and whose result is converted with str()
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be
               omitted in this case, then the current position of fd is used.
        """
        if data is not None:
            assert fd is None and offset is None
        else:
            assert fd is not None
            nbytes = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(nbytes)

        unpacked = struct.unpack(self.fmt, data)
        # The instance dict is replaced wholesale by the parsed fields.
        self.__dict__ = {
            spec[2]: unpacked[pos] for pos, spec in enumerate(self.fields)
        }

    def dump(self):
        """Print every field as a left-aligned name followed by its value."""
        for spec in self.fields:
            formatter, name = spec[1], spec[2]
            raw = self.__dict__[name]
            if isinstance(formatter, str):
                rendered = formatter.format(raw)
            else:
                rendered = str(formatter(raw))

            print('{:<25} {}'.format(name, rendered))
104
105
class Qcow2BitmapExt(Qcow2Struct):

    """Payload of the 'Bitmaps' header extension.

    QcowHeaderExtension parses the extension data into this structure when
    the extension magic equals QCOW2_EXT_MAGIC_BITMAPS.
    """

    # (c-type, dump format, field name), in on-disk order
    fields = (
        ('u32', '{}', 'nb_bitmaps'),
        ('u32', '{}', 'reserved32'),
        ('u64', '{:#x}', 'bitmap_directory_size'),
        ('u64', '{:#x}', 'bitmap_directory_offset')
    )
114
115
# Magic of the 'Bitmaps' header extension; QcowHeaderExtension parses the
# data of extensions carrying this magic as a Qcow2BitmapExt.
QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
117
118
class QcowHeaderExtension(Qcow2Struct):

    """A single qcow2 header extension.

    On disk an extension is a (magic, length) pair followed by `length`
    bytes of data, padded to the next multiple of 8 bytes.

    Attributes set by __init__:
        magic, length - the parsed (or user supplied) header fields
        data          - the extension data including the zero padding
        obj           - a Qcow2BitmapExt parsed from data when magic is
                        QCOW2_EXT_MAGIC_BITMAPS, otherwise None
    """

    class Magic(Enum):
        mapping = {
            0xe2792aca: 'Backing format',
            0x6803f857: 'Feature table',
            0x0537be77: 'Crypto header',
            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
            0x44415441: 'Data file'
        }

    fields = (
        ('u32', Magic, 'magic'),
        ('u32', '{}', 'length')
        # length bytes of data follows
        # then padding to next multiple of 8
    )

    def __init__(self, magic=None, length=None, data=None, fd=None):
        """
        Support both loading from fd and creation from user data.
        For fd-based creation current position in a file will be used to read
        the data.

        Either pass all of magic, length and data (fd must be None), or pass
        only fd (magic, length and data must be None).

        This should be somehow refactored and functionality should be moved to
        superclass (to allow creation of any qcow2 struct), but then, fields
        of variable length (data here) should be supported in base class
        somehow. Note also, that we probably want to parse different
        extensions. Should they be subclasses of this class, or how to do it
        better? Should it be something like QAPI union with discriminator field
        (magic here). So, it's a TODO. We'll see how to properly refactor this
        when we have more qcow2 structures.
        """
        if fd is None:
            assert all(v is not None for v in (magic, length, data))
            self.magic = magic
            self.length = length
            if length % 8 != 0:
                padding = 8 - (length % 8)
                # Build a new object instead of using '+=', which would
                # extend a caller-owned bytearray in place.
                data = data + b'\0' * padding
            self.data = data
        else:
            assert all(v is None for v in (magic, length, data))
            # Parses magic and length from the current position of fd.
            super().__init__(fd=fd)
            # Round the data length up to the next multiple of 8.
            padded = (self.length + 7) & ~7
            self.data = fd.read(padded)
            assert self.data is not None

        if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
            self.obj = Qcow2BitmapExt(data=self.data)
        else:
            self.obj = None

    def dump(self):
        """Print magic and length, then the parsed object or the raw data.

        Raw data is shown quoted when all of its bytes are printable ASCII
        and as '<binary>' otherwise.
        """
        super().dump()

        if self.obj is not None:
            self.obj.dump()
            return

        # Encode the printable set once instead of once per data byte.
        printable = frozenset(string.printable.encode('ascii'))
        data = self.data[:self.length]
        if all(c in printable for c in data):
            data = f"'{ data.decode('ascii') }'"
        else:
            data = '<binary>'
        print(f'{"data":<25} {data}')

    @classmethod
    def create(cls, magic, data):
        """Build an extension from magic and data; length is len(data)."""
        # Use cls so that subclasses create instances of themselves.
        return cls(magic, len(data), data)
188
189
class QcowHeader(Qcow2Struct):

    """The qcow2 image header together with its header extensions.

    Besides the parsed header fields, instances carry:
        cluster_size - 1 << cluster_bits
        extensions   - list of QcowHeaderExtension read from the image
        backing_file - backing file name bytes, or None if there is none
    """

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', Flags64, 'incompatible_features'),
        ('u64', Flags64, 'compatible_features'),
        ('u64', Flags64, 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Read the header, its extensions and the backing file name from fd.

        The header is read from offset 0 of fd; fd must support seek(),
        tell() and read().
        """
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        # Header extensions start right after the header.
        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """For version 2 images, overwrite the version 3 fields with defaults.

        The full field list is always unpacked, so for a version 2 header
        the version 3 fields hold whatever bytes follow the header in the
        file; they are replaced here.
        """
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read header extensions starting at the current position of fd.

        Reading stops at an extension with magic 0 (which is not stored), or
        when the position reaches the end of the first cluster or the
        backing file name, whichever comes first.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext = QcowHeaderExtension(fd=fd)
            if ext.magic == 0:
                break
            else:
                self.extensions.append(ext)

    def update_extensions(self, fd):
        """Write all extensions, a terminator and the backing file name.

        Writing starts at header_length. If there is a backing file name,
        backing_file_offset is updated to its new position.

        Raises Exception if the written data extends beyond the first
        cluster.
        """
        fd.seek(self.header_length)
        # Add the end-of-extensions marker to a new list: appending it to
        # self.extensions would leave a zero-magic entry in the in-memory
        # list, to be written again on every later update and shown by
        # dump_extensions().
        extensions = [*self.extensions, QcowHeaderExtension(0, 0, b'')]
        for ex in extensions:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write the extensions and then the header fields back to fd."""
        header_bytes = self.header_length

        # Must run first: it may change backing_file_offset, which is part
        # of the header packed below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes so that nothing beyond the
        # header is overwritten. (Slicing to header_bytes - 1 would drop the
        # last header byte.)
        buf = buf[:header_bytes]
        fd.write(buf)

    def dump_extensions(self):
        """Print every loaded header extension, each followed by an empty
        line."""
        for ex in self.extensions:
            print('Header extension:')
            ex.dump()
            print()
287