#!/usr/bin/env python3
# group: rw
#
# Tests for shrinking images
#
# Copyright (c) 2016-2017 Parallels International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os, random, iotests, struct, qcow2, sys
from iotests import qemu_img, qemu_io, image_size

test_img = os.path.join(iotests.test_dir, 'test.img')
check_img = os.path.join(iotests.test_dir, 'check.img')


def size_to_int(str):
    """Convert a size string such as '16M' into a number of bytes.

    The last character is the unit suffix (one of B, K, M, G, T) and
    everything before it is the integer count; units are powers of 1024.
    Raises ValueError when the suffix is unknown or the count is not an
    integer.

    The parameter name shadows the builtin; it is kept unchanged so the
    signature stays compatible.
    """
    suff = ['B', 'K', 'M', 'G', 'T']
    return int(str[:-1]) * 1024**suff.index(str[-1:])


class ShrinkBaseClass(iotests.QMPTestCase):
    """Common shrink tests; subclasses supply `cluster_size`.

    Each test creates `test_img` with `image_len`, optionally fills it in
    `chunk_size` pieces, shrinks it to `shrink_size` and compares it with
    `check_img`, a reference image created directly at `shrink_size`.
    """

    image_len = '128M'
    shrink_size = '10M'
    chunk_size = '16M'
    refcount_bits = '16'

    def __qcow2_check(self, filename):
        """Inspect the L1 table and the refcount table of a qcow2 file.

        Asserts that every L1 entry past the part needed for the current
        virtual size is zero, and that every refcount block referenced by
        the refcount table contains at least one non-zero byte.
        """
        entry_bits = 3
        entry_size = 1 << entry_bits
        l1_mask = 0x00fffffffffffe00

        def div_roundup(n, d):
            return (n + d - 1) // d

        def split_by_n(data, n):
            # Yield the masked offset stored in each big-endian 64-bit entry.
            for x in range(0, len(data), n):
                yield struct.unpack('>Q', data[x:x + n])[0] & l1_mask

        def check_l1_table(h, l1_data):
            l1_list = list(split_by_n(l1_data, entry_size))
            # One L1 entry points to one L2 table.  An L2 table occupies a
            # cluster and therefore holds cluster_size / entry_size entries,
            # each mapping one cluster, so a single L1 entry covers
            # 2**(2 * cluster_bits - entry_bits) bytes.  The exponent must
            # use entry_bits (log2 of the entry size), not entry_size.
            real_l1_size = div_roundup(h.size,
                                       1 << (h.cluster_bits * 2 - entry_bits))
            used, unused = l1_list[:real_l1_size], l1_list[real_l1_size:]

            self.assertTrue(len(used) != 0, "Verifying l1 table content")
            self.assertFalse(any(unused), "Verifying l1 table content")

        def check_reftable(fd, h, reftable):
            for offset in split_by_n(reftable, entry_size):
                if offset != 0:
                    fd.seek(offset)
                    cluster = fd.read(1 << h.cluster_bits)
                    self.assertTrue(any(cluster), "Verifying reftable content")

        with open(filename, "rb") as fd:
            h = qcow2.QcowHeader(fd)

            fd.seek(h.l1_table_offset)
            l1_table = fd.read(h.l1_size << entry_bits)

            fd.seek(h.refcount_table_offset)
            reftable = fd.read(h.refcount_table_clusters << h.cluster_bits)

            check_l1_table(h, l1_table)
            # check_reftable reads refcount blocks, so fd must still be open.
            check_reftable(fd, h, reftable)

    def __raw_check(self, filename):
        """Raw images have no metadata to inspect; nothing to do."""
        pass

    # Format-specific metadata checks, keyed by iotests.imgfmt.  The values
    # are plain functions, so callers pass `self` explicitly.
    image_check = {
        'qcow2' : __qcow2_check,
        'raw' : __raw_check
    }

    def setUp(self):
        """Create the image under test and the reference image."""
        if iotests.imgfmt == 'raw':
            qemu_img('create', '-f', iotests.imgfmt, test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt, check_img,
                     self.shrink_size)
        else:
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=' + self.cluster_size +
                     ',refcount_bits=' + self.refcount_bits,
                     test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=%s'% self.cluster_size,
                     check_img, self.shrink_size)
        # Fill the reference image with the pattern the write tests use.
        qemu_io('-c', 'write -P 0xff 0 ' + self.shrink_size, check_img)

    def tearDown(self):
        """Remove both images created by setUp."""
        os.remove(test_img)
        os.remove(check_img)

    def image_verify(self):
        """Check size and format metadata of the shrunk image.

        For non-raw formats this additionally runs `qemu-img check`.
        """
        self.assertEqual(image_size(test_img), image_size(check_img),
                         "Verifying image size")
        self.image_check[iotests.imgfmt](self, test_img)

        if iotests.imgfmt == 'raw':
            return
        self.assertEqual(qemu_img('check', test_img), 0,
                         "Verifying image corruption")

    def test_empty_image(self):
        """Shrink an image that was never written to."""
        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, test_img),
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, check_img),
            "Verifying image content")

        self.image_verify()

    def test_sequential_write(self):
        """Fill the image chunk by chunk in ascending order, then shrink."""
        for offs in range(0, size_to_int(self.image_len),
                          size_to_int(self.chunk_size)):
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()

    def test_random_write(self):
        """Fill the image chunk by chunk in shuffled order, then shrink."""
        offs_list = list(range(0, size_to_int(self.image_len),
                               size_to_int(self.chunk_size)))
        random.shuffle(offs_list)
        for offs in offs_list:
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()


class TestShrink512(ShrinkBaseClass):
    image_len = '3M'
    shrink_size = '1M'
    chunk_size = '256K'
    cluster_size = '512'
    refcount_bits = '64'


class TestShrink64K(ShrinkBaseClass):
    cluster_size = '64K'


class TestShrink1M(ShrinkBaseClass):
    cluster_size = '1M'
    refcount_bits = '1'


# Unbind the module-level name of the base class: it defines test_* methods
# but no cluster_size, so it must not be picked up as a test case itself.
ShrinkBaseClass = None

if __name__ == '__main__':
    iotests.main(supported_fmts=['raw', 'qcow2'],
                 supported_protocols=['file'],
                 unsupported_imgopts=['compat'])