1#!/usr/bin/env python3 2# 3# Test case QMP's encrypted key management 4# 5# Copyright (C) 2019 Red Hat, Inc. 6# 7# This program is free software; you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation; either version 2 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20 21import iotests 22import os 23import time 24import json 25 26test_img = os.path.join(iotests.test_dir, 'test.img') 27 28class Secret: 29 def __init__(self, index): 30 self._id = "keysec" + str(index) 31 # you are not supposed to see the password... 32 self._secret = "hunter" + str(index) 33 34 def id(self): 35 return self._id 36 37 def secret(self): 38 return self._secret 39 40 def to_cmdline_object(self): 41 return [ "secret,id=" + self._id + ",data=" + self._secret] 42 43 def to_qmp_object(self): 44 return { "qom_type" : "secret", "id": self.id(), 45 "props": { "data": self.secret() } } 46 47################################################################################ 48class EncryptionSetupTestCase(iotests.QMPTestCase): 49 50 # test case startup 51 def setUp(self): 52 # start the VM 53 self.vm = iotests.VM() 54 self.vm.launch() 55 56 # create the secrets and load 'em into the VM 57 self.secrets = [ Secret(i) for i in range(0, 6) ] 58 for secret in self.secrets: 59 result = self.vm.qmp("object-add", **secret.to_qmp_object()) 60 self.assert_qmp(result, 'return', {}) 61 62 if iotests.imgfmt == "qcow2": 63 self.pfx = "encrypt." 64 self.img_opts = [ '-o', "encrypt.format=luks" ] 65 else: 66 self.pfx = "" 67 self.img_opts = [] 68 69 # test case shutdown 70 def tearDown(self): 71 # stop the VM 72 self.vm.shutdown() 73 74 ########################################################################### 75 # create the encrypted block device 76 def createImg(self, file, secret): 77 78 iotests.qemu_img( 79 'create', 80 '--object', *secret.to_cmdline_object(), 81 '-f', iotests.imgfmt, 82 '-o', self.pfx + 'key-secret=' + secret.id(), 83 '-o', self.pfx + 'iter-time=10', 84 *self.img_opts, 85 file, 86 '1M') 87 88 ########################################################################### 89 # open an encrypted block device 90 def openImageQmp(self, id, file, secret, read_only = False): 91 92 encrypt_options = { 93 'key-secret' : secret.id() 94 } 95 96 if iotests.imgfmt == "qcow2": 97 encrypt_options = { 98 'encrypt': { 99 'format':'luks', 100 **encrypt_options 101 } 102 } 103 104 result = self.vm.qmp('blockdev-add', ** 105 { 106 'driver': iotests.imgfmt, 107 'node-name': id, 108 'read-only': read_only, 109 110 **encrypt_options, 111 112 'file': { 113 'driver': 'file', 114 'filename': test_img, 115 } 116 } 117 ) 118 self.assert_qmp(result, 'return', {}) 119 120 # close the encrypted block device 121 def closeImageQmp(self, id): 122 result = self.vm.qmp('blockdev-del', **{ 'node-name': id }) 123 self.assert_qmp(result, 'return', {}) 124 125 ########################################################################### 126 # add a key to an encrypted block device 127 def addKeyQmp(self, id, new_secret, secret = None, 128 slot = None, force = False): 129 130 crypt_options = { 131 'state' : 'active', 132 'new-secret' : new_secret.id(), 133 'iter-time' : 10 134 } 135 136 if slot != None: 137 crypt_options['keyslot'] = slot 138 139 140 if secret != None: 141 crypt_options['secret'] = secret.id() 142 143 if iotests.imgfmt == "qcow2": 144 crypt_options['format'] = 'luks' 145 crypt_options = { 146 'encrypt': crypt_options 147 } 148 149 args = { 150 'node-name': id, 151 'job-id' : 'job_add_key', 152 'options' : { 153 'driver' : iotests.imgfmt, 154 **crypt_options 155 }, 156 } 157 158 if force == True: 159 args['force'] = True 160 161 #TODO: check what jobs return 162 result = self.vm.qmp('x-blockdev-amend', **args) 163 assert result['return'] == {} 164 self.vm.run_job('job_add_key') 165 166 # erase a key from an encrypted block device 167 def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False): 168 169 crypt_options = { 170 'state' : 'inactive', 171 } 172 173 if slot != None: 174 crypt_options['keyslot'] = slot 175 if old_secret != None: 176 crypt_options['old-secret'] = old_secret.id() 177 178 if iotests.imgfmt == "qcow2": 179 crypt_options['format'] = 'luks' 180 crypt_options = { 181 'encrypt': crypt_options 182 } 183 184 args = { 185 'node-name': id, 186 'job-id' : 'job_erase_key', 187 'options' : { 188 'driver' : iotests.imgfmt, 189 **crypt_options 190 }, 191 } 192 193 if force == True: 194 args['force'] = True 195 196 result = self.vm.qmp('x-blockdev-amend', **args) 197 assert result['return'] == {} 198 self.vm.run_job('job_erase_key') 199 200 ########################################################################### 201 # create image, and change its key 202 def testChangeKey(self): 203 204 # create the image with secret0 and open it 205 self.createImg(test_img, self.secrets[0]); 206 self.openImageQmp("testdev", test_img, self.secrets[0]) 207 208 # add key to slot 1 209 self.addKeyQmp("testdev", new_secret = self.secrets[1]) 210 211 # add key to slot 5 212 self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5) 213 214 # erase key from slot 0 215 self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) 216 217 #reopen the image with secret1 218 self.closeImageQmp("testdev") 219 self.openImageQmp("testdev", test_img, self.secrets[1]) 220 221 # close and erase the image for good 222 self.closeImageQmp("testdev") 223 os.remove(test_img) 224 225 # test that if we erase the old password, 226 # we can still change the encryption keys using 'old-secret' 227 def testOldPassword(self): 228 229 # create the image with secret0 and open it 230 self.createImg(test_img, self.secrets[0]); 231 self.openImageQmp("testdev", test_img, self.secrets[0]) 232 233 # add key to slot 1 234 self.addKeyQmp("testdev", new_secret = self.secrets[1]) 235 236 # erase key from slot 0 237 self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) 238 239 # this will fail as the old password is no longer valid 240 self.addKeyQmp("testdev", new_secret = self.secrets[2]) 241 242 # this will work 243 self.addKeyQmp("testdev", new_secret = self.secrets[2], secret = self.secrets[1]) 244 245 # close and erase the image for good 246 self.closeImageQmp("testdev") 247 os.remove(test_img) 248 249 def testUseForceLuke(self): 250 251 self.createImg(test_img, self.secrets[0]); 252 self.openImageQmp("testdev", test_img, self.secrets[0]) 253 254 # Add bunch of secrets 255 self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4) 256 self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2) 257 258 # overwrite an active secret 259 self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2) 260 self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2, force=True) 261 262 self.addKeyQmp("testdev", new_secret = self.secrets[0]) 263 264 # Now erase all the secrets 265 self.eraseKeyQmp("testdev", old_secret = self.secrets[5]) 266 self.eraseKeyQmp("testdev", slot=4) 267 268 # erase last keyslot 269 self.eraseKeyQmp("testdev", old_secret = self.secrets[0]) 270 self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True) 271 272 self.closeImageQmp("testdev") 273 os.remove(test_img) 274 275 276if __name__ == '__main__': 277 iotests.verify_working_luks() 278 # Encrypted formats support 279 iotests.activate_logging() 280 iotests.main(supported_fmts = ['qcow2', 'luks']) 281