#!/usr/bin/env python3
"""
Script to generate certificates for a CA, server, and client allowing for
client authentication using mTLS certificates. This can then be used to test
mTLS client authentication for Redfish and WebUI.
"""

import argparse
import datetime
import errno
import ipaddress
import os
import socket
import time
from typing import Optional

import asn1
import httpx
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import (
    load_pem_private_key,
    pkcs12,
)
from cryptography.x509.oid import NameOID

replaceCertPath = "/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate"
# https://oidref.com/1.3.6.1.4.1.311.20.2.3
upnObjectIdentifier = "1.3.6.1.4.1.311.20.2.3"

# Every certificate produced by this script shares the same fixed validity
# window.
_NOT_VALID_BEFORE = datetime.datetime(
    1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
)
_NOT_VALID_AFTER = datetime.datetime(
    2070, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
)

_ACCOUNT_SERVICE_URI = "/redfish/v1/AccountService"
_SESSIONS_URI = "/redfish/v1/SessionService/Sessions"


class RedfishSessionContext:
    """Context manager that creates a Redfish session and deletes it on exit.

    On entry a session is created by POSTing the credentials to the
    SessionService; the returned token and session URI are stored in
    ``x_auth_token`` and ``session_uri``. On exit the session URI (if one
    was obtained) is deleted using the same client.
    """

    def __init__(self, client, username="root", password="0penBmc"):
        self.client = client
        self.session_uri = None
        self.x_auth_token = None
        self.username = username
        self.password = password

    def __enter__(self):
        r = self.client.post(
            _SESSIONS_URI,
            json={
                "UserName": self.username,
                "Password": self.password,
                "Context": f"pythonscript::{os.path.basename(__file__)}",
            },
            headers={"content-type": "application/json"},
        )
        r.raise_for_status()
        self.x_auth_token = r.headers["x-auth-token"]
        self.session_uri = r.headers["location"]
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to clean up if the session was never created.
        if not self.session_uri:
            return
        r = self.client.delete(self.session_uri)
        r.raise_for_status()


def _get_manager_uri(redfish_session):
    """Return the @odata.id of the manager providing the Redfish service."""
    service_root = redfish_session.get("/redfish/v1/")
    service_root.raise_for_status()
    return service_root.json()["Links"]["ManagerProvidingService"][
        "@odata.id"
    ]


def _set_certificate_mapping_attribute(client, attribute):
    """PATCH AccountService so client certs are mapped using *attribute*."""
    patch_json = {
        "MultiFactorAuth": {
            "ClientCertificate": {"CertificateMappingAttribute": attribute}
        }
    }
    response = client.patch(_ACCOUNT_SERVICE_URI, json=patch_json)
    response.raise_for_status()


def get_hostname(redfish_session, username, password, url):
    """Return the HostName reported by the manager's NetworkProtocol resource.

    ``username``, ``password`` and ``url`` are accepted for interface
    compatibility but are not used; all requests go through
    ``redfish_session``.
    """
    manager_uri = _get_manager_uri(redfish_session)

    manager_response = redfish_session.get(manager_uri)
    manager_response.raise_for_status()

    network_protocol_uri = manager_response.json()["NetworkProtocol"][
        "@odata.id"
    ]

    network_protocol_response = redfish_session.get(network_protocol_uri)
    network_protocol_response.raise_for_status()

    return network_protocol_response.json()["HostName"]


def generateCA():
    """Create a self-signed EC (P-256) CA and return (private_key, cert)."""
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_key = private_key.public_key()

    # Self-signed: subject and issuer are the same name.
    name = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"),
            x509.NameAttribute(NameOID.COMMON_NAME, "Test CA"),
        ]
    )

    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=True,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=False,
        encipher_only=False,
        key_agreement=False,
        key_cert_sign=True,
        key_encipherment=False,
    )

    builder = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(_NOT_VALID_BEFORE)
        .not_valid_after(_NOT_VALID_AFTER)
        .serial_number(x509.random_serial_number())
        .public_key(public_key)
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True
        )
        .add_extension(usage, critical=False)
        .add_extension(
            x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
            critical=False,
        )
    )

    root_cert = builder.sign(
        private_key=private_key, algorithm=hashes.SHA256()
    )

    return private_key, root_cert


