#!/usr/bin/env python
#
# Test whether the backing BDSs are correct after completion of a
# mirror block job; in "existing" modes (drive-mirror with
# mode=existing and blockdev-mirror) the backing chain should not be
# overridden.
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import os
import iotests
from iotests import qemu_img

# Image files used by every test case.  setUp() chains them as
# source -> back2 -> back1 -> back0 (each one backed by the next).
back0_img = os.path.join(iotests.test_dir, 'back0.' + iotests.imgfmt)
back1_img = os.path.join(iotests.test_dir, 'back1.' + iotests.imgfmt)
back2_img = os.path.join(iotests.test_dir, 'back2.' + iotests.imgfmt)
source_img = os.path.join(iotests.test_dir, 'source.' + iotests.imgfmt)
target_img = os.path.join(iotests.test_dir, 'target.' + iotests.imgfmt)


# Class variables for controlling its behavior:
#
# existing: If True, explicitly create the target image and blockdev-add it
# target_backing: If existing is True: Use this filename as the backing file
#                 of the target image
#                 (None: no backing file)
# target_blockdev_backing: If existing is True: Pass this dict as "backing"
#                          for the blockdev-add command
#                          (None: do not pass "backing")
# target_real_backing: If existing is True: The real filename of the backing
#                      image during runtime, only makes sense if
#                      target_blockdev_backing is not None
#                      (None: same as target_backing)

class BaseClass(iotests.QMPTestCase):
    """Common fixture: a VM whose drive0 holds a four-image backing chain.

    Subclasses configure the fixture through the class variables
    described in the comment above this class.
    """

    # Explicit defaults for every control variable read by setUp() and
    # assertCorrectBackingImage(), so that a subclass which leaves one out
    # gets the documented "not set" behavior instead of an AttributeError.
    existing = False
    cmd = None
    target_backing = None
    target_blockdev_backing = None
    target_real_backing = None

    def setUp(self):
        """Create the images, launch the VM and insert the source node.

        When ``existing`` is set, the target image is created up front as
        well; for blockdev-mirror it is additionally added to the VM as
        node "target".
        """
        qemu_img('create', '-f', iotests.imgfmt, back0_img, '1M')
        qemu_img('create', '-f', iotests.imgfmt, '-b', back0_img, back1_img)
        qemu_img('create', '-f', iotests.imgfmt, '-b', back1_img, back2_img)
        qemu_img('create', '-f', iotests.imgfmt, '-b', back2_img, source_img)

        self.vm = iotests.VM()
        self.vm.add_drive(None, '', 'none')
        self.vm.launch()

        # Add the BDS via blockdev-add so it stays around after the mirror block
        # job has been completed
        result = self.vm.qmp('blockdev-add',
                             options={'node-name': 'source',
                                      'driver': iotests.imgfmt,
                                      'file': {'driver': 'file',
                                               'filename': source_img}})
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('x-blockdev-insert-medium',
                             device='drive0', node_name='source')
        self.assert_qmp(result, 'return', {})

        self.assertIntactSourceBackingChain()

        if self.existing:
            if self.target_backing:
                qemu_img('create', '-f', iotests.imgfmt,
                         '-b', self.target_backing, target_img, '1M')
            else:
                qemu_img('create', '-f', iotests.imgfmt, target_img, '1M')

            if self.cmd == 'blockdev-mirror':
                # blockdev-mirror is given a node name as its target (see
                # MirrorBaseClass.runMirror), so the node has to exist first.
                options = {'node-name': 'target',
                           'driver': iotests.imgfmt,
                           'file': {'driver': 'file',
                                    'filename': target_img}}
                if self.target_blockdev_backing:
                    options['backing'] = self.target_blockdev_backing

                result = self.vm.qmp('blockdev-add', options=options)
                self.assert_qmp(result, 'return', {})

    def tearDown(self):
        """Shut the VM down and delete all image files."""
        try:
            self.vm.shutdown()
        finally:
            # Remove the images even if shutting down the VM raised, so a
            # broken VM does not leave files behind in the test directory.
            os.remove(source_img)
            os.remove(back2_img)
            os.remove(back1_img)
            os.remove(back0_img)
            try:
                # The target image only exists if the test created it.
                os.remove(target_img)
            except OSError:
                pass

    def findBlockNode(self, node_name, id=None):
        """Return the QMP description of a block node.

        With ``id`` given, the device of that name is looked up in the
        'query-block' result and its 'inserted' entry is returned; if
        ``node_name`` is also given, the inserted node's name is asserted
        to match it.  Without ``id``, the node called ``node_name`` is
        looked up in the 'query-named-block-nodes' result.

        Fails the test if no matching device/node is found.
        """
        if id:
            result = self.vm.qmp('query-block')
            for device in result['return']:
                if device['device'] == id:
                    if node_name:
                        self.assert_qmp(device, 'inserted/node-name', node_name)
                    return device['inserted']
        else:
            result = self.vm.qmp('query-named-block-nodes')
            for node in result['return']:
                if node['node-name'] == node_name:
                    return node

        self.fail('Cannot find node %s/%s' % (id, node_name))

    def assertIntactSourceBackingChain(self):
        """Assert node "source" still has the chain built by setUp().

        Expected filenames, from the top: source, back2, back1, back0,
        with no further backing image below back0.
        """
        node = self.findBlockNode('source')

        # '/backing-image' * n descends n levels into the backing chain.
        self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename',
                        source_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename',
                        back2_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 2 + '/filename',
                        back1_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 3 + '/filename',
                        back0_img)
        self.assert_qmp_absent(node, 'image' + '/backing-image' * 4)

    def assertCorrectBackingImage(self, node, default_image):
        """Assert that ``node`` has the expected backing image.

        If ``existing`` is set, the expectation is ``target_real_backing``
        if that is set, otherwise ``target_backing``.  If ``existing`` is
        not set, the expectation is ``default_image``.  An expectation of
        None (or any other false value) means the node must not have a
        backing image at all.
        """
        if self.existing:
            if self.target_real_backing:
                image = self.target_real_backing
            else:
                image = self.target_backing
        else:
            image = default_image

        if image:
            self.assert_qmp(node, 'image/backing-image/filename', image)
        else:
            self.assert_qmp_absent(node, 'image/backing-image')


