#!/usr/bin/env python3
"""
Script to generate certificates for a CA, server, and client allowing for
client authentication using mTLS certificates. This can then be used to test
mTLS client authentication for Redfish and WebUI.
"""

import argparse
import datetime
import errno
import ipaddress
import os
import socket
import time

import httpx
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
    load_pem_private_key,
    pkcs12,
)
from cryptography.x509.oid import NameOID

replaceCertPath = "/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate"

# Every certificate produced by this script shares the same fixed validity
# window.
# NOTE(review): the window is presumably this wide so that a wrong clock on
# the target cannot invalidate the test certificates - confirm.
_NOT_VALID_BEFORE = datetime.datetime(
    1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
)
_NOT_VALID_AFTER = datetime.datetime(
    2070, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
)


class RedfishSessionContext:
    """Context manager that creates a Redfish session and deletes it on exit.

    On entry a session is created by POSTing the credentials to the
    SessionService; the returned token and session URI are stored in
    ``x_auth_token`` and ``session_uri``. On exit the session URI is DELETEd.
    The caller is responsible for attaching the token to ``client``.
    """

    def __init__(self, client, username="root", password="0penBmc"):
        self.client = client
        self.session_uri = None
        self.x_auth_token = None
        self.username = username
        self.password = password

    def __enter__(self):
        r = self.client.post(
            "/redfish/v1/SessionService/Sessions",
            json={
                "UserName": self.username,
                "Password": self.password,
                "Context": f"pythonscript::{os.path.basename(__file__)}",
            },
            headers={"content-type": "application/json"},
        )
        r.raise_for_status()
        self.x_auth_token = r.headers["x-auth-token"]
        self.session_uri = r.headers["location"]
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to clean up if the session was never created.
        if not self.session_uri:
            return
        r = self.client.delete(self.session_uri)
        r.raise_for_status()


def _private_key_pem(key):
    """Return ``key`` serialized as an unencrypted TraditionalOpenSSL PEM."""
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )


def generateCA():
    """Generate a self-signed EC (P-256) test CA.

    Returns:
        A ``(private_key, certificate)`` tuple for the new CA.
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_key = private_key.public_key()
    builder = x509.CertificateBuilder()

    name = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"),
            x509.NameAttribute(NameOID.COMMON_NAME, "Test CA"),
        ]
    )
    # Self-signed: subject and issuer are the same name.
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)

    builder = builder.not_valid_before(_NOT_VALID_BEFORE)
    builder = builder.not_valid_after(_NOT_VALID_AFTER)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(public_key)

    basic_constraints = x509.BasicConstraints(ca=True, path_length=None)
    builder = builder.add_extension(basic_constraints, critical=True)

    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=True,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=False,
        encipher_only=False,
        key_agreement=False,
        key_cert_sign=True,
        key_encipherment=False,
    )
    builder = builder.add_extension(usage, critical=False)

    # RFC 5280 (4.2.1.2) requires CA certificates to carry a subject key
    # identifier; it is what the authority key identifier of the issued
    # certificates refers to.
    subject_key = x509.SubjectKeyIdentifier.from_public_key(public_key)
    builder = builder.add_extension(subject_key, critical=False)

    auth_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key)

    builder = builder.add_extension(auth_key, critical=False)

    root_cert = builder.sign(
        private_key=private_key, algorithm=hashes.SHA256()
    )

    return private_key, root_cert


def signCsr(csr, ca_key):
    # NOTE(review): nothing in this file calls this function, and loaded
    # cryptography CSR objects do not appear to expose a sign() method, so
    # calling it is expected to fail. Kept unchanged for compatibility;
    # consider removing it.
    csr.sign(ca_key, algorithm=hashes.SHA256())
    return


def generate_client_key_and_cert(commonName, ca_cert, ca_key):
    """Generate a client key and a client-auth certificate signed by the CA.

    Args:
        commonName: Common name placed in the certificate subject.
        ca_cert: CA certificate; its subject becomes the issuer name.
        ca_key: CA private key used to sign the certificate.

    Returns:
        A ``(private_key, certificate)`` tuple for the client.
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_key = private_key.public_key()
    builder = x509.CertificateBuilder()

    builder = builder.subject_name(
        x509.Name(
            [
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(
                    NameOID.STATE_OR_PROVINCE_NAME, "California"
                ),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"),
                x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"),
                x509.NameAttribute(NameOID.COMMON_NAME, commonName),
            ]
        )
    )

    builder = builder.issuer_name(ca_cert.subject)
    builder = builder.public_key(public_key)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(_NOT_VALID_BEFORE)
    builder = builder.not_valid_after(_NOT_VALID_AFTER)

    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=False,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=True,
        encipher_only=False,
        key_agreement=True,
        key_cert_sign=False,
        key_encipherment=False,
    )
    builder = builder.add_extension(usage, critical=False)

    exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH])
    builder = builder.add_extension(exusage, critical=True)

    # The authority key identifier must identify the *issuer's* key (the CA),
    # not the key being certified.
    auth_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        ca_key.public_key()
    )
    builder = builder.add_extension(auth_key, critical=False)

    signed = builder.sign(private_key=ca_key, algorithm=hashes.SHA256())

    return private_key, signed