def signCsr(csr, ca_key):
    """Call ``csr.sign`` with the CA key and SHA-256; returns None.

    NOTE(review): this function is not called anywhere in this file and
    discards the result of ``sign``; certificates for a CSR are issued by
    generateServerCert instead. Presumably ``csr`` is expected to be a
    builder-like object exposing ``sign`` -- confirm before relying on it.
    """
    csr.sign(ca_key, algorithm=hashes.SHA256())


def generate_client_key_and_cert(
    ca_cert,
    ca_key,
    common_name: Optional[str] = None,
    upn: Optional[str] = None,
):
    """Create a client-auth key pair and a certificate signed by the CA.

    Args:
        ca_cert: CA certificate; its subject becomes the issuer name.
        ca_key: CA private key used to sign the new certificate.
        common_name: If given, added as the subject CommonName.
        upn: If given, added as a UserPrincipalName otherName entry in the
            SubjectAlternativeName extension.

    Returns:
        Tuple of (client private key, signed client certificate).
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_key = private_key.public_key()

    cert_names = [
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OpenBMC"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "bmcweb"),
    ]
    if common_name is not None:
        cert_names.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))

    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=False,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=True,
        encipher_only=False,
        key_agreement=True,
        key_cert_sign=False,
        key_encipherment=False,
    )
    exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH])

    # The authority key identifier must identify the *issuer's* key (the
    # CA), not the key being certified.
    auth_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        ca_key.public_key()
    )

    builder = (
        x509.CertificateBuilder()
        .subject_name(x509.Name(cert_names))
        .issuer_name(ca_cert.subject)
        .public_key(public_key)
        .serial_number(x509.random_serial_number())
        .not_valid_before(_NOT_VALID_BEFORE)
        .not_valid_after(_NOT_VALID_AFTER)
        .add_extension(usage, critical=False)
        .add_extension(exusage, critical=True)
        .add_extension(auth_key, critical=False)
    )

    if upn is not None:
        # The otherName value is the UPN encoded as an ASN.1 UTF8String.
        encoder = asn1.Encoder()
        encoder.start()
        encoder.write(upn, asn1.Numbers.UTF8String)

        builder = builder.add_extension(
            x509.SubjectAlternativeName(
                [
                    x509.OtherName(
                        x509.ObjectIdentifier(upnObjectIdentifier),
                        encoder.output(),
                    )
                ]
            ),
            critical=False,
        )

    signed = builder.sign(private_key=ca_key, algorithm=hashes.SHA256())

    return private_key, signed


def generateServerCert(url, ca_key, ca_cert, csr):
    """Issue a server certificate for the public key and subject in *csr*.

    Args:
        url: Host the certificate is for. Added to the subject alternative
            names as an IP address if it parses as one, otherwise as a DNS
            name. "localhost" is always included.
        ca_key: CA private key used for signing.
        ca_cert: CA certificate; its subject becomes the issuer name.
        csr: Certificate signing request supplying subject and public key.

    Returns:
        The signed server certificate.
    """
    # NOTE(review): key_cert_sign=True is combined with ca=False below;
    # RFC 5280 says keyCertSign must not be asserted on non-CA
    # certificates. Left unchanged -- confirm the target accepts a
    # certificate without it before altering.
    usage = x509.KeyUsage(
        content_commitment=False,
        crl_sign=False,
        data_encipherment=False,
        decipher_only=False,
        digital_signature=True,
        encipher_only=False,
        key_agreement=False,
        key_cert_sign=True,
        key_encipherment=True,
    )
    exusage = x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH])

    san_list = [x509.DNSName("localhost")]
    try:
        san_list.append(x509.IPAddress(ipaddress.ip_address(url)))
    except ValueError:
        san_list.append(x509.DNSName(url))

    builder = (
        x509.CertificateBuilder()
        .subject_name(csr.subject)
        .issuer_name(ca_cert.subject)
        .public_key(csr.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(_NOT_VALID_BEFORE)
        .not_valid_after(_NOT_VALID_AFTER)
        .add_extension(usage, critical=True)
        .add_extension(exusage, critical=True)
        .add_extension(x509.SubjectAlternativeName(san_list), critical=True)
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True
        )
        # Subject key identifier describes the certified (server) key ...
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
            critical=False,
        )
        # ... while the authority key identifier describes the CA key.
        .add_extension(
            x509.AuthorityKeyIdentifier.from_issuer_public_key(
                ca_key.public_key()
            ),
            critical=False,
        )
    )

    return builder.sign(private_key=ca_key, algorithm=hashes.SHA256())


def generateCsr(
    redfish_session,
    commonName,
    manager_uri,
):
    """Ask the BMC to generate a CSR for its HTTPS certificate.

    ``commonName`` is prefixed with "IP: " when ``socket.inet_aton`` accepts
    it and with "DNS: " otherwise, then sent both as the CommonName and as
    the first alternative name.

    Returns:
        The parsed CSR.

    Raises:
        Exception: If the returned CSR's signature is not valid.
    """
    try:
        socket.inet_aton(commonName)
    except socket.error:
        commonName = "DNS: " + commonName
    else:
        commonName = "IP: " + commonName

    CSRRequest = {
        "CommonName": commonName,
        "City": "San Fransisco",
        "Country": "US",
        "Organization": "",
        "OrganizationalUnit": "",
        "State": "CA",
        "CertificateCollection": {
            "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates",
        },
        "AlternativeNames": [
            commonName,
            "DNS: localhost",
            "IP: 127.0.0.1",
        ],
    }

    response = redfish_session.post(
        "/redfish/v1/CertificateService/Actions/CertificateService.GenerateCSR",
        json=CSRRequest,
    )
    response.raise_for_status()

    csrString = response.json()["CSRString"]
    csr = x509.load_pem_x509_csr(csrString.encode())
    if not csr.is_signature_valid:
        raise Exception("CSR was not valid")
    return csr


def install_ca_cert(redfish_session, ca_cert_dump, manager_uri):
    """Install the CA certificate into the manager's truststore.

    If the initial POST returns HTTP 500 the script falls back to replacing
    the certificate at ``<truststore>/1``.

    Raises:
        Exception: If the fallback replace does not return HTTP 200.
    """
    ca_certJSON = {
        "CertificateString": ca_cert_dump.decode(),
        "CertificateType": "PEM",
    }
    ca_certPath = f"{manager_uri}/Truststore/Certificates"
    print("Attempting to install CA certificate to BMC.")

    response = redfish_session.post(ca_certPath, json=ca_certJSON)
    if response.status_code == 500:
        print(
            "An existing CA certificate is likely already installed."
            " Replacing..."
        )
        ca_certJSON["CertificateUri"] = {
            "@odata.id": ca_certPath + "/1",
        }

        response = redfish_session.post(replaceCertPath, json=ca_certJSON)
        if response.status_code != 200:
            raise Exception(
                "Could not install or replace CA certificate. "
                "Please check if a certificate is already installed. If a "
                "certificate is already installed, try performing a factory "
                "restore to clear such settings."
            )
        print("Successfully replaced existing CA certificate.")
    response.raise_for_status()
    print("Successfully installed CA certificate.")


def install_server_cert(redfish_session, manager_uri, server_cert_dump):
    """Replace the BMC's HTTPS certificate and enable TLS authentication.

    Raises:
        Exception: If either request does not return HTTP 200. The message
            includes the response body text.
    """
    server_cert_json = {
        "CertificateString": server_cert_dump.decode(),
        "CertificateUri": {
            "@odata.id": f"{manager_uri}/NetworkProtocol/HTTPS/Certificates/1",
        },
        "CertificateType": "PEM",
    }

    print("Replacing server certificate...")
    response = redfish_session.post(replaceCertPath, json=server_cert_json)
    if response.status_code != 200:
        # Use the raw text so a non-JSON error body cannot mask the failure.
        raise Exception(f"Could not replace certificate: {response.text}")
    print("Successfully replaced server certificate.")

    tls_patch_json = {"Oem": {"OpenBMC": {"AuthMethods": {"TLS": True}}}}
    print("Ensuring TLS authentication is enabled.")
    response = redfish_session.patch(_ACCOUNT_SERVICE_URI, json=tls_patch_json)
    if response.status_code != 200:
        raise Exception("Could not enable TLS auth: " + response.text)
    print("Successfully enabled TLS authentication.")


def generate_pk12(certs_dir, key, client_cert, username):
    """Write an unencrypted PKCS#12 bundle to ``<certs_dir>/client.p12``."""
    print("Generating p12 cert file for browser authentication.")
    p12 = pkcs12.serialize_key_and_certificates(
        username.encode(),
        key,
        client_cert,
        None,
        serialization.NoEncryption(),
    )
    with open(os.path.join(certs_dir, "client.p12"), "wb") as f:
        f.write(p12)