# Class variables for controlling its behavior:
#
# cmd: Mirroring command to execute, either drive-mirror or blockdev-mirror

class MirrorBaseClass(BaseClass):
    """Mirror drive0 with each sync mode and check the backing images."""

    def runMirror(self, sync):
        """Start a mirror job with the given sync mode and complete it.

        blockdev-mirror targets the pre-added node "target"; drive-mirror
        targets the image file, using mode 'existing' or 'absolute-paths'
        depending on ``existing``, and names the new node "target".
        """
        if self.cmd == 'blockdev-mirror':
            result = self.vm.qmp(self.cmd, device='drive0', sync=sync,
                                 target='target')
        else:
            if self.existing:
                mode = 'existing'
            else:
                mode = 'absolute-paths'
            result = self.vm.qmp(self.cmd, device='drive0', sync=sync,
                                 target=target_img, format=iotests.imgfmt,
                                 mode=mode, node_name='target')

        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_READY')

        result = self.vm.qmp('block-job-complete', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_COMPLETED')

    # NOTE: the test methods are documented with comments rather than
    # docstrings so that the test descriptions reported by unittest do not
    # change.

    # sync=full: without an existing target, no backing image is expected.
    def testFull(self):
        self.runMirror('full')

        node = self.findBlockNode('target', 'drive0')
        self.assertCorrectBackingImage(node, None)
        self.assertIntactSourceBackingChain()

    # sync=top: without an existing target, back2 is the expected backing
    # image.
    def testTop(self):
        self.runMirror('top')

        node = self.findBlockNode('target', 'drive0')
        self.assertCorrectBackingImage(node, back2_img)
        self.assertIntactSourceBackingChain()

    # sync=none: without an existing target, the source image itself is the
    # expected backing image.
    def testNone(self):
        self.runMirror('none')

        node = self.findBlockNode('target', 'drive0')
        self.assertCorrectBackingImage(node, source_img)
        self.assertIntactSourceBackingChain()


class TestDriveMirrorAbsolutePaths(MirrorBaseClass):
    cmd = 'drive-mirror'
    existing = False

class TestDriveMirrorExistingNoBacking(MirrorBaseClass):
    cmd = 'drive-mirror'
    existing = True
    target_backing = None

class TestDriveMirrorExistingBacking(MirrorBaseClass):
    cmd = 'drive-mirror'
    existing = True
    target_backing = 'null-co://'

class TestBlockdevMirrorNoBacking(MirrorBaseClass):
    cmd = 'blockdev-mirror'
    existing = True
    target_backing = None

class TestBlockdevMirrorBacking(MirrorBaseClass):
    cmd = 'blockdev-mirror'
    existing = True
    target_backing = 'null-co://'

class TestBlockdevMirrorForcedBacking(MirrorBaseClass):
    cmd = 'blockdev-mirror'
    existing = True
    target_backing = None
    target_blockdev_backing = {'driver': 'null-co'}
    target_real_backing = 'null-co://'


class TestCommit(BaseClass):
    """Commit drive0 down to back1 and check the resulting chain."""

    existing = False

    # After the commit job, drive0 is expected to hold back1 on top of
    # back0, while the "source" node keeps its full original chain.
    def testCommit(self):
        result = self.vm.qmp('block-commit', device='drive0', base=back1_img)
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_READY')

        result = self.vm.qmp('block-job-complete', device='drive0')
        self.assert_qmp(result, 'return', {})

        self.vm.event_wait('BLOCK_JOB_COMPLETED')

        node = self.findBlockNode(None, 'drive0')
        self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename',
                        back1_img)
        self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename',
                        back0_img)
        self.assert_qmp_absent(node, 'image' + '/backing-image' * 2 +
                               '/filename')

        self.assertIntactSourceBackingChain()


# Unbind the abstract base classes from the module namespace before running;
# presumably this keeps the test loader used by iotests.main() from
# collecting them as test cases of their own -- confirm against iotests.main.
BaseClass = None
MirrorBaseClass = None

if __name__ == '__main__':
    iotests.main(supported_fmts=['qcow2'])