def generateServerCert(url, ca_key, ca_cert, csr):
    """Issue a server-auth certificate for ``csr`` signed by the CA.

    The subject and public key are taken from the CSR. The subject
    alternative names are ``localhost`` plus ``url`` (as an IP address entry
    when it parses as one, otherwise as a DNS name).

    Args:
        url: Host name or IP address the server is reached at.
        ca_key: CA private key used to sign the certificate.
        ca_cert: CA certificate; its subject becomes the issuer name.
        csr: Certificate signing request supplying subject and public key.

    Returns:
        The signed server certificate.
    """
    builder = x509.CertificateBuilder()

    builder = builder.subject_name(csr.subject)
    builder = builder.issuer_name(ca_cert.subject)
    builder = builder.public_key(csr.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(_NOT_VALID_BEFORE)
    builder = builder.not_valid_after(_NOT_VALID_AFTER)

    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=False,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=True,
        encipher_only=False,
        key_agreement=False,
        # RFC 5280 (4.2.1.9): keyCertSign must not be asserted on a
        # certificate whose basic constraints say ca=False.
        key_cert_sign=False,
        key_encipherment=True,
    )
    builder = builder.add_extension(usage, critical=True)

    exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH])
    builder = builder.add_extension(exusage, critical=True)

    san_list = [x509.DNSName("localhost")]
    try:
        value = ipaddress.ip_address(url)
        san_list.append(x509.IPAddress(value))
    except ValueError:
        san_list.append(x509.DNSName(url))

    altname = x509.SubjectAlternativeName(san_list)
    builder = builder.add_extension(altname, critical=True)
    basic_constraints = x509.BasicConstraints(ca=False, path_length=None)
    builder = builder.add_extension(basic_constraints, critical=True)

    # Subject key identifier describes the certified (server) key ...
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
        critical=False,
    )
    # ... while the authority key identifier describes the signing CA key.
    authkeyident = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        ca_key.public_key()
    )
    builder = builder.add_extension(authkeyident, critical=False)

    signed = builder.sign(private_key=ca_key, algorithm=hashes.SHA256())

    return signed


def generateCsr(
    redfish_session,
    commonName,
    manager_uri,
):
    """Ask the target to generate a CSR for its HTTPS certificate.

    ``commonName`` is prefixed with ``"IP: "`` when it parses as an IP
    address and with ``"DNS: "`` otherwise, using the same check as
    ``generateServerCert`` so that both classify the host identically.

    Args:
        redfish_session: HTTP client used to POST the GenerateCSR action.
        commonName: Host name or IP address of the target.
        manager_uri: Redfish URI of the manager owning the certificates.

    Returns:
        The parsed certificate signing request.

    Raises:
        Exception: If the returned CSR has an invalid signature.
    """
    try:
        ipaddress.ip_address(commonName)
        commonName = "IP: " + commonName
    except ValueError:
        commonName = "DNS: " + commonName

    CSRRequest = {
        "CommonName": commonName,
        "City": "San Fransisco",
        "Country": "US",
        "Organization": "",
        "OrganizationalUnit": "",
        "State": "CA",
        "CertificateCollection": {
            "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates",
        },
        "AlternativeNames": [
            commonName,
            "DNS: localhost",
            "IP: 127.0.0.1",
        ],
    }

    response = redfish_session.post(
        "/redfish/v1/CertificateService/Actions/CertificateService.GenerateCSR",
        json=CSRRequest,
    )
    response.raise_for_status()

    csrString = response.json()["CSRString"]
    csr = x509.load_pem_x509_csr(csrString.encode())
    if not csr.is_signature_valid:
        raise Exception("CSR was not valid")
    return csr