def _mtls_client(url, certs_dir, cert_name, key_name):
    """Build an httpx client that trusts the generated CA and presents the
    given client certificate/key files from *certs_dir*."""
    return httpx.Client(
        base_url=f"https://{url}",
        verify=os.path.join(certs_dir, "CA-cert.cer"),
        cert=(
            os.path.join(certs_dir, cert_name),
            os.path.join(certs_dir, key_name),
        ),
    )


def test_mtls_auth(url, certs_dir):
    """Exercise mTLS login with the CommonName cert, then with the UPN cert.

    After the CommonName check the mapping attribute is switched to
    UserPrincipalName; after the UPN check it is switched back to
    CommonName. Any failing request raises via ``raise_for_status``.
    """
    with _mtls_client(
        url, certs_dir, "client-cert.pem", "client-key.pem"
    ) as client:
        print("Testing mTLS auth with CommonName")
        response = client.get(_SESSIONS_URI)
        response.raise_for_status()

        print("Changing CertificateMappingAttribute to use UPN")
        _set_certificate_mapping_attribute(client, "UserPrincipalName")

    with _mtls_client(
        url, certs_dir, "upn-client-cert.pem", "upn-client-key.pem"
    ) as client:
        print("Retesting mTLS auth with UPN")
        response = client.get(_SESSIONS_URI)
        response.raise_for_status()

        print("Changing CertificateMappingAttribute to use CommonName")
        _set_certificate_mapping_attribute(client, "CommonName")


