#!/usr/bin/env python3 """ Script to generate certificates for a CA, server, and client allowing for client authentication using mTLS certificates. This can then be used to test mTLS client authentication for Redfish and WebUI. """ import argparse import datetime import errno import ipaddress import os import socket import time from typing import Optional import asn1 import httpx from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.serialization import ( load_pem_private_key, pkcs12, ) from cryptography.x509.oid import NameOID replaceCertPath = "/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate" # https://oidref.com/1.3.6.1.4.1.311.20.2.3 upnObjectIdentifier = "1.3.6.1.4.1.311.20.2.3" class RedfishSessionContext: def __init__(self, client, username="root", password="0penBmc"): self.client = client self.session_uri = None self.x_auth_token = None self.username = username self.password = password def __enter__(self): r = self.client.post( "/redfish/v1/SessionService/Sessions", json={ "UserName": self.username, "Password": self.password, "Context": f"pythonscript::{os.path.basename(__file__)}", }, headers={"content-type": "application/json"}, ) r.raise_for_status() self.x_auth_token = r.headers["x-auth-token"] self.session_uri = r.headers["location"] return self def __exit__(self, type, value, traceback): if not self.session_uri: return r = self.client.delete(self.session_uri) r.raise_for_status() def get_hostname(redfish_session, username, password, url): service_root = redfish_session.get("/redfish/v1/") service_root.raise_for_status() manager_uri = service_root.json()["Links"]["ManagerProvidingService"][ "@odata.id" ] manager_response = redfish_session.get(manager_uri) manager_response.raise_for_status() network_protocol_uri = manager_response.json()["NetworkProtocol"][ "@odata.id" ] network_protocol_response = redfish_session.get(network_protocol_uri) network_protocol_response.raise_for_status() hostname = network_protocol_response.json()["HostName"] return hostname def generateCA(): private_key = ec.generate_private_key(ec.SECP256R1()) public_key = private_key.public_key() builder = x509.CertificateBuilder() name = x509.Name( [ x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"), x509.NameAttribute(NameOID.COMMON_NAME, "Test CA"), ] ) builder = builder.subject_name(name) builder = builder.issuer_name(name) builder = builder.not_valid_before( datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) builder = builder.not_valid_after( datetime.datetime(2070, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) builder = builder.serial_number(x509.random_serial_number()) builder = builder.public_key(public_key) basic_constraints = x509.BasicConstraints(ca=True, path_length=None) builder = builder.add_extension(basic_constraints, critical=True) usage = x509.KeyUsage( content_commitment=False, crl_sign=True, data_encipherment=False, decipher_only=False, digital_signature=False, encipher_only=False, key_agreement=False, key_cert_sign=True, key_encipherment=False, ) builder = builder.add_extension(usage, critical=False) auth_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key) builder = builder.add_extension(auth_key, critical=False) root_cert = builder.sign( private_key=private_key, algorithm=hashes.SHA256() ) return private_key, root_cert def signCsr(csr, ca_key): csr.sign(ca_key, algorithm=hashes.SHA256()) return def generate_client_key_and_cert( ca_cert, ca_key, common_name: Optional[str] = None, upn: Optional[str] = None, ): private_key = ec.generate_private_key(ec.SECP256R1()) public_key = private_key.public_key() builder = x509.CertificateBuilder() cert_names = [ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"), ] if common_name is not None: cert_names.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name)) builder = builder.subject_name(x509.Name(cert_names)) builder = builder.issuer_name(ca_cert.subject) builder = builder.public_key(public_key) builder = builder.serial_number(x509.random_serial_number()) builder = builder.not_valid_before( datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) builder = builder.not_valid_after( datetime.datetime(2070, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) usage = x509.KeyUsage( content_commitment=False, crl_sign=False, data_encipherment=False, decipher_only=False, digital_signature=True, encipher_only=False, key_agreement=True, key_cert_sign=False, key_encipherment=False, ) builder = builder.add_extension(usage, critical=False) exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]) builder = builder.add_extension(exusage, critical=True) auth_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key) builder = builder.add_extension(auth_key, critical=False) if upn is not None: encoder = asn1.Encoder() encoder.start() encoder.write(upn, asn1.Numbers.UTF8String) builder = builder.add_extension( x509.SubjectAlternativeName( [ x509.OtherName( x509.ObjectIdentifier(upnObjectIdentifier), encoder.output(), ) ] ), critical=False, ) signed = builder.sign(private_key=ca_key, algorithm=hashes.SHA256()) return private_key, signed def generateServerCert(url, ca_key, ca_cert, csr): builder = x509.CertificateBuilder() builder = builder.subject_name(csr.subject) builder = builder.issuer_name(ca_cert.subject) builder = builder.public_key(csr.public_key()) builder = builder.serial_number(x509.random_serial_number()) builder = builder.not_valid_before( datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) builder = builder.not_valid_after( datetime.datetime(2070, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) ) usage = x509.KeyUsage( content_commitment=False, crl_sign=False, data_encipherment=False, decipher_only=False, digital_signature=True, encipher_only=False, key_agreement=False, key_cert_sign=True, key_encipherment=True, ) builder = builder.add_extension(usage, critical=True) exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]) builder = builder.add_extension(exusage, critical=True) san_list = [x509.DNSName("localhost")] try: value = ipaddress.ip_address(url) san_list.append(x509.IPAddress(value)) except ValueError: san_list.append(x509.DNSName(url)) altname = x509.SubjectAlternativeName(san_list) builder = builder.add_extension(altname, critical=True) basic_constraints = x509.BasicConstraints(ca=False, path_length=None) builder = builder.add_extension(basic_constraints, critical=True) builder = builder.add_extension( x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key()), critical=False, ) authkeyident = x509.AuthorityKeyIdentifier.from_issuer_public_key( ca_key.public_key() ) builder = builder.add_extension(authkeyident, critical=False) signed = builder.sign(private_key=ca_key, algorithm=hashes.SHA256()) return signed def generateCsr( redfish_session, commonName, manager_uri, ): try: socket.inet_aton(commonName) commonName = "IP: " + commonName except socket.error: commonName = "DNS: " + commonName CSRRequest = { "CommonName": commonName, "City": "San Fransisco", "Country": "US", "Organization": "", "OrganizationalUnit": "", "State": "CA", "CertificateCollection": { "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates", }, "AlternativeNames": [ commonName, "DNS: localhost", "IP: 127.0.0.1", ], } response = redfish_session.post( "/redfish/v1/CertificateService/Actions/CertificateService.GenerateCSR", json=CSRRequest, ) response.raise_for_status() csrString = response.json()["CSRString"] csr = x509.load_pem_x509_csr(csrString.encode()) if not csr.is_signature_valid: raise Exception("CSR was not valid") return csr def install_ca_cert(redfish_session, ca_cert_dump, manager_uri): ca_certJSON = { "CertificateString": ca_cert_dump.decode(), "CertificateType": "PEM", } ca_certPath = f"{manager_uri}/Truststore/Certificates" print("Attempting to install CA certificate to BMC.") response = redfish_session.post(ca_certPath, json=ca_certJSON) if response.status_code == 500: print( "An existing CA certificate is likely already installed." " Replacing..." ) ca_certJSON["CertificateUri"] = { "@odata.id": ca_certPath + "/1", } response = redfish_session.post(replaceCertPath, json=ca_certJSON) if response.status_code == 200: print("Successfully replaced existing CA certificate.") else: raise Exception( "Could not install or replace CA certificate." "Please check if a certificate is already installed. If a" "certificate is already installed, try performing a factory" "restore to clear such settings." ) response.raise_for_status() print("Successfully installed CA certificate.") def install_server_cert(redfish_session, manager_uri, server_cert_dump): server_cert_json = { "CertificateString": server_cert_dump.decode(), "CertificateUri": { "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates/1", }, "CertificateType": "PEM", } print("Replacing server certificate...") response = redfish_session.post(replaceCertPath, json=server_cert_json) if response.status_code == 200: print("Successfully replaced server certificate.") else: raise Exception(f"Could not replace certificate: {response.json()}") tls_patch_json = {"Oem": {"OpenBMC": {"AuthMethods": {"TLS": True}}}} print("Ensuring TLS authentication is enabled.") response = redfish_session.patch( "/redfish/v1/AccountService", json=tls_patch_json ) if response.status_code == 200: print("Successfully enabled TLS authentication.") else: raise Exception("Could not enable TLS auth: " + response.read) def generate_pk12(certs_dir, key, client_cert, username): print("Generating p12 cert file for browser authentication.") p12 = pkcs12.serialize_key_and_certificates( username.encode(), key, client_cert, None, serialization.NoEncryption(), ) with open(os.path.join(certs_dir, "client.p12"), "wb") as f: f.write(p12) def test_mtls_auth(url, certs_dir): with httpx.Client( base_url=f"https://{url}", verify=os.path.join(certs_dir, "CA-cert.cer"), cert=( os.path.join(certs_dir, "client-cert.pem"), os.path.join(certs_dir, "client-key.pem"), ), ) as client: print("Testing mTLS auth with CommonName") response = client.get( "/redfish/v1/SessionService/Sessions", ) response.raise_for_status() print("Changing CertificateMappingAttribute to use UPN") patch_json = { "MultiFactorAuth": { "ClientCertificate": { "CertificateMappingAttribute": "UserPrincipalName" } } } response = client.patch( "/redfish/v1/AccountService", json=patch_json, ) response.raise_for_status() with httpx.Client( base_url=f"https://{url}", verify=os.path.join(certs_dir, "CA-cert.cer"), cert=( os.path.join(certs_dir, "upn-client-cert.pem"), os.path.join(certs_dir, "upn-client-key.pem"), ), ) as client: print("Retesting mTLS auth with UPN") response = client.get( "/redfish/v1/SessionService/Sessions", ) response.raise_for_status() print("Changing CertificateMappingAttribute to use CommonName") patch_json = { "MultiFactorAuth": { "ClientCertificate": { "CertificateMappingAttribute": "CommonName" } } } response = client.patch( "/redfish/v1/AccountService", json=patch_json, ) response.raise_for_status() def setup_server_cert( redfish_session, ca_cert_dump, certs_dir, client_key, client_cert, username, url, ca_key, ca_cert, ): service_root = redfish_session.get("/redfish/v1/") service_root.raise_for_status() manager_uri = service_root.json()["Links"]["ManagerProvidingService"][ "@odata.id" ] install_ca_cert(redfish_session, ca_cert_dump, manager_uri) generate_pk12(certs_dir, client_key, client_cert, username) csr = generateCsr( redfish_session, url, manager_uri, ) serverCert = generateServerCert( url, ca_key, ca_cert, csr, ) server_cert_dump = serverCert.public_bytes( encoding=serialization.Encoding.PEM ) with open(os.path.join(certs_dir, "server-cert.pem"), "wb") as f: f.write(server_cert_dump) print("Server cert generated.") install_server_cert(redfish_session, manager_uri, server_cert_dump) print("Make sure setting CertificateMappingAttribute to CommonName") patch_json = { "MultiFactorAuth": { "ClientCertificate": {"CertificateMappingAttribute": "CommonName"} } } response = redfish_session.patch( "/redfish/v1/AccountService", json=patch_json ) response.raise_for_status() def generate_and_load_certs(url, username, password): certs_dir = os.path.expanduser("~/certs") print(f"Writing certs to {certs_dir}") try: print("Making certs directory.") os.mkdir(certs_dir) except OSError as error: if error.errno != errno.EEXIST: raise ca_cert_filename = os.path.join(certs_dir, "CA-cert.cer") ca_key_filename = os.path.join(certs_dir, "CA-key.pem") if not os.path.exists(ca_cert_filename): ca_key, ca_cert = generateCA() ca_key_dump = ca_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) ca_cert_dump = ca_cert.public_bytes( encoding=serialization.Encoding.PEM ) with open(ca_cert_filename, "wb") as f: f.write(ca_cert_dump) print("CA cert generated.") with open(ca_key_filename, "wb") as f: f.write(ca_key_dump) print("CA key generated.") with open(ca_cert_filename, "rb") as ca_cert_file: ca_cert_dump = ca_cert_file.read() ca_cert = x509.load_pem_x509_certificate(ca_cert_dump) with open(ca_key_filename, "rb") as ca_key_file: ca_key_dump = ca_key_file.read() ca_key = load_pem_private_key(ca_key_dump, None) client_key, client_cert = generate_client_key_and_cert( ca_cert, ca_key, common_name=username ) client_key_dump = client_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) with open(os.path.join(certs_dir, "client-key.pem"), "wb") as f: f.write(client_key_dump) print("Client key generated.") client_cert_dump = client_cert.public_bytes( encoding=serialization.Encoding.PEM ) with open(os.path.join(certs_dir, "client-cert.pem"), "wb") as f: f.write(client_cert_dump) print("Client cert generated.") print(f"Connecting to {url}") with httpx.Client( base_url=f"https://{url}", verify=False, follow_redirects=False ) as redfish_session: with RedfishSessionContext( redfish_session, username, password ) as rf_session: redfish_session.headers["X-Auth-Token"] = rf_session.x_auth_token hostname = get_hostname(redfish_session, username, password, url) print(f"Hostname: {hostname}") upn_client_key, upn_client_cert = generate_client_key_and_cert( ca_cert, ca_key, upn=f"{username}@{hostname}", ) upn_client_key_dump = upn_client_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) with open( os.path.join(certs_dir, "upn-client-key.pem"), "wb" ) as f: f.write(upn_client_key_dump) print("UPN client key generated.") upn_client_cert_dump = upn_client_cert.public_bytes( encoding=serialization.Encoding.PEM ) with open( os.path.join(certs_dir, "upn-client-cert.pem"), "wb" ) as f: f.write(upn_client_cert_dump) print("UPN client cert generated.") setup_server_cert( redfish_session, ca_cert_dump, certs_dir, client_key, client_cert, username, url, ca_key, ca_cert, ) print("Testing redfish TLS authentication with generated certs.") time.sleep(2) test_mtls_auth(url, certs_dir) print("Redfish TLS authentication success!") def main(): parser = argparse.ArgumentParser() parser.add_argument( "--username", help="Username to connect with", default="root", ) parser.add_argument( "--password", help="Password for user in order to install certs over Redfish.", default="0penBmc", ) parser.add_argument("host", help="Host to connect to") args = parser.parse_args() generate_and_load_certs(args.host, args.username, args.password) if __name__ == "__main__": main()