# Library for manipulations with qcow2 image
#
# Copyright (c) 2020 Virtuozzo International GmbH.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import struct
import string


class Qcow2Field:
    """Wrap a raw field value; subclasses customize how it is printed."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return str(self.value)


class Flags64(Qcow2Field):
    """64-bit flag word, printed as the list of set bit numbers."""

    def __str__(self):
        bits = [bit for bit in range(64) if self.value & (1 << bit)]
        return str(bits)


class Enum(Qcow2Field):
    """Enumerated value, printed as hex plus its symbolic name.

    Subclasses must define a ``mapping`` dict from value to name.
    """

    def __str__(self):
        return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'


class Qcow2StructMeta(type):
    """Metaclass computing the ``struct`` format of classes with ``fields``."""

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(self, name, bases, attrs):
        super().__init__(name, bases, attrs)
        # Only classes that declare their own 'fields' get a (new) format;
        # others inherit whatever their bases define.
        if 'fields' in attrs:
            # '>' selects big-endian layout without alignment padding.
            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)


class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Successors should define fields class variable, which is: list of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format: either a format string to use with .format() when dumping,
          or a callable (e.g. a Qcow2Field subclass) that wraps the value
          and whose str() is printed
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be omitted
               in this case, then current position of fd is used.

        struct.error is raised by struct.unpack() if the amount of data does
        not match the size of the structure.
        """
        if data is None:
            assert fd is not None
            buf_size = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(buf_size)
        else:
            assert fd is None and offset is None

        values = struct.unpack(self.fmt, data)
        # The instance dictionary is replaced wholesale by the parsed fields.
        self.__dict__ = {field[2]: value
                         for field, value in zip(self.fields, values)}

    def dump(self):
        """Print every field as a left-aligned name followed by its value."""
        for _ctype, fmt, name in self.fields:
            value = self.__dict__[name]
            if isinstance(fmt, str):
                value_str = fmt.format(value)
            else:
                value_str = str(fmt(value))

            print('{:<25} {}'.format(name, value_str))


class Qcow2BitmapExt(Qcow2Struct):
    """Payload of the bitmaps header extension."""

    fields = (
        ('u32', '{}', 'nb_bitmaps'),
        ('u32', '{}', 'reserved32'),
        ('u64', '{:#x}', 'bitmap_directory_size'),
        ('u64', '{:#x}', 'bitmap_directory_offset')
    )


QCOW2_EXT_MAGIC_BITMAPS = 0x23852875


class QcowHeaderExtension(Qcow2Struct):
    """One header extension: magic, length and zero-padded data."""

    class Magic(Enum):
        mapping = {
            0xe2792aca: 'Backing format',
            0x6803f857: 'Feature table',
            0x0537be77: 'Crypto header',
            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
            0x44415441: 'Data file'
        }

    fields = (
        ('u32', Magic, 'magic'),
        ('u32', '{}', 'length')
        # length bytes of data follows
        # then padding to next multiple of 8
    )

    def __init__(self, magic=None, length=None, data=None, fd=None):
        """
        Support both loading from fd and creation from user data.
        For fd-based creation current position in a file will be used to read
        the data.

        This should be somehow refactored and functionality should be moved to
        superclass (to allow creation of any qcow2 struct), but then, fields
        of variable length (data here) should be supported in base class
        somehow. Note also, that we probably want to parse different
        extensions. Should they be subclasses of this class, or how to do it
        better? Should it be something like QAPI union with discriminator field
        (magic here). So, it's a TODO. We'll see how to properly refactor this
        when we have more qcow2 structures.
        """
        if fd is None:
            assert all(v is not None for v in (magic, length, data))
            self.magic = magic
            self.length = length
            # Pad the data with zero bytes up to the next multiple of 8;
            # the amount of padding is derived from the declared length.
            if length % 8 != 0:
                padding = 8 - (length % 8)
                data += b'\0' * padding
            self.data = data
        else:
            assert all(v is None for v in (magic, length, data))
            super().__init__(fd=fd)
            # Read the data together with its padding.
            padded = (self.length + 7) & ~7
            self.data = fd.read(padded)
            assert self.data is not None

        if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
            self.obj = Qcow2BitmapExt(data=self.data)
        else:
            self.obj = None

    def dump(self):
        """Print magic and length, then the parsed payload or the raw data."""
        super().dump()

        if self.obj is None:
            data = self.data[:self.length]
            # Encode the set of printable characters once, not per byte.
            printable = frozenset(string.printable.encode('ascii'))
            if all(c in printable for c in data):
                data = f"'{ data.decode('ascii') }'"
            else:
                data = '<binary>'
            print(f'{"data":<25} {data}')
        else:
            self.obj.dump()

    @classmethod
    def create(cls, magic, data):
        """Build an extension from magic and data; length is len(data)."""
        return cls(magic, len(data), data)


class QcowHeader(Qcow2Struct):
    """qcow2 image header together with its header extensions."""

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', Flags64, 'incompatible_features'),
        ('u64', Flags64, 'compatible_features'),
        ('u64', Flags64, 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Load header, extensions and backing file name from fd."""
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """For version 2 images, overwrite the version 3 fields with fixed
        defaults."""
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read extensions from the current position of fd.

        Reading stops at an extension with magic 0, at the end of the first
        cluster, or at the backing file name, whichever comes first.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext = QcowHeaderExtension(fd=fd)
            if ext.magic == 0:
                break
            self.extensions.append(ext)

    def update_extensions(self, fd):
        """Write extensions and backing file name after the header.

        Updates self.backing_file_offset when a backing file name is written.
        Raises Exception if the written data exceeds the first cluster.
        """
        fd.seek(self.header_length)
        # The end marker is written to the file only. Appending it to
        # self.extensions would make it show up in dump_extensions() and
        # accumulate one more marker on every update.
        end_marker = QcowHeaderExtension(0, 0, b'')
        for ex in [*self.extensions, end_marker]:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write extensions and then the header itself back to fd."""
        header_bytes = self.header_length

        # Must come first: it may change backing_file_offset, which is then
        # stored in the header below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes (or the whole packed structure
        # if header_length is larger than it).
        buf = buf[:header_bytes]
        fd.write(buf)

    def dump_extensions(self):
        """Print every loaded header extension, each followed by a blank
        line."""
        for ex in self.extensions:
            print('Header extension:')
            ex.dump()
            print()