def setup_server_cert(
    redfish_session,
    ca_cert_dump,
    certs_dir,
    client_key,
    client_cert,
    username,
    url,
    ca_key,
    ca_cert,
):
    """Install the CA, issue and install a server cert, and select
    CommonName certificate mapping.

    Also writes ``client.p12`` and ``server-cert.pem`` into *certs_dir*.
    """
    manager_uri = _get_manager_uri(redfish_session)

    install_ca_cert(redfish_session, ca_cert_dump, manager_uri)
    generate_pk12(certs_dir, client_key, client_cert, username)

    csr = generateCsr(redfish_session, url, manager_uri)
    serverCert = generateServerCert(url, ca_key, ca_cert, csr)
    server_cert_dump = serverCert.public_bytes(
        encoding=serialization.Encoding.PEM
    )
    _write_bytes(os.path.join(certs_dir, "server-cert.pem"), server_cert_dump)
    print("Server cert generated.")

    install_server_cert(redfish_session, manager_uri, server_cert_dump)

    print("Make sure setting CertificateMappingAttribute to CommonName")
    _set_certificate_mapping_attribute(redfish_session, "CommonName")


def _write_bytes(path, data):
    """Write *data* to *path*, replacing any existing file."""
    with open(path, "wb") as f:
        f.write(data)


def _private_key_pem(key):
    """Serialize *key* as unencrypted TraditionalOpenSSL PEM bytes."""
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )


def _certificate_pem(cert):
    """Serialize *cert* as PEM bytes."""
    return cert.public_bytes(encoding=serialization.Encoding.PEM)


