Move X.509 related code from osmo-smdpp to pySim.esim.x509_cert

Change-Id: I230ba0537b702b0bf6e9da9a430908ed2a21ca61
This commit is contained in:
Harald Welte 2024-01-19 19:28:15 +01:00
parent c83a963877
commit 45b7dc9466
2 changed files with 77 additions and 74 deletions

View File

@ -36,6 +36,7 @@ from pySim.utils import h2b, b2h, swap_nibbles
import pySim.esim.rsp as rsp import pySim.esim.rsp as rsp
from pySim.esim.es8p import * from pySim.esim.es8p import *
from pySim.esim.x509_cert import oid, cert_policy_has_oid, CertAndPrivkey
# HACK: make this configurable # HACK: make this configurable
DATA_DIR = './smdpp-data' DATA_DIR = './smdpp-data'
@ -70,18 +71,12 @@ def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_da
js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding, PublicFormat, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from cryptography import x509 from cryptography import x509
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
"""convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes."""
r, s = decode_dss_signature(sig)
return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
def ecdsa_tr03111_to_dss(sig: bytes) -> bytes: def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
"""convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those.""" """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
assert len(sig) == 64 assert len(sig) == 64
@ -90,52 +85,6 @@ def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
return encode_dss_signature(r, s) return encode_dss_signature(r, s)
class CertAndPrivkey:
"""A pair of certificate and private key, as used for ECDSA signing."""
def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = None,
cert: Optional[x509.Certificate] = None, priv_key = None):
self.required_policy_oid = required_policy_oid
self.cert = cert
self.priv_key = priv_key
def cert_from_der_file(self, path: str):
with open(path, 'rb') as f:
cert = x509.load_der_x509_certificate(f.read())
if self.required_policy_oid:
# verify it is the right type of certificate (id-rspRole-dp-auth, id-rspRole-dp-auth-v2, etc.)
assert cert_policy_has_oid(cert, self.required_policy_oid)
self.cert = cert
def privkey_from_pem_file(self, path: str, password: Optional[str] = None):
with open(path, 'rb') as f:
self.priv_key = load_pem_private_key(f.read(), password)
def ecdsa_sign(self, plaintext: bytes) -> bytes:
"""Sign some input-data using an ECDSA signature compliant with SGP.22,
which internally refers to Global Platform 2.2 Annex E, which in turn points
to BSI TS-03111 which states "concatengated raw R + S values". """
sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256()))
# convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes
return ecdsa_dss_to_tr03111(sig)
def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier:
"""Return the AuthorityKeyIdentifier X.509 extension of the certificate."""
return list(filter(lambda x: isinstance(x.value, x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value
def get_subject_alt_name(self) -> x509.SubjectAlternativeName:
"""Return the SubjectAlternativeName X.509 extension of the certificate."""
return list(filter(lambda x: isinstance(x.value, x509.SubjectAlternativeName), self.cert.extensions))[0].value
def get_cert_as_der(self) -> bytes:
"""Return certificate encoded as DER."""
return self.cert.public_bytes(Encoding.DER)
def get_curve(self) -> ec.EllipticCurve:
return self.cert.public_key().public_numbers().curve
class ApiError(Exception): class ApiError(Exception):
def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None, def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None,
subject_id: Optional[str] = None): subject_id: Optional[str] = None):
@ -147,27 +96,6 @@ class ApiError(Exception):
build_resp_header(js, 'Failed', self.status_code) build_resp_header(js, 'Failed', self.status_code)
return json.dumps(js) return json.dumps(js)
def cert_policy_has_oid(cert: x509.Certificate, match_oid: x509.ObjectIdentifier) -> bool:
"""Determine if given certificate has a certificatePolicy extension of matching OID."""
for policy_ext in filter(lambda x: isinstance(x.value, x509.CertificatePolicies), cert.extensions):
if any(policy.policy_identifier == match_oid for policy in policy_ext.value._policies):
return True
return False
ID_RSP = "2.23.146.1"
ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2'])
ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1'])
class oid:
id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0')
id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1')
id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2')
id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3')
id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4')
id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5')
id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
class SmDppHttpServer: class SmDppHttpServer:
app = Klein() app = Klein()

View File

@ -23,6 +23,7 @@ from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding
from pySim.utils import b2h from pySim.utils import b2h
@ -43,6 +44,27 @@ def cert_get_auth_key_id(cert: x509.Certificate) -> bytes:
aki_ext = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier).value aki_ext = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier).value
return aki_ext.key_identifier return aki_ext.key_identifier
def cert_policy_has_oid(cert: x509.Certificate, match_oid: x509.ObjectIdentifier) -> bool:
"""Determine if given certificate has a certificatePolicy extension of matching OID."""
for policy_ext in filter(lambda x: isinstance(x.value, x509.CertificatePolicies), cert.extensions):
if any(policy.policy_identifier == match_oid for policy in policy_ext.value._policies):
return True
return False
ID_RSP = "2.23.146.1"
ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2'])
ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1'])
class oid:
id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0')
id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1')
id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2')
id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3')
id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4')
id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5')
id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
class VerifyError(Exception): class VerifyError(Exception):
"""An error during certificate verification,""" """An error during certificate verification,"""
pass pass
@ -54,6 +76,8 @@ class CertificateSet:
def __init__(self, root_cert: x509.Certificate): def __init__(self, root_cert: x509.Certificate):
check_signed(root_cert, root_cert) check_signed(root_cert, root_cert)
# TODO: check other mandatory attributes for CA Cert # TODO: check other mandatory attributes for CA Cert
if not cert_policy_has_oid(root_cert, oid.id_rspRole_ci):
raise ValueError("Given root certificate doesn't have rspRole_ci OID")
usage_ext = root_cert.extensions.get_extension_for_class(x509.KeyUsage).value usage_ext = root_cert.extensions.get_extension_for_class(x509.KeyUsage).value
if not usage_ext.key_cert_sign: if not usage_ext.key_cert_sign:
raise ValueError('Given root certificate key usage does not permit signing of certificates') raise ValueError('Given root certificate key usage does not permit signing of certificates')
@ -135,3 +159,54 @@ class CertificateSet:
depth += 1 depth += 1
if depth > max_depth: if depth > max_depth:
raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth) raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth)
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
"""convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes."""
r, s = decode_dss_signature(sig)
return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
class CertAndPrivkey:
"""A pair of certificate and private key, as used for ECDSA signing."""
def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = None,
cert: Optional[x509.Certificate] = None, priv_key = None):
self.required_policy_oid = required_policy_oid
self.cert = cert
self.priv_key = priv_key
def cert_from_der_file(self, path: str):
with open(path, 'rb') as f:
cert = x509.load_der_x509_certificate(f.read())
if self.required_policy_oid:
# verify it is the right type of certificate (id-rspRole-dp-auth, id-rspRole-dp-auth-v2, etc.)
assert cert_policy_has_oid(cert, self.required_policy_oid)
self.cert = cert
def privkey_from_pem_file(self, path: str, password: Optional[str] = None):
with open(path, 'rb') as f:
self.priv_key = load_pem_private_key(f.read(), password)
def ecdsa_sign(self, plaintext: bytes) -> bytes:
"""Sign some input-data using an ECDSA signature compliant with SGP.22,
which internally refers to Global Platform 2.2 Annex E, which in turn points
to BSI TS-03111 which states "concatengated raw R + S values". """
sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256()))
# convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes
return ecdsa_dss_to_tr03111(sig)
def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier:
"""Return the AuthorityKeyIdentifier X.509 extension of the certificate."""
return list(filter(lambda x: isinstance(x.value, x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value
def get_subject_alt_name(self) -> x509.SubjectAlternativeName:
"""Return the SubjectAlternativeName X.509 extension of the certificate."""
return list(filter(lambda x: isinstance(x.value, x509.SubjectAlternativeName), self.cert.extensions))[0].value
def get_cert_as_der(self) -> bytes:
"""Return certificate encoded as DER."""
return self.cert.public_bytes(Encoding.DER)
def get_curve(self) -> ec.EllipticCurve:
return self.cert.public_key().public_numbers().curve