#!/usr/bin/env python3
# group: rw
#
# Test case for QMP's encrypted key management
#
# Copyright (C) 2019 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import iotests
import os
import time
import json

# Every test creates, opens and finally removes this single image file.
test_img = os.path.join(iotests.test_dir, 'test.img')


class Secret:
    """A numbered test passphrase.

    The object id is "keysec<index>" and the passphrase is
    "hunter<index>".  The same secret can be rendered either as a
    command-line '--object' definition or as 'object-add' arguments.
    """

    def __init__(self, index):
        self._id = "keysec" + str(index)
        # you are not supposed to see the password...
        self._secret = "hunter" + str(index)

    def id(self):
        """Return the object id of this secret."""
        return self._id

    def secret(self):
        """Return the passphrase of this secret."""
        return self._secret

    def to_cmdline_object(self):
        """Return a one-element list holding the '--object' definition."""
        return ["secret,id=" + self._id + ",data=" + self._secret]

    def to_qmp_object(self):
        """Return the keyword arguments used with 'object-add'."""
        return {"qom_type": "secret", "id": self.id(),
                "data": self.secret()}


################################################################################
class EncryptionSetupTestCase(iotests.QMPTestCase):
    """Add and erase keys of an encrypted image through x-blockdev-amend."""

    # test case startup
    def setUp(self):
        """Launch the VM, register six secrets and pick format options."""
        # start the VM
        self.vm = iotests.VM()
        self.vm.launch()

        # create the secrets and load 'em into the VM
        self.secrets = [Secret(i) for i in range(0, 6)]
        for secret in self.secrets:
            result = self.vm.qmp("object-add", **secret.to_qmp_object())
            self.assert_qmp(result, 'return', {})

        # For qcow2 the encryption options are prefixed with "encrypt."
        # and the encryption format has to be requested explicitly.
        if iotests.imgfmt == "qcow2":
            self.pfx = "encrypt."
            self.img_opts = ['-o', "encrypt.format=luks"]
        else:
            self.pfx = ""
            self.img_opts = []

    # test case shutdown
    def tearDown(self):
        """Shut the VM down."""
        # stop the VM
        self.vm.shutdown()

    ###########################################################################
    # create the encrypted block device
    def createImg(self, file, secret):
        """Create a 1M encrypted image at 'file', unlocked by 'secret'."""
        iotests.qemu_img(
            'create',
            '--object', *secret.to_cmdline_object(),
            '-f', iotests.imgfmt,
            '-o', self.pfx + 'key-secret=' + secret.id(),
            '-o', self.pfx + 'iter-time=10',
            *self.img_opts,
            file,
            '1M')

    ###########################################################################
    # open an encrypted block device
    def openImageQmp(self, id, file, secret, read_only=False):
        """Add the image at 'file' to the VM as block node 'id'.

        'secret' is the Secret used as 'key-secret'; 'read_only' is
        passed through as the node's 'read-only' option.
        """
        encrypt_options = {
            'key-secret': secret.id()
        }

        # qcow2 takes the key options nested below 'encrypt'
        if iotests.imgfmt == "qcow2":
            encrypt_options = {
                'encrypt': {
                    'format': 'luks',
                    **encrypt_options
                }
            }

        result = self.vm.qmp('blockdev-add', {
                'driver': iotests.imgfmt,
                'node-name': id,
                'read-only': read_only,

                **encrypt_options,

                'file': {
                    'driver': 'file',
                    # open the image the caller asked for rather than
                    # unconditionally falling back to the global test_img
                    'filename': file,
                }
            }
        )
        self.assert_qmp(result, 'return', {})

    # close the encrypted block device
    def closeImageQmp(self, id):
        """Delete block node 'id' from the VM."""
        result = self.vm.qmp('blockdev-del', {'node-name': id})
        self.assert_qmp(result, 'return', {})

    ###########################################################################
    # shared implementation of addKeyQmp() and eraseKeyQmp()
    def _amendKeys(self, id, job_id, crypt_options, force):
        """Start an x-blockdev-amend job on node 'id' and run it.

        'crypt_options' holds the keyslot options; for qcow2 they are
        extended with 'format' and nested below 'encrypt'.  'force' is
        only sent when it is set.  Only the reply to the command itself
        is asserted; the job is then handed to run_job() and its result
        is not checked here.
        """
        if iotests.imgfmt == "qcow2":
            crypt_options = {
                'encrypt': {**crypt_options, 'format': 'luks'}
            }

        args = {
            'node-name': id,
            'job-id': job_id,
            'options': {
                'driver': iotests.imgfmt,
                **crypt_options
            },
        }

        if force:
            args['force'] = True

        #TODO: check what jobs return
        result = self.vm.qmp('x-blockdev-amend', **args)
        self.assert_qmp(result, 'return', {})
        self.vm.run_job(job_id)

    # add a key to an encrypted block device
    def addKeyQmp(self, id, new_secret, secret=None,
                  slot=None, force=False):
        """Run a job that activates a keyslot holding 'new_secret'.

        'secret' (optional) is sent as the 'secret' option, 'slot'
        (optional) as 'keyslot', and 'force' as the command's 'force'
        flag.
        """
        crypt_options = {
            'state': 'active',
            'new-secret': new_secret.id(),
            'iter-time': 10
        }

        if slot is not None:
            crypt_options['keyslot'] = slot

        if secret is not None:
            crypt_options['secret'] = secret.id()

        self._amendKeys(id, 'job_add_key', crypt_options, force)

    # erase a key from an encrypted block device
    def eraseKeyQmp(self, id, old_secret=None, slot=None, force=False):
        """Run a job that deactivates keyslots.

        The keyslots are selected by 'slot' (sent as 'keyslot') and/or
        by 'old_secret' (sent as 'old-secret'); 'force' is sent as the
        command's 'force' flag.
        """
        crypt_options = {
            'state': 'inactive',
        }

        if slot is not None:
            crypt_options['keyslot'] = slot
        if old_secret is not None:
            crypt_options['old-secret'] = old_secret.id()

        self._amendKeys(id, 'job_erase_key', crypt_options, force)

    ###########################################################################
    # create image, and change its key
    def testChangeKey(self):

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add a second key (no explicit keyslot requested)
        self.addKeyQmp("testdev", new_secret=self.secrets[1])

        # add key to slot 5
        self.addKeyQmp("testdev", new_secret=self.secrets[2], slot=5)

        # erase the key matching secret0
        self.eraseKeyQmp("testdev", old_secret=self.secrets[0])

        # reopen the image with secret1
        self.closeImageQmp("testdev")
        self.openImageQmp("testdev", test_img, self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    # test that if we erase the old password,
    # we can still change the encryption keys using 'old-secret'
    def testOldPassword(self):

        # create the image with secret0 and open it
        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # add a second key (no explicit keyslot requested)
        self.addKeyQmp("testdev", new_secret=self.secrets[1])

        # erase the key matching secret0
        self.eraseKeyQmp("testdev", old_secret=self.secrets[0])

        # this will fail as the old password is no longer valid
        # (only the job fails; the command itself is still accepted)
        self.addKeyQmp("testdev", new_secret=self.secrets[2])

        # this will work
        self.addKeyQmp("testdev", new_secret=self.secrets[2],
                       secret=self.secrets[1])

        # close and erase the image for good
        self.closeImageQmp("testdev")
        os.remove(test_img)

    # same operation attempted first without and then with force=True
    def testUseForceLuke(self):

        self.createImg(test_img, self.secrets[0])
        self.openImageQmp("testdev", test_img, self.secrets[0])

        # Add bunch of secrets
        self.addKeyQmp("testdev", new_secret=self.secrets[1], slot=4)
        self.addKeyQmp("testdev", new_secret=self.secrets[4], slot=2)

        # overwrite an active secret
        # NOTE(review): presumably the unforced attempt is refused and
        # only the forced one succeeds - confirm against reference output
        self.addKeyQmp("testdev", new_secret=self.secrets[5], slot=2)
        self.addKeyQmp("testdev", new_secret=self.secrets[5], slot=2,
                       force=True)

        self.addKeyQmp("testdev", new_secret=self.secrets[0])

        # Now erase all the secrets
        self.eraseKeyQmp("testdev", old_secret=self.secrets[5])
        self.eraseKeyQmp("testdev", slot=4)

        # erase last keyslot
        # NOTE(review): presumably needs force=True to go through -
        # confirm against reference output
        self.eraseKeyQmp("testdev", old_secret=self.secrets[0])
        self.eraseKeyQmp("testdev", old_secret=self.secrets[0], force=True)

        self.closeImageQmp("testdev")
        os.remove(test_img)


if __name__ == '__main__':
    iotests.verify_working_luks()
    # Encrypted formats support
    iotests.activate_logging()
    iotests.main(supported_fmts=['qcow2', 'luks'])