def generate_and_load_certs(url, username, password):
    """Generate all certificates, install them on the BMC and test mTLS.

    Files are written to ``~/certs``. An existing ``CA-cert.cer`` there is
    reused (together with ``CA-key.pem``); otherwise a new CA is created.

    Args:
        url: Host to connect to (used as ``https://<url>``).
        username: Redfish user; also the client certificate CommonName and
            the user part of the UPN.
        password: Password used to create the Redfish session.
    """
    certs_dir = os.path.expanduser("~/certs")
    print(f"Writing certs to {certs_dir}")
    try:
        print("Making certs directory.")
        os.mkdir(certs_dir)
    except OSError as error:
        # An already-existing directory is fine; anything else is fatal.
        if error.errno != errno.EEXIST:
            raise

    ca_cert_filename = os.path.join(certs_dir, "CA-cert.cer")
    ca_key_filename = os.path.join(certs_dir, "CA-key.pem")
    if not os.path.exists(ca_cert_filename):
        new_ca_key, new_ca_cert = generateCA()

        _write_bytes(ca_cert_filename, _certificate_pem(new_ca_cert))
        print("CA cert generated.")
        _write_bytes(ca_key_filename, _private_key_pem(new_ca_key))
        print("CA key generated.")

    # Always load the CA material from disk so a reused CA and a freshly
    # generated one follow the same path.
    with open(ca_cert_filename, "rb") as ca_cert_file:
        ca_cert_dump = ca_cert_file.read()
    ca_cert = x509.load_pem_x509_certificate(ca_cert_dump)

    with open(ca_key_filename, "rb") as ca_key_file:
        ca_key_dump = ca_key_file.read()
    ca_key = load_pem_private_key(ca_key_dump, None)

    client_key, client_cert = generate_client_key_and_cert(
        ca_cert, ca_key, common_name=username
    )
    _write_bytes(
        os.path.join(certs_dir, "client-key.pem"),
        _private_key_pem(client_key),
    )
    print("Client key generated.")
    _write_bytes(
        os.path.join(certs_dir, "client-cert.pem"),
        _certificate_pem(client_cert),
    )
    print("Client cert generated.")

    print(f"Connecting to {url}")
    # verify=False: the BMC's certificate is not signed by our CA until the
    # server certificate has been replaced below.
    with httpx.Client(
        base_url=f"https://{url}", verify=False, follow_redirects=False
    ) as redfish_session:
        with RedfishSessionContext(
            redfish_session, username, password
        ) as rf_session:
            redfish_session.headers["X-Auth-Token"] = rf_session.x_auth_token

            hostname = get_hostname(redfish_session, username, password, url)
            print(f"Hostname: {hostname}")

            upn_client_key, upn_client_cert = generate_client_key_and_cert(
                ca_cert,
                ca_key,
                upn=f"{username}@{hostname}",
            )
            _write_bytes(
                os.path.join(certs_dir, "upn-client-key.pem"),
                _private_key_pem(upn_client_key),
            )
            print("UPN client key generated.")

            _write_bytes(
                os.path.join(certs_dir, "upn-client-cert.pem"),
                _certificate_pem(upn_client_cert),
            )
            print("UPN client cert generated.")

            setup_server_cert(
                redfish_session,
                ca_cert_dump,
                certs_dir,
                client_key,
                client_cert,
                username,
                url,
                ca_key,
                ca_cert,
            )

    print("Testing redfish TLS authentication with generated certs.")
    time.sleep(2)
    test_mtls_auth(url, certs_dir)
    print("Redfish TLS authentication success!")


def main():
    """Parse command-line arguments and run the certificate setup."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--username",
        help="Username to connect with",
        default="root",
    )
    parser.add_argument(
        "--password",
        help="Password for user in order to install certs over Redfish.",
        default="0penBmc",
    )
    parser.add_argument("host", help="Host to connect to")

    args = parser.parse_args()
    generate_and_load_certs(args.host, args.username, args.password)


if __name__ == "__main__":
    main()