def install_ca_cert(redfish_session, ca_cert_dump, manager_uri):
    """Install the CA certificate into the target's truststore.

    The certificate is POSTed to the truststore collection. If that returns
    HTTP 500, the certificate at ``.../Truststore/Certificates/1`` is
    replaced instead.

    Args:
        redfish_session: HTTP client used for the Redfish requests.
        ca_cert_dump: PEM-encoded CA certificate as bytes.
        manager_uri: Redfish URI of the manager owning the truststore.

    Raises:
        Exception: If the replace fallback does not return HTTP 200.
        httpx.HTTPStatusError: If the install fails with another status.
    """
    ca_certJSON = {
        "CertificateString": ca_cert_dump.decode(),
        "CertificateType": "PEM",
    }
    ca_certPath = f"{manager_uri}/Truststore/Certificates"
    print("Attempting to install CA certificate to BMC.")

    response = redfish_session.post(ca_certPath, json=ca_certJSON)
    if response.status_code == 500:
        print(
            "An existing CA certificate is likely already installed."
            " Replacing..."
        )
        ca_certJSON["CertificateUri"] = {
            "@odata.id": ca_certPath + "/1",
        }

        response = redfish_session.post(replaceCertPath, json=ca_certJSON)
        if response.status_code == 200:
            print("Successfully replaced existing CA certificate.")
        else:
            raise Exception(
                "Could not install or replace CA certificate."
                " Please check if a certificate is already installed. If a"
                " certificate is already installed, try performing a factory"
                " restore to clear such settings."
            )
    response.raise_for_status()
    print("Successfully installed CA certificate.")


def install_server_cert(redfish_session, manager_uri, server_cert_dump):
    """Replace the target's HTTPS certificate and enable TLS authentication.

    Args:
        redfish_session: HTTP client used for the Redfish requests.
        manager_uri: Redfish URI of the manager owning the certificate.
        server_cert_dump: PEM-encoded server certificate as bytes.

    Raises:
        Exception: If either request does not return HTTP 200.
    """
    server_cert_json = {
        "CertificateString": server_cert_dump.decode(),
        "CertificateUri": {
            "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates/1",
        },
        "CertificateType": "PEM",
    }

    print("Replacing server certificate...")
    response = redfish_session.post(replaceCertPath, json=server_cert_json)
    if response.status_code == 200:
        print("Successfully replaced server certificate.")
    else:
        # Use the raw body: an error response is not guaranteed to be JSON.
        raise Exception(f"Could not replace certificate: {response.text}")

    tls_patch_json = {"Oem": {"OpenBMC": {"AuthMethods": {"TLS": True}}}}
    print("Ensuring TLS authentication is enabled.")
    response = redfish_session.patch(
        "/redfish/v1/AccountService", json=tls_patch_json
    )
    if response.status_code == 200:
        print("Successfully enabled TLS authentication.")
    else:
        raise Exception("Could not enable TLS auth: " + response.text)


def generate_pk12(certs_dir, key, client_cert, username):
    """Write the client key and certificate to ``client.p12`` unencrypted.

    Args:
        certs_dir: Directory the file is written to.
        key: Client private key.
        client_cert: Client certificate.
        username: Used (encoded) as the name of the PKCS#12 entry.
    """
    print("Generating p12 cert file for browser authentication.")
    p12 = pkcs12.serialize_key_and_certificates(
        username.encode(),
        key,
        client_cert,
        None,
        serialization.NoEncryption(),
    )
    with open(os.path.join(certs_dir, "client.p12"), "wb") as f:
        f.write(p12)


def test_mtls_auth(url, certs_dir):
    """Fetch the session collection using only the client certificate.

    The server is verified against ``CA-cert.cer`` and the client presents
    ``client-cert.pem`` / ``client-key.pem`` from ``certs_dir``.

    Raises:
        httpx.HTTPStatusError: If the request returns an error status.
    """
    response = httpx.get(
        f"https://{url}/redfish/v1/SessionService/Sessions",
        verify=os.path.join(certs_dir, "CA-cert.cer"),
        cert=(
            os.path.join(certs_dir, "client-cert.pem"),
            os.path.join(certs_dir, "client-key.pem"),
        ),
    )
    response.raise_for_status()


