1# Library for manipulations with qcow2 image
2#
3# Copyright (c) 2020 Virtuozzo International GmbH.
4# Copyright (C) 2012 Red Hat, Inc.
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18#
19
20import struct
21import string
22
23
class Qcow2Field:
    """Thin wrapper around a raw field value that decides how it is printed.

    The wrapped value is stored unchanged in the 'value' attribute; only the
    string conversion is defined here.
    """

    def __init__(self, value):
        # Keep the raw value as-is; rendering happens lazily in __str__.
        self.value = value

    def __str__(self):
        """Return the plain str() form of the wrapped value."""
        text = str(self.value)
        return text
31
32
class Flags64(Qcow2Field):
    """Flag word printed as the list of its set bit positions."""

    def __str__(self):
        """Return str() of the ascending list of set bit indices (0..63)."""
        set_bits = [pos for pos in range(64) if (self.value >> pos) & 1]
        return str(set_bits)
41
42
class Enum(Qcow2Field):
    """Field whose value is looked up in a 'mapping' dict for display.

    This class does not define 'mapping' itself; it is read from the
    instance, so it has to be provided elsewhere (e.g. by a subclass).
    """

    def __str__(self):
        """Return '<hex value> (<name>)'; unmapped values get '<unknown>'."""
        label = self.mapping.get(self.value, "<unknown>")
        return '{:#x} ({})'.format(self.value, label)
47
48
class Qcow2StructMeta(type):
    """Metaclass that derives a struct format string from a 'fields' table.

    A class whose body defines 'fields' gets a 'fmt' class attribute built
    from the first element (the c-type) of every field entry.
    """

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(self, name, bases, attrs):
        # Classes that do not declare 'fields' in their own body are left
        # untouched (they keep whatever 'fmt' they inherit, if any).
        if 'fields' not in attrs:
            return

        # '>' selects big-endian byte order for the struct module.
        fmt = '>'
        for field in self.fields:
            fmt += self.ctypes[field[0]]
        self.fmt = fmt
