#!/usr/bin/env python3
# group: rw
#
# Test case QMP's encrypted key management
#
# Copyright (C) 2019 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import iotests
import os
import time
import json

test_img = os.path.join(iotests.test_dir, 'test.img')


class Secret:
    """A numbered test secret, addressable both from qemu-img and from QMP."""

    def __init__(self, index):
        self._id = "keysec" + str(index)
        # you are not supposed to see the password...
        self._secret = "hunter" + str(index)

    def id(self):
        """Return the object id under which the secret is registered."""
        return self._id

    def secret(self):
        """Return the secret's payload (the passphrase itself)."""
        return self._secret

    def to_cmdline_object(self):
        """Return a one-element list holding the '--object' definition."""
        return ["secret,id=" + self._id + ",data=" + self._secret]

    def to_qmp_object(self):
        """Return the argument dict for a QMP 'object-add' command."""
        return {
            "qom_type": "secret",
            "id": self.id(),
            "props": {"data": self.secret()},
        }


################################################################################
class EncryptionSetupTestCase(iotests.QMPTestCase):
    """Add, overwrite and erase key slots of an encrypted image over QMP."""

    # test case startup
    def setUp(self):
        """Launch the VM, register six secrets and pick per-format options."""
        # start the VM
        self.vm = iotests.VM()
        self.vm.launch()

        # create the secrets and load 'em into the VM
        self.secrets = [Secret(i) for i in range(0, 6)]
        for secret in self.secrets:
            result = self.vm.qmp("object-add", **secret.to_qmp_object())
            self.assert_qmp(result, 'return', {})

        # qcow2 nests the encryption options under an "encrypt." prefix and
        # needs the encryption format spelled out; the other supported
        # format takes the options unprefixed.
        if iotests.imgfmt == "qcow2":
            self.pfx = "encrypt."
            self.img_opts = ['-o', "encrypt.format=luks"]
        else:
            self.pfx = ""
            self.img_opts = []

    # test case shutdown
    def tearDown(self):
        """Stop the VM and remove the test image, if one was created."""
        # stop the VM
        self.vm.shutdown()

        # Remove the image here rather than at the end of each test, so that
        # a test which fails half-way does not leave the image behind.
        try:
            os.remove(test_img)
        except FileNotFoundError:
            pass

    ###########################################################################
    # create the encrypted block device
    def createImg(self, file, secret):
        """Create a 1M encrypted image at 'file', unlocked by 'secret'."""
        iotests.qemu_img(
            'create',
            '--object', *secret.to_cmdline_object(),
            '-f', iotests.imgfmt,
            '-o', self.pfx + 'key-secret=' + secret.id(),
            '-o', self.pfx + 'iter-time=10',
            *self.img_opts,
            file,
            '1M')

    ###########################################################################
    # open an encrypted block device
    def openImageQmp(self, id, file, secret, read_only = False):
        """Add 'file' to the VM as block node 'id', unlocking with 'secret'.

        'read_only' is passed through as the node's 'read-only' option.
        """
        encrypt_options = {
            'key-secret': secret.id()
        }

        if iotests.imgfmt == "qcow2":
            encrypt_options = {
                'encrypt': {
                    'format': 'luks',
                    **encrypt_options
                }
            }

        result = self.vm.qmp('blockdev-add', **{
            'driver': iotests.imgfmt,
            'node-name': id,
            'read-only': read_only,

            **encrypt_options,

            'file': {
                'driver': 'file',
                # open the image the caller asked for, not the global one
                'filename': file,
            }
        })
        self.assert_qmp(result, 'return', {})

    # close the encrypted block device
    def closeImageQmp(self, id):
        """Remove block node 'id' from the VM."""
        result = self.vm.qmp('blockdev-del', **{'node-name': id})
        self.assert_qmp(result, 'return', {})

    ###########################################################################
    # shared implementation of addKeyQmp() and eraseKeyQmp()
    def _amendKeys(self, id, job_id, crypt_options, force):
        """Start an 'x-blockdev-amend' job on node 'id' and run it.

        'crypt_options' are the format-independent keyslot options; for
        qcow2 they are wrapped into an 'encrypt' dictionary first.
        """
        if iotests.imgfmt == "qcow2":
            crypt_options = {
                'encrypt': {**crypt_options, 'format': 'luks'}
            }

        args = {
            'node-name': id,
            'job-id': job_id,
            'options': {
                'driver': iotests.imgfmt,
                **crypt_options
            },
        }

        if force:
            args['force'] = True

        #TODO: check what jobs return
        result = self.vm.qmp('x-blockdev-amend', **args)
        self.assert_qmp(result, 'return', {})
        self.vm.run_job(job_id)

    # add a key to an encrypted block device
    def addKeyQmp(self, id, new_secret, secret = None,
                  slot = None, force = False):
        """Activate a keyslot of node 'id' holding 'new_secret'.

        'secret' (optional) is sent as the 'secret' option and 'slot'
        (optional) as the 'keyslot' option; 'force' sets the command's
        'force' flag.
        """
        crypt_options = {
            'state': 'active',
            'new-secret': new_secret.id(),
            'iter-time': 10
        }

        if slot is not None:
            crypt_options['keyslot'] = slot

        if secret is not None:
            crypt_options['secret'] = secret.id()

        self._amendKeys(id, 'job_add_key', crypt_options, force)

    # erase a key from an encrypted block device
    def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
        """Deactivate keyslots of node 'id'.

        The target is selected by 'slot' (sent as 'keyslot') and/or
        'old_secret' (sent as 'old-secret'); 'force' sets the command's
        'force' flag.
        """
        crypt_options = {
            'state': 'inactive',
        }

        if slot is not None:
            crypt_options['keyslot'] = slot
        if old_secret is not None:
            crypt_options['old-secret'] = old_secret.id()

        self._amendKeys(id, 'job_erase_key', crypt_options, force)

    ###########################################################################
    # create image, and change its key
    def testChangeKey(self):

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # add key to slot 5
        self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # reopen the image with secret1
        self.closeImageQmp("testdev")
        self.openImageQmp("testdev", test_img, self.secrets[1])

        # close the image; tearDown() erases it for good
        self.closeImageQmp("testdev")

    # test that if we erase the old password,
    # we can still change the encryption keys using 'old-secret'
    def testOldPassword(self):

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add key to slot 1
        self.addKeyQmp("testdev", new_secret = self.secrets[1])

        # erase key from slot 0
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])

        # this will fail as the old password is no longer valid
        self.addKeyQmp("testdev", new_secret = self.secrets[2])

        # this will work
        self.addKeyQmp("testdev", new_secret = self.secrets[2],
                       secret = self.secrets[1])

        # close the image; tearDown() erases it for good
        self.closeImageQmp("testdev")

    def testUseForceLuke(self):

        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # Add bunch of secrets
        self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
        self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)

        # overwrite an active secret
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2,
                       force=True)

        self.addKeyQmp("testdev", new_secret = self.secrets[0])

        # Now erase all the secrets
        self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
        self.eraseKeyQmp("testdev", slot=4)

        # erase last keyslot
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
        self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)

        # close the image; tearDown() erases it for good
        self.closeImageQmp("testdev")


if __name__ == '__main__':
    iotests.verify_working_luks()
    # Encrypted formats support
    iotests.activate_logging()
    iotests.main(supported_fmts = ['qcow2', 'luks'])