corenet: some advancements in the 5G NG and MM handling

This commit is contained in:
p1-bmu 2021-04-28 18:09:04 +02:00
parent c4333e4b59
commit 503a9ac563
5 changed files with 336 additions and 117 deletions

View File

@ -382,8 +382,18 @@ class GNBd(object):
return gnb_ue_id
#--------------------------------------------------------------------------#
# CN-initiated non-UE-associated S1AP signalling procedures
# CN-initiated non-UE-associated NGAP signalling procedures
#--------------------------------------------------------------------------#
# TODO: page(), send_error_ind()
def page(self, **IEs):
"""send a NGAP Paging message to the gNB
"""
# TODO
pass
def send_err(self, **IEs):
"""send a NGAP Error Indication to the gNB
"""
# TODO
pass

View File

@ -45,7 +45,7 @@ class UEd(SigStack):
#--------------------------------------------------------------------------#
#
# verbosity level
DEBUG = ('ERR', 'WNG', 'INF', 'DBG')
DEBUG = ('VLN', 'ERR', 'WNG', 'INF', 'DBG')
# to log UE-related RANAP, S1AP and NGAP for all UE
TRACE_ASN_RANAP_CS = False
TRACE_ASN_RANAP_PS = False

View File

@ -33,6 +33,22 @@ from .ProcCNFGMM import *
from .ProcCNFGSM import *
from .HdlrUESMS import *
from pycrate_mobile.TS24501_IE import (
FGSIDTYPE_NO, # 0
FGSIDTYPE_SUPI,
FGSIDTYPE_GUTI,
FGSIDTYPE_IMEI,
FGSIDTYPE_STMSI,
FGSIDTYPE_IMEISV,
FGSIDTYPE_MAC,
FGSIDTYPE_EUI64, # 7
FGSIDFMT_IMSI, # 0
FGSIDFMT_NSI,
FGSIDFMT_GCI,
FGSIDFMT_GLI, # 3
UESecCap as FGSUESecCap
)
class UEFGMMd(SigStack):
"""UE 5GMM handler within a UENGd instance
@ -132,7 +148,7 @@ class UEFGMMd(SigStack):
return False
# need to wait for potential 5GMM serving / common procedures to happen and end
sleep(self._WAIT_ADD)
if not self.ready.wait(10):
if not self.ready.wait(5):
# something is blocking in the serving / common procedures
return False
elif not self.NG.connected.is_set():
@ -288,6 +304,49 @@ class UENGd(SigStack):
#--------------------------------------------------------------------------#
# global security policy
#--------------------------------------------------------------------------#
#
# 1) NAS Rx path
#
# IEs allowed in clear-text initial NAS messages
SECNAS_RX_CT_IES = {
# Registration Req
65 : {
'NAS_KSI',
'5GSRegType',
'5GSID', # needs to be a temp id or SUCI
'UESecCap',
'UEStatus',
'AddGUTI',
'EPSNASContainer',
'NASContainer',
},
# Service Req
76 : {
'ServiceType',
'NAS_KSI',
'5GSID', # needs to be a temp id
'NASContainer',
},
# Ctrl Plane Service Req
79 : {
'NAS_KSI',
'CtrlPlaneServiceType',
'NASContainer',
}
}
#
# Identity type allowed in clear-text IdentityResponse
SECNAS_RX_CT_IDTYPE = {
FGSIDTYPE_NO,
FGSIDTYPE_SUPI,
#FGSIDTYPE_GUTI,
#FGSIDTYPE_STMSI
}
#
# dropping invalid Rx message is the default behaviour
SECNAS_RX_DROP_INVAL = True
'''
# this will systematically bypass all auth and smc procedures,
# NAS MAC and UL count verification in the uplink
# and setting of the 5GMM security header (and encryption) in the downlink
@ -304,6 +363,7 @@ class UENGd(SigStack):
# this will disable the setting of the 5GMM security header (and encryption)
# in the downlink for given NAS message (by name)
SECNAS_PDU_NOSEC = set()
'''
#
# format of the security context dict self.SEC:
# self.SEC is a dict of available 5G security contexts indexed by KSI,
@ -326,7 +386,11 @@ class UENGd(SigStack):
#--------------------------------------------------------------------------#
# NGAPPaging policy
#--------------------------------------------------------------------------#
#
# page_block() parameters:
# number of retries when not successful
PAG_RETR = 2
# timer in sec between retries
PAG_WAIT = 2
#--------------------------------------------------------------------------#
# NGAPInitialContextSetup policy
@ -357,8 +421,10 @@ class UENGd(SigStack):
self.SEC = {}
self.reset_sec_ctx()
#
# state for NG / radio connection: set with InitialUEMessage, unset with UEContextRelease
self.connected = Event()
self.nasinit = Event() # state for initial NAS message
# state for processing the initial NAS message: unset after InitialUEMessage processed
self.nasinit = Event()
if gnbd is not None:
self.set_ran(gnbd)
else:
@ -381,7 +447,6 @@ class UENGd(SigStack):
self.SEC['KSI'] = None
self.clear()
self.connected.clear()
self.nasinit.clear()
def set_ran_unconnected(self, gnbd):
# required for paging
@ -572,124 +637,91 @@ class UENGd(SigStack):
NasRxSec, err = NAS.parse_NAS5G(buf, inner=False)
if err:
self._log('WNG', 'invalid 5GS NAS message: %s' % hexlify(buf).decode('ascii'))
return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause':err}, sec=False))
return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': err}, sec=False))
#
# 5GS NAS security handling
sh, pd = NasRxSec[0]['SecHdr'].get_val(), NasRxSec[0]['EPD'].get_val()
if sh == 0:
# clear-text NAS message
NasRxSec._sec = False
NasRxSec._ulcnt = 0
if self.UE.TRACE_NAS_5GS:
self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
if pd == 126:
NgapTxProc = self.FGMM.process(NasRxSec)
else:
assert( pd == 46 ) # this won't happen due to parse_NAS5G()
NgapTxProc = self.FGSM.process(NasRxSec)
elif sh in (2, 4) and pd == 126:
if self.UE.TRACE_NAS_5GS_SEC:
self._log('TRACE_NAS_5GS_UL_SEC', '\n' + NasRxSec.show())
NasRx, err = self.process_nas_sec_enc(NasRxSec, sh)
if err & 0xff:
# non-security related error
NgapTxProc = self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': err}, sec=True))
elif not NasRx:
# deciphering failed
return self._ngap_nas_sec_err()
else:
if self.UE.TRACE_NAS_5GS:
self._log('TRACE_NAS_5GS_UL', '\n' + NasRx.show())
if NasRx[0]['EPD'].get_val() == 126:
NgapTxProc = self.FGMM.process(NasRx)
else:
NgapTxProc = self.FGSM.process(NasRx)
else:
self._log('WNG', 'invalid 5GS NAS message: %r' % NasRxSec)
NgapTxProc = self.ret_ngap_dnt(NAS.EMMStatus(val={'5GMMCause': 96}, sec=False))
return self.process_nas_nosec(NasRxSec)
#
return NgapTxProc
elif pd == 126 and sh in (1, 2, 4):
# protected NAS message
# 1: integrity prot only, 2: current sec ctx, 4: new sec ctx (SMC after fresh auth)
return self.process_nas_sec(NasRxSec)
#
# invalid NAS message
self._log('WNG', 'invalid 5GS NAS message security status')
if self.UE.TRACE_NAS_5GS:
self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
# err cause 98: Message type not compatible with the protocol state
return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': 98}, sec=False))
def process_nas_sec_mac(self, NasRxSec, secctx):
def process_nas_nosec(self, NasRx):
# Check if the message type is valid or not, log it and eventually drop it
typ = NasRx[0]['Type'].get_val()
#
sqnmsb, sqnlsb = secctx['UL'] & 0xffffff00, secctx['UL'] & 0xff
verif_mac = NasRxSec.mac_verify(secctx['Knasint'], 0, secctx['EIA'], sqnmsb)
ue_sqn = NasRxSec['Seqn'].get_val()
verif_sqn = True if ue_sqn == sqnlsb else False
if typ in (65, 76, 79):
# initial NAS message
ct_ies, vln_ies = self.SECNAS_RX_CT_IES[typ], []
for ie in list(NasRx)[1:]:
if ie._name not in ct_ies:
vln_ies.append(ie._name)
if vln_ies:
self._log('VLN', 'unprotected IEs in initial NAS message: %s' % ', '.join(vln_ies))
#
if not verif_mac:
if self.SECNAS_UL_MAC:
self._log('ERR', 'NAS SEC UL: MAC verif failed, dropping %s' % NasRxSec._name)
return False, 0x200, False, 0
else:
self._log('WNG', 'NAS SEC UL: MAC verif failed in %s' % NasRxSec._name)
return True, 0x200, False, sqnmsb+ue_sqn
elif not verif_sqn:
if self.SECNAS_UL_CNT:
self._log('ERR', 'NAS SEC UL: UL count verif failed, dropping %s' % NasRxSec._name)
return False, 0x300, False, 0
else:
self._log('WNG', 'NAS SEC UL: UL count verif failed in %s' % NasRxSec._name)
# resynch uplink count
secctx['UL'] = sqnmsb+ue_sqn+1
return True, 0x300, False, sqnmsb+ue_sqn
else:
self._log('DBG', 'NAS SEC UL: MAC verified, UL count %i' % secctx['UL'])
ulcnt = secctx['UL']
secctx['UL'] += 1
return True, 0, True, ulcnt
elif typ == 92:
# ident resp
ie = NasRx['5GSID']['V'].get_val_d()
if isinstance(ie, dict) and ie['Type'] not in self.SECNAS_RX_CT_IDTYPE:
self._log('VLN', 'unprotected UE Identity: %r' % ie)
#
elif typ not in {69, 70, 87, 89, 95, 100}:
# not dereg req, dereg acc, auth resp, auth fail, sec mode rej, status
self._log('VLN', 'invalid unprotected NAS message: %s' % NasRx._name)
if self.SECNAS_RX_DROP_INVAL:
return []
#
return self.dispatch_nas(NasRx)
def process_nas_sec_enc(self, NasRxSec, sh):
"""Check the security on all UL 5GMM messages which are encrypted.
Returns the message or None (if security checks are enforced), and the
security error code.
def process_nas_sec(self, NasRx):
# get the KSI and sec ctx
# verify MAC
# decrypt
#
# in case of MAC only (no encr), we need to go through process_nas_nosec()
# whatever result of the MAC check
Security error codes:
0: no error
0x100: no active NAS KSI
0x200: MAC verification failed
0x300: NAS UL count not matching
if self.SEC['KSI'] in self.SEC:
sec_ctx = self.SEC[self.SEC['KSI']]
if self.SEC['KSI'] is not None:
self.SEC['KSI'] = None
else:
# TODO: no readily-available security context
pass
The returned message gets 2 attributes (_sec [bool], _ulcnt [uint])
"""
if self.SECNAS_DISABLED:
# TODO: try to decode the inner NAS message, in case 5G-EA0 is in use ?
self._log('WNG', 'unable to decode the inner NAS message')
return None, 0
#
if self.SEC['KSI'] not in self.SEC:
# no active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
self._log('WNG', 'NAS SEC UL: no active NAS KSI')
return None, 0x100
else:
secctx = self.SEC[self.SEC['KSI']]
#
chk, err, sec, ulcnt = self.process_nas_sec_mac(NasRxSec, secctx)
if not chk:
return None, err
#
if secctx['NASEA'] == 0:
buf = NasRxSec['NASMessage'].get_val()
else:
NasRxSec.decrypt(secctx['Knasenc'], 0, secctx['NASEA'], ulcnt & 0xffffff00, 1)
buf = NasRxSec._dec_msg
NasRx, err2 = NAS.parse_NAS5G(buf, inner=False)
if err2:
# decrypted decoded part is malformed
self._log('WNG', 'invalid 5GS NAS message: %s' % hexlify(buf).decode('ascii'))
NasRx._sec = sec
NasRx._ulcnt = ulcnt
return NasRx, err + err2
return self.dispatch_nas(NasRxNosec)
def output_nas_sec(self, NasTx):
"""Apply the security on all DL 5GSM / 5GMM messages.
Returns the encoded bytes buffer or None if error.
"""
# TODO
pass
def dispatch_nas(self, NasRx):
epd = NasRx[0]['EPD'].get_val()
if epd == 126:
return self.FGMM.process(NasRx)
elif epd == 46:
return self.FGSM.process(NasRx)
else:
self._log('WNG', 'invalid 5G NAS message, header: %r' % NasRx[0])
return []
def ret_ngap_dnt(self, NasTx, **IEs):
@ -723,18 +755,135 @@ class UENGd(SigStack):
# network-initiated method (fg task, to be used from the interpreter)
#--------------------------------------------------------------------------#
def _get_paging_ies(self):
guami = self.Server.AMF_GUAMI[self.UE.PLMN]
# only supporting mandatory IEs
IEs = {
'TAIListForPaging': [{
'tAI': {
'pLMNIdentity': plmn_str_to_buf(self.UE.PLMN),
'tAC': uint_to_bytes(self.UE.TAC, 24)
}
}],
'UEPagingIdentity': {
'fiveG-S-TMSI': {
'aMFSetID': (guami[1], 10),
'aMFPointer': (guami[2], 6),
'fiveG-TMSI': uint_to_bytes(self.UE.FGTMSI, 32),
}
}
}
#
return IEs
def page(self):
"""send NGAP Paging command to gNB responsible for the UE TAI
"""
# send a NGAPPaging for the 5GS domain
if self.connected.is_set():
self._log('DBG', 'paging: UE already connected')
return
# get the set of gNBs serving the UE TAI
# gNB id is 3-tuple whereas eNB id is 2-tuple
tai = (self.UE.PLMN, self.UE.TAC)
try:
gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
except Exception:
self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
return
#
# only mandatory IEs supported yet
IEs = self._get_paging_ies()
#
# start a NGAPPaging procedure on all gNBs
for gnb in enbs:
gnb.page(**IEs)
self._log('INF', 'paging: ongoing')
def page_block(self):
"""page the UE and wait for it to connect, or the paging procedure to timeout.
Returns True if UE gets connected, False otherwise.
"""
# send a NGAPPaging for the 5GS domain
if self.connected.is_set():
self._log('DBG', 'paging: UE already connected')
return True
# get the set of gNBs serving the UE TAI
# gNB id is 3-tuple whereas eNB id is 2-tuple
tai = (self.UE.PLMN, self.UE.TAC)
try:
gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
except Exception:
self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
return False
#
IEs = self._get_paging_ies()
#
# retries paging as defined in case UE does not connect
i = 0
while i <= self.PAG_RETR:
# start an S1APPaging procedure on all RNCs
for enb in enbs:
enb.page(**IEs)
# check until UE gets connected or timer expires
if self.connected.wait(self.PAG_WAIT):
self._log('INF', 'paging: UE connected')
return True
else:
# timeout
i += 1
self._log('WNG', 'paging: timeout, UE not connected')
return False
def send_ng_rel(self, cause=('nas', 'normal-release')):
"""send an UEContextRelease over the NG link with the given NGAP cause
"""
if not self.connected.is_set():
# nothing to release
self._log('DBG', 'release: UE not connected')
return True
# prepare the NGAPUEContextRelease procedure
NgapProc = self.init_ngap_proc(NGAPUEContextRelease, Cause=cause)
if not NgapProc:
return False
if not self.transmit_ngap_proc([NgapProc]):
return False
else:
return True
def send_ng_err(self, cause, **IEs):
"""send an ErrorIndication over the NG link with the given AP cause
IEs can contain any of the optional or extended IEs: CriticalityDiagnostics
"""
if not self.connected.is_set():
# NGAP link disconnected
if self.NGAP_FORCE_PAGE:
# force to connect
if not self._net_init_con():
# unable to connect with the UE
return False
else:
return False
# prepare the S1AP procedure
IEs['Cause'] = cause
NgapProc = self.init_ngap_proc(NGAPErrorIndCN, **IEs)
if not NgapProc:
return False
if not self.transmit_s1ap_proc([NgapProc]):
return False
else:
return True
#--------------------------------------------------------------------------#
# to send arbitrary NAS buffers to the UE
#--------------------------------------------------------------------------#
# TODO
#--------------------------------------------------------------------------#
# 5G bearer activation
#--------------------------------------------------------------------------#
# TODO

View File

@ -1867,9 +1867,10 @@ class UES1d(SigStack):
self._log('DBG', 'paging: UE already connected')
return
# get the set of eNBs serving the UE TAI
# eNB id is 2-tuple whereas gNB id is 3-tuple
tai = (self.UE.PLMN, self.UE.TAC)
try:
enbs = [self.Server.RAN[enbid] for enbid in self.Server.TAI[tai]]
enbs = [self.Server.RAN[enbid] for enbid in self.Server.TAI[tai] if len(enbid) == 2]
except Exception:
self._log('ERR', 'paging: no eNB serving the UE TAI %s.%.4x' % tai)
return
@ -1879,7 +1880,7 @@ class UES1d(SigStack):
self._log('ERR', 'paging: missing basic information')
return
#
# start an S1APPaging procedure on all RNCs
# start an S1APPaging procedure on all eNBs
for enb in enbs:
enb.page(**IEs)
self._log('INF', 'paging: ongoing')
@ -1893,9 +1894,10 @@ class UES1d(SigStack):
self._log('DBG', 'paging: UE already connected')
return True
# get the set of eNBs serving the UE TAI
# eNB id is 2-tuple whereas gNB id is 3-tuple
tai = (self.UE.PLMN, self.UE.TAC)
try:
enbs = [self.Server.RAN[enbid] for enbid in self.Server.TAI[tai]]
enbs = [self.Server.RAN[enbid] for enbid in self.Server.TAI[tai] if len(enbid) == 2]
except Exception:
self._log('ERR', 'paging: no eNB serving the UE TAI %s.%.4x' % tai)
return False

View File

@ -560,10 +560,24 @@ class NGAPUEContextReleaseRequest(NGAPSigProc):
'suc': None,
'uns': None
}
recv = NGAPSigProc._recv
def trigger(self):
# copy the cause signaled by the gNB
Proc = self.NG.init_ngap_proc(NGAPUEContextRelease,
Cause=self.UEInfo['Cause'],
UE_NGAP_IDs=('uE-NGAP-ID-pair',
{'aMF-UE-NGAP-ID': self.NG.CtxId,
'rAN-UE-NGAP-ID': self.NG.CtxId}))
if Proc:
return [Proc]
else:
return []
class NGAPUEContextRelease(NGAPSigProc):
"""Initial Context Setup: TS 38.413, section 8.3.3
"""UE Context Release: TS 38.413, section 8.3.3
CN-initiated
request-reponse
@ -588,10 +602,10 @@ class NGAPUEContextRelease(NGAPSigProc):
# Custom decoders
Decod = {
'ini': ({
'ini': ({}, {}),
'suc': ({
'UserLocationInformation': lambda x: ngap_userloc_to_hum(x)
}, {}),
'suc': ({}, {}),
'uns': None
}
@ -601,6 +615,39 @@ class NGAPUEContextRelease(NGAPSigProc):
'suc': ({}, {}),
'uns': None
}
send = NGAPSigProc._send
def _release_ng(self):
# TODO: suspend all PDU sessions
#self.NG.FGSM.pdu_suspend()
# update mobility state
if self.NG.FGMM.state != 'INACTIVE':
self.NG.FGMM.state = 'IDLE'
self._log('INF', 'UE disconnected, cause %r' % (self._NetInfo['Cause'], ))
#
# disconnect the NG interface to the gNB for the UE
self.NG.unset_ran()
self.NG.unset_ctx()
def recv(self, pdu):
# recv the NGAPUEContextRelease response
self._recv(pdu)
# remove from the NGAP procedure stack
try:
del self.NG.Proc[self.Code]
except Exception:
pass
self._release_ng()
def abort(self):
# remove from the NGAP procedure stack
try:
del self.NG.Proc[self.Code]
except Exception:
pass
self._log('INF', 'aborting')
self._release_ng()
class NGAPUEContextModification(NGAPSigProc):
@ -1202,7 +1249,18 @@ class NGAPUplinkNASTransport(NGAPSigProc):
if not self.errcause:
# verification of UserLocInfo against GNBd parameters:
err = False
# TODO
userloc = self.UEInfo['UserLocationInformation']
tai = userloc['TAI']
if 'NR-CGI' in userloc:
cgi = userloc['NR-CGI']
else:
cgi = userloc['EUTRA-CGI']
if cgi[0] != self.GNB.ID[0] or cgi[1][0] != self.GNB.ID[2][0]:
self._log('WNG', 'invalid Cell Global-ID, %s.%.9x' % (cgi[0], cgi[1][0]))
err = True
elif tai not in self.GNB.Config['TAIs']:
self._log('WNG', 'invalid TAI, %s.%.6x' % tai)
err = True
if err:
self.errcause = ('protocol', 'message-not-compatible-with-receiver-state')
#