62
63
class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Subclasses must define a 'fields' class variable: a sequence of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format: either a str.format() template applied to the value when
                  dumping, or a callable whose result is converted with
                  str()
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd (and optionally offset), data must be None. When
               offset is omitted, the current position of fd is used.
        """
        if data is not None:
            assert fd is None and offset is None
        else:
            assert fd is not None
            nbytes = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(nbytes)

        unpacked = struct.unpack(self.fmt, data)
        # Replace the whole instance dict: every field name becomes an
        # attribute holding the corresponding unpacked value.
        self.__dict__ = {
            field[2]: item for field, item in zip(self.fields, unpacked)
        }

    def dump(self):
        """Print one 'name value' line per field, in declaration order."""
        for field in self.fields:
            fmt, name = field[1], field[2]
            raw = self.__dict__[name]
            if isinstance(fmt, str):
                rendered = fmt.format(raw)
            else:
                # Non-string formats are callables (e.g. Qcow2Field classes).
                rendered = str(fmt(raw))

            print(f'{name:<25} {rendered}')
105
106
class Qcow2BitmapExt(Qcow2Struct):

    """Field layout used to parse the data of a bitmaps header extension.

    QcowHeaderExtension builds an instance of this class from the extension
    data when the extension magic equals QCOW2_EXT_MAGIC_BITMAPS.
    """

    # Each entry is (c-type, dump format, field name); sizes and offsets are
    # dumped in hex, the remaining fields in decimal.
    fields = (
        ('u32', '{}', 'nb_bitmaps'),
        ('u32', '{}', 'reserved32'),
        ('u64', '{:#x}', 'bitmap_directory_size'),
        ('u64', '{:#x}', 'bitmap_directory_offset')
    )
115
116
# Header extension magic that QcowHeaderExtension labels 'Bitmaps'; the data
# of an extension carrying this magic is parsed as a Qcow2BitmapExt.
QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
118
119
class QcowHeaderExtension(Qcow2Struct):

    """A single qcow2 header extension: magic, length and padded data.

    Instances are either loaded from a file object or built from user
    supplied values (see __init__() and create()). Besides the 'magic' and
    'length' fields, every instance has:
        - data: the extension payload, padded with zero bytes to a multiple
                of 8 bytes
        - obj:  a Qcow2BitmapExt parsed from data for bitmaps extensions,
                None for every other magic
    """

    class Magic(Enum):
        mapping = {
            0xe2792aca: 'Backing format',
            0x6803f857: 'Feature table',
            0x0537be77: 'Crypto header',
            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
            0x44415441: 'Data file'
        }

    fields = (
        ('u32', Magic, 'magic'),
        ('u32', '{}', 'length')
        # length bytes of data follows
        # then padding to next multiple of 8
    )

    def __init__(self, magic=None, length=None, data=None, fd=None):
        """
        Support both loading from fd and creation from user data.
        For fd-based creation current position in a file will be used to read
        the data.

        Two variants:
            1. Specify magic, length and data; fd must be None. data is
               padded with zero bytes up to a multiple of 8.
            2. Specify fd only; magic, length and data must be None.

        This should be somehow refactored and functionality should be moved to
        superclass (to allow creation of any qcow2 struct), but then, fields
        of variable length (data here) should be supported in base class
        somehow. Note also, that we probably want to parse different
        extensions. Should they be subclasses of this class, or how to do it
        better? Should it be something like QAPI union with discriminator field
        (magic here). So, it's a TODO. We'll see how to properly refactor this
        when we have more qcow2 structures.
        """
        if fd is None:
            assert all(v is not None for v in (magic, length, data))
            self.magic = magic
            self.length = length
            if length % 8 != 0:
                padding = 8 - (length % 8)
                # Build a new object rather than using '+=': for a mutable
                # buffer such as bytearray, '+=' would extend the caller's
                # object in place.
                data = data + b'\0' * padding
            self.data = data
        else:
            assert all(v is None for v in (magic, length, data))
            # Reads 'magic' and 'length' from the current position of fd.
            super().__init__(fd=fd)
            # Round the payload length up to the next multiple of 8.
            padded = (self.length + 7) & ~7
            self.data = fd.read(padded)
            assert self.data is not None

        if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
            self.obj = Qcow2BitmapExt(data=self.data)
        else:
            self.obj = None

    def dump(self):
        """Print the magic and length fields followed by the payload.

        For a parsed payload (self.obj) its own fields are dumped. Otherwise
        the first 'length' bytes of data are printed as a quoted string when
        they are all printable ASCII, or as '<binary>' when they are not.
        """
        super().dump()

        if self.obj is None:
            data = self.data[:self.length]
            # Encode the printable set once instead of once per tested byte.
            printable = string.printable.encode('ascii')
            if all(c in printable for c in data):
                data = f"'{ data.decode('ascii') }'"
            else:
                data = '<binary>'
            print(f'{"data":<25} {data}')
        else:
            self.obj.dump()

    @classmethod
    def create(cls, magic, data):
        """Build an extension from magic and data, with length = len(data).

        The instance is created through cls, so calling create() on a
        subclass returns an instance of that subclass.
        """
        return cls(magic, len(data), data)
189
190
class QcowHeader(Qcow2Struct):

    """The qcow2 image header together with its header extensions.

    Besides the attributes named in 'fields', an instance has:
        - cluster_size: 1 << cluster_bits
        - extensions:   list of QcowHeaderExtension read from the image
        - backing_file: raw bytes of the backing file name, or None when
                        backing_file_offset is 0
    """

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', Flags64, 'incompatible_features'),
        ('u64', Flags64, 'compatible_features'),
        ('u64', Flags64, 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Load the header, its extensions and the backing file name.

        The header is read from offset 0 of fd; the position of fd is
        changed by this call.
        """
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        # Header extensions start right after the header proper.
        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """Override the version 3 fields with fixed values for version 2.

        The struct format always covers the version 3 fields, so for a
        version 2 image whatever was unpacked for them is replaced here.
        """
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read header extensions from the current position of fd.

        Reading stops at an extension with magic 0 (which is not stored) or
        once the position reaches the end of the first cluster or the
        backing file name, whichever comes first.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext = QcowHeaderExtension(fd=fd)
            if ext.magic == 0:
                break
            else:
                self.extensions.append(ext)

    def update_extensions(self, fd):
        """Write all extensions, a terminator and the backing file name.

        Writing starts at header_length. When a backing file name is
        present, backing_file_offset is updated to the place it is written.

        Raises Exception when the written data ends past the first cluster.
        """

        fd.seek(self.header_length)
        # Terminate the on-disk list with a magic-0 entry, but keep it out
        # of self.extensions: appending it there would make it show up in
        # dump_extensions() and get written again on every further update.
        terminator = QcowHeaderExtension(0, 0, b'')
        for ex in [*self.extensions, terminator]:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write the extensions and then the header itself back to fd."""
        header_bytes = self.header_length

        # Done first because it may change backing_file_offset, which is
        # part of the header packed below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes (or the whole packed header if
        # it is shorter). For version 2 this cuts off the version 3 fields;
        # the last byte of the header must be included.
        fd.write(buf[:header_bytes])

    def dump_extensions(self):
        """Print every loaded header extension, each followed by a blank line."""
        for ex in self.extensions:
            print('Header extension:')
            ex.dump()
            print()
288