#!/usr/bin/env python
#
# Tests for shrinking images
#
# Copyright (c) 2016-2017 Parallels International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os, random, iotests, struct, qcow2, sys
from iotests import qemu_img, qemu_io, image_size

# Python 2 compatibility: use the lazy range everywhere.
if sys.version_info.major == 2:
    range = xrange

test_img = os.path.join(iotests.test_dir, 'test.img')
check_img = os.path.join(iotests.test_dir, 'check.img')

def size_to_int(str):
    """Convert a size string with a one-letter suffix to a byte count.

    The last character must be one of 'B', 'K', 'M', 'G', 'T' and selects
    a power of 1024; everything before it is parsed as a decimal integer
    (e.g. '16M' -> 16 * 1024**2).

    Raises ValueError if the suffix is unknown or the prefix is not an
    integer.  The parameter name is kept as-is for compatibility with
    existing callers.
    """
    suff = ['B', 'K', 'M', 'G', 'T']
    return int(str[:-1]) * 1024**suff.index(str[-1:])

class ShrinkBaseClass(iotests.QMPTestCase):
    """Common shrink tests, parameterized through class attributes.

    Subclasses must define cluster_size (it is read in setUp for non-raw
    formats) and may override the sizes and refcount_bits below.
    """
    image_len = '128M'      # size of the image before shrinking
    shrink_size = '10M'     # size of the image after shrinking
    chunk_size = '16M'      # size of each write used to fill the image
    refcount_bits = '16'

    def __qcow2_check(self, filename):
        """Verify the L1 table and refcount table of a qcow2 image file.

        Asserts that L1 entries beyond the ones needed for the current
        virtual size are zero, and that every cluster referenced by a
        non-zero refcount table entry contains some non-zero byte.
        """
        entry_bits = 3
        entry_size = 1 << entry_bits    # table entries are 8 bytes wide
        # Keeps bits 9..55 of an entry, dropping the remaining bits.
        l1_mask = 0x00fffffffffffe00
        div_roundup = lambda n, d: (n + d - 1) // d

        def split_by_n(data, n):
            # Yield each n-byte slice of data decoded as a big-endian
            # 64-bit integer ('>Q', so n is expected to be 8) and masked
            # with l1_mask.
            for x in range(0, len(data), n):
                yield struct.unpack('>Q', data[x:x + n])[0] & l1_mask

        def check_l1_table(h, l1_data):
            l1_list = list(split_by_n(l1_data, entry_size))
            # Per the qcow2 format, one L1 entry refers to one L2 table,
            # which fills a cluster with entry_size-byte entries that each
            # map one cluster.  One L1 entry therefore covers
            # 2 ** (2 * cluster_bits - entry_bits) bytes, so the exponent
            # must use the log2 of the entry size (entry_bits), not the
            # entry size in bytes.
            real_l1_size = div_roundup(h.size,
                                       1 << (h.cluster_bits*2 - entry_bits))
            used, unused = l1_list[:real_l1_size], l1_list[real_l1_size:]

            self.assertTrue(len(used) != 0, "Verifying l1 table content")
            self.assertFalse(any(unused), "Verifying l1 table content")

        def check_reftable(fd, h, reftable):
            for offset in split_by_n(reftable, entry_size):
                if offset != 0:
                    fd.seek(offset)
                    cluster = fd.read(1 << h.cluster_bits)
                    # bytearray iterates as integers on both Python 2 and
                    # 3; iterating a Python 2 str yields one-character
                    # strings, which are always truthy.
                    self.assertTrue(any(bytearray(cluster)),
                                    "Verifying reftable content")

        with open(filename, "rb") as fd:
            h = qcow2.QcowHeader(fd)

            fd.seek(h.l1_table_offset)
            l1_table = fd.read(h.l1_size << entry_bits)

            fd.seek(h.refcount_table_offset)
            reftable = fd.read(h.refcount_table_clusters << h.cluster_bits)

            check_l1_table(h, l1_table)
            check_reftable(fd, h, reftable)

    def __raw_check(self, filename):
        """No format-specific metadata is checked for raw images."""
        pass

    # Format-specific metadata checkers, keyed by image format name.  The
    # values are plain functions, so callers pass self explicitly.
    image_check = {
        'qcow2' : __qcow2_check,
        'raw' : __raw_check
    }

    def setUp(self):
        """Create test_img at image_len and check_img at shrink_size.

        check_img is then filled with the 0xff pattern over shrink_size
        bytes.
        """
        if iotests.imgfmt == 'raw':
            qemu_img('create', '-f', iotests.imgfmt, test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt, check_img,
                     self.shrink_size)
        else:
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=' + self.cluster_size +
                     ',refcount_bits=' + self.refcount_bits,
                     test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=%s'% self.cluster_size,
                     check_img, self.shrink_size)
        qemu_io('-c', 'write -P 0xff 0 ' + self.shrink_size, check_img)

    def tearDown(self):
        """Remove both images created by setUp."""
        os.remove(test_img)
        os.remove(check_img)

    def image_verify(self):
        """Check size, format metadata and (non-raw only) 'qemu-img check'."""
        self.assertEqual(image_size(test_img), image_size(check_img),
                         "Verifying image size")
        self.image_check[iotests.imgfmt](self, test_img)

        if iotests.imgfmt == 'raw':
            return
        self.assertEqual(qemu_img('check', test_img), 0,
                         "Verifying image corruption")

    # Test methods are documented with comments rather than docstrings so
    # that unittest's verbose test descriptions stay unchanged.

    # Shrink an image that was never written to.
    def test_empty_image(self):
        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        # NOTE(review): only one numeric argument follows the pattern in
        # these read commands, and the assertion merely compares the two
        # qemu-io outputs with each other -- confirm this is the intended
        # check before changing it.
        self.assertEqual(
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, test_img),
            qemu_io('-c', 'read -P 0x00 %s'%self.shrink_size, check_img),
            "Verifying image content")

        self.image_verify()

    # Fill the image chunk by chunk in ascending offset order, then shrink.
    def test_sequential_write(self):
        for offs in range(0, size_to_int(self.image_len),
                          size_to_int(self.chunk_size)):
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()

    # Fill the image chunk by chunk in shuffled offset order, then shrink.
    def test_random_write(self):
        offs_list = list(range(0, size_to_int(self.image_len),
                               size_to_int(self.chunk_size)))
        random.shuffle(offs_list)
        for offs in offs_list:
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize',  '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()

class TestShrink512(ShrinkBaseClass):
    """Shrink tests with 512-byte clusters, 64-bit refcounts, small image."""
    image_len = '3M'
    shrink_size = '1M'
    chunk_size = '256K'
    cluster_size = '512'
    refcount_bits = '64'

class TestShrink64K(ShrinkBaseClass):
    """Shrink tests with 64K clusters and the base-class defaults."""
    cluster_size = '64K'

class TestShrink1M(ShrinkBaseClass):
    """Shrink tests with 1M clusters and 1-bit refcounts."""
    cluster_size = '1M'
    refcount_bits = '1'

# Drop the module-level reference to the base class; it does not define
# cluster_size, so it must not be picked up and run as a test case itself.
ShrinkBaseClass = None

if __name__ == '__main__':
    iotests.main(supported_fmts=['raw', 'qcow2'])