1# Library for manipulations with qcow2 image 2# 3# Copyright (c) 2020 Virtuozzo International GmbH. 4# Copyright (C) 2012 Red Hat, Inc. 5# 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program. If not, see <http://www.gnu.org/licenses/>. 18# 19 20import struct 21import string 22 23 24class Qcow2Field: 25 26 def __init__(self, value): 27 self.value = value 28 29 def __str__(self): 30 return str(self.value) 31 32 33class Flags64(Qcow2Field): 34 35 def __str__(self): 36 bits = [] 37 for bit in range(64): 38 if self.value & (1 << bit): 39 bits.append(bit) 40 return str(bits) 41 42 43class Enum(Qcow2Field): 44 45 def __str__(self): 46 return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})' 47 48 49class Qcow2StructMeta(type): 50 51 # Mapping from c types to python struct format 52 ctypes = { 53 'u8': 'B', 54 'u16': 'H', 55 'u32': 'I', 56 'u64': 'Q' 57 } 58 59 def __init__(self, name, bases, attrs): 60 if 'fields' in attrs: 61 self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields) 62 63 64class Qcow2Struct(metaclass=Qcow2StructMeta): 65 66 """Qcow2Struct: base class for qcow2 data structures 67 68 Successors should define fields class variable, which is: list of tuples, 69 each of three elements: 70 - c-type (one of 'u8', 'u16', 'u32', 'u64') 71 - format (format_spec to use with .format() when dump or 'mask' to dump 72 bitmasks) 73 - field name 74 """ 75 76 def __init__(self, fd=None, offset=None, data=None): 77 """ 78 Two variants: 79 1. Specify data. fd and offset must be None. 80 2. Specify fd and offset, data must be None. offset may be omitted 81 in this case, than current position of fd is used. 82 """ 83 if data is None: 84 assert fd is not None 85 buf_size = struct.calcsize(self.fmt) 86 if offset is not None: 87 fd.seek(offset) 88 data = fd.read(buf_size) 89 else: 90 assert fd is None and offset is None 91 92 values = struct.unpack(self.fmt, data) 93 self.__dict__ = dict((field[2], values[i]) 94 for i, field in enumerate(self.fields)) 95 96 def dump(self): 97 for f in self.fields: 98 value = self.__dict__[f[2]] 99 if isinstance(f[1], str): 100 value_str = f[1].format(value) 101 else: 102 value_str = str(f[1](value)) 103 104 print('{:<25} {}'.format(f[2], value_str)) 105 106 107class Qcow2BitmapExt(Qcow2Struct): 108 109 fields = ( 110 ('u32', '{}', 'nb_bitmaps'), 111 ('u32', '{}', 'reserved32'), 112 ('u64', '{:#x}', 'bitmap_directory_size'), 113 ('u64', '{:#x}', 'bitmap_directory_offset') 114 ) 115 116 117QCOW2_EXT_MAGIC_BITMAPS = 0x23852875 118 119 120class QcowHeaderExtension(Qcow2Struct): 121 122 class Magic(Enum): 123 mapping = { 124 0xe2792aca: 'Backing format', 125 0x6803f857: 'Feature table', 126 0x0537be77: 'Crypto header', 127 QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps', 128 0x44415441: 'Data file' 129 } 130 131 fields = ( 132 ('u32', Magic, 'magic'), 133 ('u32', '{}', 'length') 134 # length bytes of data follows 135 # then padding to next multiply of 8 136 ) 137 138 def __init__(self, magic=None, length=None, data=None, fd=None): 139 """ 140 Support both loading from fd and creation from user data. 141 For fd-based creation current position in a file will be used to read 142 the data. 143 144 This should be somehow refactored and functionality should be moved to 145 superclass (to allow creation of any qcow2 struct), but then, fields 146 of variable length (data here) should be supported in base class 147 somehow. Note also, that we probably want to parse different 148 extensions. Should they be subclasses of this class, or how to do it 149 better? Should it be something like QAPI union with discriminator field 150 (magic here). So, it's a TODO. We'll see how to properly refactor this 151 when we have more qcow2 structures. 152 """ 153 if fd is None: 154 assert all(v is not None for v in (magic, length, data)) 155 self.magic = magic 156 self.length = length 157 if length % 8 != 0: 158 padding = 8 - (length % 8) 159 data += b'\0' * padding 160 self.data = data 161 else: 162 assert all(v is None for v in (magic, length, data)) 163 super().__init__(fd=fd) 164 padded = (self.length + 7) & ~7 165 self.data = fd.read(padded) 166 assert self.data is not None 167 168 if self.magic == QCOW2_EXT_MAGIC_BITMAPS: 169 self.obj = Qcow2BitmapExt(data=self.data) 170 else: 171 self.obj = None 172 173 def dump(self): 174 super().dump() 175 176 if self.obj is None: 177 data = self.data[:self.length] 178 if all(c in string.printable.encode('ascii') for c in data): 179 data = f"'{ data.decode('ascii') }'" 180 else: 181 data = '<binary>' 182 print(f'{"data":<25} {data}') 183 else: 184 self.obj.dump() 185 186 @classmethod 187 def create(cls, magic, data): 188 return QcowHeaderExtension(magic, len(data), data) 189 190 191class QcowHeader(Qcow2Struct): 192 193 fields = ( 194 # Version 2 header fields 195 ('u32', '{:#x}', 'magic'), 196 ('u32', '{}', 'version'), 197 ('u64', '{:#x}', 'backing_file_offset'), 198 ('u32', '{:#x}', 'backing_file_size'), 199 ('u32', '{}', 'cluster_bits'), 200 ('u64', '{}', 'size'), 201 ('u32', '{}', 'crypt_method'), 202 ('u32', '{}', 'l1_size'), 203 ('u64', '{:#x}', 'l1_table_offset'), 204 ('u64', '{:#x}', 'refcount_table_offset'), 205 ('u32', '{}', 'refcount_table_clusters'), 206 ('u32', '{}', 'nb_snapshots'), 207 ('u64', '{:#x}', 'snapshot_offset'), 208 209 # Version 3 header fields 210 ('u64', Flags64, 'incompatible_features'), 211 ('u64', Flags64, 'compatible_features'), 212 ('u64', Flags64, 'autoclear_features'), 213 ('u32', '{}', 'refcount_order'), 214 ('u32', '{}', 'header_length'), 215 ) 216 217 def __init__(self, fd): 218 super().__init__(fd=fd, offset=0) 219 220 self.set_defaults() 221 self.cluster_size = 1 << self.cluster_bits 222 223 fd.seek(self.header_length) 224 self.load_extensions(fd) 225 226 if self.backing_file_offset: 227 fd.seek(self.backing_file_offset) 228 self.backing_file = fd.read(self.backing_file_size) 229 else: 230 self.backing_file = None 231 232 def set_defaults(self): 233 if self.version == 2: 234 self.incompatible_features = 0 235 self.compatible_features = 0 236 self.autoclear_features = 0 237 self.refcount_order = 4 238 self.header_length = 72 239 240 def load_extensions(self, fd): 241 self.extensions = [] 242 243 if self.backing_file_offset != 0: 244 end = min(self.cluster_size, self.backing_file_offset) 245 else: 246 end = self.cluster_size 247 248 while fd.tell() < end: 249 ext = QcowHeaderExtension(fd=fd) 250 if ext.magic == 0: 251 break 252 else: 253 self.extensions.append(ext) 254 255 def update_extensions(self, fd): 256 257 fd.seek(self.header_length) 258 extensions = self.extensions 259 extensions.append(QcowHeaderExtension(0, 0, b'')) 260 for ex in extensions: 261 buf = struct.pack('>II', ex.magic, ex.length) 262 fd.write(buf) 263 fd.write(ex.data) 264 265 if self.backing_file is not None: 266 self.backing_file_offset = fd.tell() 267 fd.write(self.backing_file) 268 269 if fd.tell() > self.cluster_size: 270 raise Exception('I think I just broke the image...') 271 272 def update(self, fd): 273 header_bytes = self.header_length 274 275 self.update_extensions(fd) 276 277 fd.seek(0) 278 header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields) 279 buf = struct.pack(QcowHeader.fmt, *header) 280 buf = buf[0:header_bytes-1] 281 fd.write(buf) 282 283 def dump_extensions(self): 284 for ex in self.extensions: 285 print('Header extension:') 286 ex.dump() 287 print() 288