1# Library for manipulations with qcow2 image
2#
3# Copyright (c) 2020 Virtuozzo International GmbH.
4# Copyright (C) 2012 Red Hat, Inc.
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18#
19
20import struct
21import string
22import json
23
24
25class ComplexEncoder(json.JSONEncoder):
26    def default(self, obj):
27        if hasattr(obj, 'to_json'):
28            return obj.to_json()
29        else:
30            return json.JSONEncoder.default(self, obj)
31
32
33class Qcow2Field:
34
35    def __init__(self, value):
36        self.value = value
37
38    def __str__(self):
39        return str(self.value)
40
41
42class Flags64(Qcow2Field):
43
44    def __str__(self):
45        bits = []
46        for bit in range(64):
47            if self.value & (1 << bit):
48                bits.append(bit)
49        return str(bits)
50
51
52class BitmapFlags(Qcow2Field):
53
54    flags = {
55        0x1: 'in-use',
56        0x2: 'auto'
57    }
58
59    def __str__(self):
60        bits = []
61        for bit in range(64):
62            flag = self.value & (1 << bit)
63            if flag:
64                bits.append(self.flags.get(flag, f'bit-{bit}'))
65        return f'{self.value:#x} ({bits})'
66
67
68class Enum(Qcow2Field):
69
70    def __str__(self):
71        return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
72
73
74class Qcow2StructMeta(type):
75
76    # Mapping from c types to python struct format
77    ctypes = {
78        'u8': 'B',
79        'u16': 'H',
80        'u32': 'I',
81        'u64': 'Q'
82    }
83
84    def __init__(self, name, bases, attrs):
85        if 'fields' in attrs:
86            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
87
88
89class Qcow2Struct(metaclass=Qcow2StructMeta):
90
91    """Qcow2Struct: base class for qcow2 data structures
92
93    Successors should define fields class variable, which is: list of tuples,
94    each of three elements:
95        - c-type (one of 'u8', 'u16', 'u32', 'u64')
96        - format (format_spec to use with .format() when dump or 'mask' to dump
97                  bitmasks)
98        - field name
99    """
100
101    def __init__(self, fd=None, offset=None, data=None):
102        """
103        Two variants:
104            1. Specify data. fd and offset must be None.
105            2. Specify fd and offset, data must be None. offset may be omitted
106               in this case, than current position of fd is used.
107        """
108        if data is None:
109            assert fd is not None
110            buf_size = struct.calcsize(self.fmt)
111            if offset is not None:
112                fd.seek(offset)
113            data = fd.read(buf_size)
114        else:
115            assert fd is None and offset is None
116
117        values = struct.unpack(self.fmt, data)
118        self.__dict__ = dict((field[2], values[i])
119                             for i, field in enumerate(self.fields))
120
121    def dump(self, is_json=False):
122        if is_json:
123            print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
124            return
125
126        for f in self.fields:
127            value = self.__dict__[f[2]]
128            if isinstance(f[1], str):
129                value_str = f[1].format(value)
130            else:
131                value_str = str(f[1](value))
132
133            print('{:<25} {}'.format(f[2], value_str))
134
135    def to_json(self):
136        return dict((f[2], self.__dict__[f[2]]) for f in self.fields)
137
138
139class Qcow2BitmapExt(Qcow2Struct):
140
141    fields = (
142        ('u32', '{}', 'nb_bitmaps'),
143        ('u32', '{}', 'reserved32'),
144        ('u64', '{:#x}', 'bitmap_directory_size'),
145        ('u64', '{:#x}', 'bitmap_directory_offset')
146    )
147
148    def __init__(self, fd, cluster_size):
149        super().__init__(fd=fd)
150        tail = struct.calcsize(self.fmt) % 8
151        if tail:
152            fd.seek(8 - tail, 1)
153        position = fd.tell()
154        self.cluster_size = cluster_size
155        self.read_bitmap_directory(fd)
156        fd.seek(position)
157
158    def read_bitmap_directory(self, fd):
159        fd.seek(self.bitmap_directory_offset)
160        self.bitmap_directory = \
161            [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
162             for _ in range(self.nb_bitmaps)]
163
164    def dump(self):
165        super().dump()
166        for entry in self.bitmap_directory:
167            print()
168            entry.dump()
169
170    def to_json(self):
171        fields_dict = super().to_json()
172        fields_dict['bitmap_directory'] = self.bitmap_directory
173        return fields_dict
174
175
176class Qcow2BitmapDirEntry(Qcow2Struct):
177
178    fields = (
179        ('u64', '{:#x}', 'bitmap_table_offset'),
180        ('u32', '{}', 'bitmap_table_size'),
181        ('u32', BitmapFlags, 'flags'),
182        ('u8',  '{}', 'type'),
183        ('u8',  '{}', 'granularity_bits'),
184        ('u16', '{}', 'name_size'),
185        ('u32', '{}', 'extra_data_size')
186    )
187
188    def __init__(self, fd, cluster_size):
189        super().__init__(fd=fd)
190        self.cluster_size = cluster_size
191        # Seek relative to the current position in the file
192        fd.seek(self.extra_data_size, 1)
193        bitmap_name = fd.read(self.name_size)
194        self.name = bitmap_name.decode('ascii')
195        # Move position to the end of the entry in the directory
196        entry_raw_size = self.bitmap_dir_entry_raw_size()
197        padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
198        fd.seek(padding, 1)
199        self.bitmap_table = Qcow2BitmapTable(fd=fd,
200                                             offset=self.bitmap_table_offset,
201                                             nb_entries=self.bitmap_table_size,
202                                             cluster_size=self.cluster_size)
203
204    def bitmap_dir_entry_raw_size(self):
205        return struct.calcsize(self.fmt) + self.name_size + \
206            self.extra_data_size
207
208    def dump(self):
209        print(f'{"Bitmap name":<25} {self.name}')
210        super(Qcow2BitmapDirEntry, self).dump()
211        self.bitmap_table.dump()
212
213    def to_json(self):
214        # Put the name ahead of the dict
215        return {
216            'name': self.name,
217            **super().to_json(),
218            'bitmap_table': self.bitmap_table
219        }
220
221
222class Qcow2BitmapTableEntry(Qcow2Struct):
223
224    fields = (
225        ('u64',  '{}', 'entry'),
226    )
227
228    BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
229    BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
230    BME_TABLE_ENTRY_FLAG_ALL_ONES = 1
231
232    def __init__(self, fd):
233        super().__init__(fd=fd)
234        self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
235        self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
236        if self.offset:
237            if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
238                self.type = 'invalid'
239            else:
240                self.type = 'serialized'
241        elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
242            self.type = 'all-ones'
243        else:
244            self.type = 'all-zeroes'
245
246    def to_json(self):
247        return {'type': self.type, 'offset': self.offset,
248                'reserved': self.reserved}
249
250
251class Qcow2BitmapTable:
252
253    def __init__(self, fd, offset, nb_entries, cluster_size):
254        self.cluster_size = cluster_size
255        position = fd.tell()
256        fd.seek(offset)
257        self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
258        fd.seek(position)
259
260    def dump(self):
261        bitmap_table = enumerate(self.entries)
262        print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
263        for i, entry in bitmap_table:
264            if entry.type == 'serialized':
265                size = self.cluster_size
266            else:
267                size = 0
268            print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
269
270    def to_json(self):
271        return self.entries
272
273
274QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
275
276
277class QcowHeaderExtension(Qcow2Struct):
278
279    class Magic(Enum):
280        mapping = {
281            0xe2792aca: 'Backing format',
282            0x6803f857: 'Feature table',
283            0x0537be77: 'Crypto header',
284            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
285            0x44415441: 'Data file'
286        }
287
288        def to_json(self):
289            return self.mapping.get(self.value, "<unknown>")
290
291    fields = (
292        ('u32', Magic, 'magic'),
293        ('u32', '{}', 'length')
294        # length bytes of data follows
295        # then padding to next multiply of 8
296    )
297
298    def __init__(self, magic=None, length=None, data=None, fd=None,
299                 cluster_size=None):
300        """
301        Support both loading from fd and creation from user data.
302        For fd-based creation current position in a file will be used to read
303        the data.
304        The cluster_size value may be obtained by dependent structures.
305
306        This should be somehow refactored and functionality should be moved to
307        superclass (to allow creation of any qcow2 struct), but then, fields
308        of variable length (data here) should be supported in base class
309        somehow. Note also, that we probably want to parse different
310        extensions. Should they be subclasses of this class, or how to do it
311        better? Should it be something like QAPI union with discriminator field
312        (magic here). So, it's a TODO. We'll see how to properly refactor this
313        when we have more qcow2 structures.
314        """
315        if fd is None:
316            assert all(v is not None for v in (magic, length, data))
317            self.magic = magic
318            self.length = length
319            if length % 8 != 0:
320                padding = 8 - (length % 8)
321                data += b'\0' * padding
322            self.data = data
323        else:
324            assert all(v is None for v in (magic, length, data))
325            super().__init__(fd=fd)
326            if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
327                self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
328                self.data = None
329            else:
330                padded = (self.length + 7) & ~7
331                self.data = fd.read(padded)
332                assert self.data is not None
333                self.obj = None
334
335        if self.data is not None:
336            data_str = self.data[:self.length]
337            if all(c in string.printable.encode(
338                'ascii') for c in data_str):
339                data_str = f"'{ data_str.decode('ascii') }'"
340            else:
341                data_str = '<binary>'
342            self.data_str = data_str
343
344
345    def dump(self):
346        super().dump()
347
348        if self.obj is None:
349            print(f'{"data":<25} {self.data_str}')
350        else:
351            self.obj.dump()
352
353    def to_json(self):
354        # Put the name ahead of the dict
355        res = {'name': self.Magic(self.magic), **super().to_json()}
356        if self.obj is not None:
357            res['data'] = self.obj
358        else:
359            res['data_str'] = self.data_str
360
361        return res
362
363    @classmethod
364    def create(cls, magic, data):
365        return QcowHeaderExtension(magic, len(data), data)
366
367
368class QcowHeader(Qcow2Struct):
369
370    fields = (
371        # Version 2 header fields
372        ('u32', '{:#x}', 'magic'),
373        ('u32', '{}', 'version'),
374        ('u64', '{:#x}', 'backing_file_offset'),
375        ('u32', '{:#x}', 'backing_file_size'),
376        ('u32', '{}', 'cluster_bits'),
377        ('u64', '{}', 'size'),
378        ('u32', '{}', 'crypt_method'),
379        ('u32', '{}', 'l1_size'),
380        ('u64', '{:#x}', 'l1_table_offset'),
381        ('u64', '{:#x}', 'refcount_table_offset'),
382        ('u32', '{}', 'refcount_table_clusters'),
383        ('u32', '{}', 'nb_snapshots'),
384        ('u64', '{:#x}', 'snapshot_offset'),
385
386        # Version 3 header fields
387        ('u64', Flags64, 'incompatible_features'),
388        ('u64', Flags64, 'compatible_features'),
389        ('u64', Flags64, 'autoclear_features'),
390        ('u32', '{}', 'refcount_order'),
391        ('u32', '{}', 'header_length'),
392    )
393
394    def __init__(self, fd):
395        super().__init__(fd=fd, offset=0)
396
397        self.set_defaults()
398        self.cluster_size = 1 << self.cluster_bits
399
400        fd.seek(self.header_length)
401        self.load_extensions(fd)
402
403        if self.backing_file_offset:
404            fd.seek(self.backing_file_offset)
405            self.backing_file = fd.read(self.backing_file_size)
406        else:
407            self.backing_file = None
408
409    def set_defaults(self):
410        if self.version == 2:
411            self.incompatible_features = 0
412            self.compatible_features = 0
413            self.autoclear_features = 0
414            self.refcount_order = 4
415            self.header_length = 72
416
417    def load_extensions(self, fd):
418        self.extensions = []
419
420        if self.backing_file_offset != 0:
421            end = min(self.cluster_size, self.backing_file_offset)
422        else:
423            end = self.cluster_size
424
425        while fd.tell() < end:
426            ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
427            if ext.magic == 0:
428                break
429            else:
430                self.extensions.append(ext)
431
432    def update_extensions(self, fd):
433
434        fd.seek(self.header_length)
435        extensions = self.extensions
436        extensions.append(QcowHeaderExtension(0, 0, b''))
437        for ex in extensions:
438            buf = struct.pack('>II', ex.magic, ex.length)
439            fd.write(buf)
440            fd.write(ex.data)
441
442        if self.backing_file is not None:
443            self.backing_file_offset = fd.tell()
444            fd.write(self.backing_file)
445
446        if fd.tell() > self.cluster_size:
447            raise Exception('I think I just broke the image...')
448
449    def update(self, fd):
450        header_bytes = self.header_length
451
452        self.update_extensions(fd)
453
454        fd.seek(0)
455        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
456        buf = struct.pack(QcowHeader.fmt, *header)
457        buf = buf[0:header_bytes-1]
458        fd.write(buf)
459
460    def dump_extensions(self, is_json=False):
461        if is_json:
462            print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
463            return
464
465        for ex in self.extensions:
466            print('Header extension:')
467            ex.dump()
468            print()
469