def setup_server_cert(
    redfish_session,
    ca_cert_dump,
    certs_dir,
    client_key,
    client_cert,
    username,
    url,
    ca_key,
    ca_cert,
):
    """Install the CA, then issue and install a server certificate.

    Looks up the manager URI from the service root, installs the CA
    certificate, writes ``client.p12``, requests a CSR from the target, signs
    it with the CA, writes the result to ``server-cert.pem`` and installs it.
    """
    service_root = redfish_session.get("/redfish/v1/")
    service_root.raise_for_status()

    manager_uri = service_root.json()["Links"]["ManagerProvidingService"][
        "@odata.id"
    ]

    install_ca_cert(redfish_session, ca_cert_dump, manager_uri)
    generate_pk12(certs_dir, client_key, client_cert, username)

    csr = generateCsr(
        redfish_session,
        url,
        manager_uri,
    )
    serverCert = generateServerCert(
        url,
        ca_key,
        ca_cert,
        csr,
    )
    server_cert_dump = serverCert.public_bytes(
        encoding=serialization.Encoding.PEM
    )
    with open(os.path.join(certs_dir, "server-cert.pem"), "wb") as f:
        f.write(server_cert_dump)
    print("Server cert generated.")

    install_server_cert(redfish_session, manager_uri, server_cert_dump)


def generate_and_load_certs(url, username, password):
    """Generate CA/client/server certificates and install them on the target.

    Files are written to ``~/certs``. An existing CA certificate and key are
    reused; a new CA is generated when either file is missing. A fresh
    client key and certificate are generated on every run.

    Args:
        url: Host name or IP address of the target.
        username: Redfish user; also used as the client certificate CN.
        password: Password for ``username``.
    """
    certs_dir = os.path.expanduser("~/certs")
    print(f"Writing certs to {certs_dir}")
    print("Making certs directory.")
    os.makedirs(certs_dir, exist_ok=True)

    ca_cert_filename = os.path.join(certs_dir, "CA-cert.cer")
    ca_key_filename = os.path.join(certs_dir, "CA-key.pem")
    # A CA certificate without its private key cannot sign anything, so
    # regenerate the pair when either file is missing.
    if not (
        os.path.exists(ca_cert_filename) and os.path.exists(ca_key_filename)
    ):
        ca_key, ca_cert = generateCA()

        ca_key_dump = _private_key_pem(ca_key)
        ca_cert_dump = ca_cert.public_bytes(
            encoding=serialization.Encoding.PEM
        )

        with open(ca_cert_filename, "wb") as f:
            f.write(ca_cert_dump)
        print("CA cert generated.")
        with open(ca_key_filename, "wb") as f:
            f.write(ca_key_dump)
        print("CA key generated.")

    # Always load the CA from disk so new and reused CAs share one code path.
    with open(ca_cert_filename, "rb") as ca_cert_file:
        ca_cert_dump = ca_cert_file.read()
    ca_cert = x509.load_pem_x509_certificate(ca_cert_dump)

    with open(ca_key_filename, "rb") as ca_key_file:
        ca_key_dump = ca_key_file.read()
    ca_key = load_pem_private_key(ca_key_dump, None)

    client_key, client_cert = generate_client_key_and_cert(
        username, ca_cert, ca_key
    )
    client_key_dump = _private_key_pem(client_key)

    with open(os.path.join(certs_dir, "client-key.pem"), "wb") as f:
        f.write(client_key_dump)
    print("Client key generated.")
    client_cert_dump = client_cert.public_bytes(
        encoding=serialization.Encoding.PEM
    )

    with open(os.path.join(certs_dir, "client-cert.pem"), "wb") as f:
        f.write(client_cert_dump)
    print("Client cert generated.")

    print(f"Connecting to {url}")
    # Server verification is disabled for this setup connection.
    # NOTE(review): presumably because the target does not yet present a
    # certificate issued by our CA - confirm.
    with httpx.Client(
        base_url=f"https://{url}", verify=False, follow_redirects=False
    ) as redfish_session:
        with RedfishSessionContext(
            redfish_session, username, password
        ) as rf_session:
            # Authenticate every following request on this client, including
            # the session DELETE issued when the context exits.
            redfish_session.headers["X-Auth-Token"] = rf_session.x_auth_token
            setup_server_cert(
                redfish_session,
                ca_cert_dump,
                certs_dir,
                client_key,
                client_cert,
                username,
                url,
                ca_key,
                ca_cert,
            )

    print("Testing redfish TLS authentication with generated certs.")

    # NOTE(review): presumably gives the target time to start serving the
    # newly installed certificate - confirm the delay is sufficient.
    time.sleep(2)
    test_mtls_auth(url, certs_dir)
    print("Redfish TLS authentication success!")


def main():
    """Parse the command line and run the certificate setup."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--username",
        help="Username to connect with",
        default="root",
    )
    parser.add_argument(
        "--password",
        help="Password for user in order to install certs over Redfish.",
        default="0penBmc",
    )
    parser.add_argument("host", help="Host to connect to")

    args = parser.parse_args()
    generate_and_load_certs(args.host, args.username, args.password)


if __name__ == "__main__":
    main()