WIP: osmo-smdpp ES9+ support for ASN.1 endpoint

This commit is contained in:
Harald Welte 2024-01-10 19:59:04 +01:00
parent d1cc8d0c1d
commit bef85dbc28
1 changed files with 149 additions and 29 deletions

View File

@ -242,6 +242,36 @@ class SmDppHttpServer:
except InvalidSignature:
return False
@staticmethod
def b64decode_members(indict: Dict, keys: List[str]):
"""base64-decoder all members of 'indict' whose key is in 'keys'."""
for key in keys:
if key in indict:
indict[key] = b64decode(indict[key])
@staticmethod
def b64encode_members(indict: Dict, keys: List[str]):
"""base64-encoder (to string!) all members of 'indict' whose key is in 'keys'."""
for key in keys:
if key in indict:
indict[key] = b64encode2str(indict[key])
@staticmethod
def asn1decode_member(indict: Dict, key: str, typename: Optional[str]):
"""decode indict[key] using RSP ASN.1 decoder for type 'typename'."""
if not typename:
typename = capitalize_first_char(key)
if key in indict:
indict[key] = rsp.asn1.decode(typename, indict[key])
@staticmethod
def asn1encode_member(indict: Dict, key: str, typename: Optional[str]):
"""encode indict[key] using RSP ASN.1 decoder for type 'typename'."""
if not typename:
typename = capitalize_first_char(key)
if key in indict:
indict[key] = rsp.asn1.encode(typename, indict[key])
@staticmethod
def rsp_api_wrapper(func):
"""Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
@ -262,8 +292,6 @@ class SmDppHttpServer:
return json.dumps(output)
return _api_wrapper
@app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
@rsp_api_wrapper
def initiateAutentication(self, request: IRequest, content: dict) -> dict:
"""See ES9+ InitiateAuthentication SGP.22 Section 5.6.1"""
# Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
@ -271,12 +299,11 @@ class SmDppHttpServer:
if content['smdpAddress'] != self.server_hostname:
raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
euiccChallenge = b64decode(content['euiccChallenge'])
euiccChallenge = content['euiccChallenge']
if len(euiccChallenge) != 16:
raise ValueError
euiccInfo1_bin = b64decode(content['euiccInfo1'])
euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
euiccInfo1 = content['euiccInfo1']
print("Rx euiccInfo1: %s" % euiccInfo1)
#euiccInfo1['svn']
@ -315,32 +342,46 @@ class SmDppHttpServer:
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
output = {}
output['serverSigned1'] = b64encode2str(serverSigned1_bin)
output['serverSigned1'] = serverSigned1
# Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
# serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))
output['serverSignature1'] = b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin)
output['transactionId'] = transactionId
server_cert_aki = self.dp_auth.get_authority_key_identifier()
output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
output['euiccCiPKIdToBeUsed'] = b'\x04\x14' + server_cert_aki.key_identifier
output['serverCertificate'] = self.dp_auth.get_cert_as_der() # CERT.DPauth.SIG
# FIXME: add those certificate
#output['otherCertsInChain'] = b64encode2str()
#output['otherCertsInChain'] = ...
# create SessionState and store it in rss
self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge)
return output
@app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
@app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
@rsp_api_wrapper
def json_initiateAuthentication(self, request: IRequest, content: dict):
"""Transform from JSON binding to generic function and back."""
# convert from JSON/BASE64 to decoded ASN.1
b64decode_members(content, ['euiccChallenge', 'euiccInfo1', 'lpaRspCapability'])
asn1decode_member(content, 'eUICCInfo1')
# do the actual processing
output = self.initiateAuthentication(request, content)
# convert from decoded ASN.1 to base64 to JSON
asn1encode_member(output, 'serverSigned1')
b64encode_members(output, ['serverSigned1', 'serverSignature1', 'euiccCiPKIdToBeUsed',
'serverCertificate', 'otherCertsInChain', 'crlList'])
return output
def authenticateClient(self, request: IRequest, content: dict) -> dict:
"""See ES9+ AuthenticateClient in SGP.22 Section 5.6.3"""
transactionId = content['transactionId']
authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
authenticateServerResp = content['authenticateServerResponse']
print("Rx %s: %s" % authenticateServerResp)
if authenticateServerResp[0] == 'authenticateResponseError':
r_err = authenticateServerResp[1]
@ -416,14 +457,26 @@ class SmDppHttpServer:
self.rss[transactionId] = ss
return {
'transactionId': transactionId,
'profileMetadata': b64encode2str(profileMetadata_bin),
'smdpSigned2': b64encode2str(smdpSigned2_bin),
'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
'profileMetadata': profileMetadata,
'smdpSigned2': smdpSigned2,
'smdpSignature2': ss.smdpSignature2_do,
'smdpCertificate': self.dp_pb.get_cert_as_der(), # CERT.DPpb.SIG
}
@app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
@app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
@rsp_api_wrapper
def json_authenticateClient(self, request: IRequest, content: dict):
"""Transform from JSON binding to generic function and back."""
b64decode_members(content, ['authenticateServerResponse', 'deleteNotifciationForDc'])
asn1decode_member(content, 'authenticateServerResponse')
output = self.authenticateClient(requeset, content)
asn1encode_member(output, 'smdpSigned2')
b64encode_members(output, ['profileMetadata', 'smdpSigned2', 'smdpSignature2', 'smdpCertificate'])
return output
def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
"""See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2"""
transactionId = content['transactionId']
@ -433,8 +486,7 @@ class SmDppHttpServer:
if not ss:
raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
prepDownloadResp = content['prepareDownloadResponse']
print("Rx %s: %s" % prepDownloadResp)
if prepDownloadResp[0] == 'downloadResponseError':
@ -496,15 +548,26 @@ class SmDppHttpServer:
self.rss[transactionId] = ss
return {
'transactionId': transactionId,
'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
'boundProfilePackage': bpp.encode(ss, self.dp_pb),
}
@app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
@app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
@rsp_api_wrapper
def json_getBoundProfilePackage(self, request: IRequest, content: dict):
"""Transform from JSON binding to generic function and back."""
b64decode_members(content, ['prepareDownloadResponse'])
asn1decode_member(content, 'prepareDownlaodResponse')
output = self.getBoundProfilePackage(request, content)
asn1encode_member(output, 'boundProfilePackage')
b64encode_members(output, ['boundProfilePackage'])
return output
def handleNotification(self, request: IRequest, content: dict) -> dict:
"""See ES9+ HandleNotification in SGP.22 Section 5.6.4"""
pendingNotification_bin = b64decode(content['pendingNotification'])
pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
pendingNotification = content['pendingNotification']
print("Rx %s: %s" % pendingNotification)
if pendingNotification[0] == 'profileInstallationResult':
profileInstallRes = pendingNotification[1]
@ -528,17 +591,27 @@ class SmDppHttpServer:
pass
else:
raise ValueError(pendingNotification)
return None
@app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
@rsp_api_wrapper
def json_handleNotification(self, request: IRequest, content: dict):
"""Transform from JSON binding to generic function and back."""
b64decode_members(content, ['pendingNotification'])
asn1decode_member(content, 'pendingNotification')
output = self.handleNotification(request, content)
return output
#@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
#@rsp_api_wrapper
#"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
# TODO: implement this
@app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
@rsp_api_wrapper
def cancelSession(self, request: IRequest, content: dict) -> dict:
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
print("Rx JSON: %s" % content)
transactionId = content['transactionId']
# Verify that the received transactionId is known and relates to an ongoing RSP session
@ -546,8 +619,7 @@ class SmDppHttpServer:
if ss is None:
raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
cancelSessionResponse = content['cancelSessionResponse']
print("Rx %s: %s" % cancelSessionResponse)
if cancelSessionResponse[0] == 'cancelSessionResponseError':
@ -577,6 +649,54 @@ class SmDppHttpServer:
del self.rss[transactionId]
return { 'transactionId': transactionId }
@app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
@rsp_api_wrapper
def json_cancelSessionn(self, request: IRequest, content: dict) -> dict:
"""Transform from JSON binding to generic function and back."""
b64decode_members(content, ['cancelSessionResponse'])
asn1decode_member(content, 'cancelSessionResponse')
output = self.cancelSession(request, content)
return output
@app.route('/gsma/rsp2/asn1', methods=['POST'])
def asn1(self, request: IRequest) -> dict:
# TODO: evaluate User-Agent + X-Admin-Protocol header
# TODO: reject any non-ASN.1 Content-type
content = rsp.asn1.decode('RemoteProfileProvisioningRequest', request.content.read())
print("Rx ASN.1: %s" % content)
request.setHeader('Content-Type', 'application/x-gsma-rsp-asn1')
request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
operation = content[0]
if operation == 'initiateAuthenticationRequest':
method = self.initiateAuthentication
ochoice = 'initiateAuthenticationResponse'
elif operation == 'authenticateClientRequest':
method = self.authenticateClient
ochoice = 'authenticateClientResponseEs9'
elif operation == 'getBoundProfilePackageRequest':
method = self.getBoundProfilePackage
ochoice = 'getBoundProfilePackageResponse'
elif operation == 'cancelSessionRequestEs9':
method = self.cancelSession
ochoice = 'cancelSessionResponseEs9'
elif operation == 'handleNotification':
method = self.handleNotification
ochoice = None
output = method(self, request, content[1])
if ochoice:
output = (ochoice, output)
print("Tx ASN.1: %s" % output)
return rsp.asn1.encode('RemoteProfileProvisioningResponse', output)
else:
return None
def main(argv):
parser = argparse.ArgumentParser()