diff --git a/pycrate_corenet/HdlrGNB.py b/pycrate_corenet/HdlrGNB.py index 2bd6f51..708af4d 100644 --- a/pycrate_corenet/HdlrGNB.py +++ b/pycrate_corenet/HdlrGNB.py @@ -366,7 +366,7 @@ class GNBd(object): return None, ran_ue_id elif FgsIdType == FGSIDTYPE_GUTI: # TODO: should ensure PLMN and AMF identifiers correspond - return self.Server.get_ued(mtmsi=FgsId['5GTMSI'].get_val()), ran_ue_id + return self.Server.get_ued(fgtmsi=FgsId['5GTMSI'].get_val()), ran_ue_id else: self._log('WNG', 'Unable to get UE id from NAS message, 5GSID of '\ 'unexpected type %i' % FgsIdType) diff --git a/pycrate_corenet/HdlrUE.py b/pycrate_corenet/HdlrUE.py index f8ac049..45cfd71 100644 --- a/pycrate_corenet/HdlrUE.py +++ b/pycrate_corenet/HdlrUE.py @@ -28,11 +28,11 @@ # *-------------------------------------------------------- #*/ -from .utils import * from .HdlrUEIuCS import * from .HdlrUEIuPS import * from .HdlrUES1 import * from .HdlrUENG import * +from .utils import * class UEd(SigStack): @@ -75,10 +75,11 @@ class UEd(SigStack): IMSI = None IMEI = None IMEISV = None - # temporary identities (TMSI / PTMSI / MTMSI are uint32) + # temporary identities (TMSI / PTMSI / MTMSI / FGTMSI are uint32) TMSI = None # CS domain PTMSI = None # PS domain - MTMSI = None # EPS / 5GS domains + MTMSI = None # EPS domain + FGTMSI = None # 5GS domain #--------------------------------------------------------------------------# # CorenetServer reference @@ -118,11 +119,17 @@ class UEd(SigStack): if imsi: self.IMSI = imsi elif 'tmsi' in kw: + # CS domain, 3G self.TMSI = kw['tmsi'] elif 'ptmsi' in kw: + # PS domain, 3G self.PTMSI = kw['ptmsi'] elif 'mtmsi' in kw: + # EPS domain, 4G self.MTMSI = kw['mtmsi'] + elif 'fgtmsi' in kw: + # 5GS domain, 5G + self.FGTMSI = kw['fgtmsi'] # # init capabilities self.Cap = {} @@ -405,6 +412,8 @@ class UEd(SigStack): def get_new_tmsi(self): # use the Python random generator + # WARNING: not good for randomness, but good enough for corenet + # and at least with some good uniqueness return random.getrandbits(32) def set_tmsi(self, tmsi): @@ -438,12 +447,23 @@ class UEd(SigStack): del self.Server.MTMSI[self.MTMSI] except Exception: pass - # set the new PTMSI + # set the new MTMSI self.MTMSI = mtmsi # update the Server LUT self.Server.MTMSI[mtmsi] = self.IMSI - # TODO: handle 5G NAS identities + def set_fgtmsi(self, fgtmsi): + # delete current 5GTMSI from the Server LUT + if self.FGTMSI is not None: + try: + del self.Server.FGTMSI[self.FGTMSI] + except Exception: + pass + # set the new 5G TMSI + self.FGTMSI = fgtmsi + # update the Server LUT + self.Server.FGTMSI[fgtmsi] = self.FGTMSI + #--------------------------------------------------------------------------# # UE location @@ -771,3 +791,4 @@ class UEd(SigStack): return '\n\n'.join(txt) else: return '' + diff --git a/pycrate_corenet/HdlrUENG.py b/pycrate_corenet/HdlrUENG.py index 5eeffeb..82ca2b6 100644 --- a/pycrate_corenet/HdlrUENG.py +++ b/pycrate_corenet/HdlrUENG.py @@ -3,7 +3,7 @@ # * Software Name : pycrate # * Version : 0.4 # * -# * Copyright 2020. Benoit Michau. ANSSI. +# * Copyright 2020. Benoit Michau. P1Sec. # * # * This library is free software; you can redistribute it and/or # * modify it under the terms of the GNU Lesser General Public @@ -29,9 +29,9 @@ from .utils import * from .ProcCNNgap import * -# load all required 5GS NAS protocol handlers and SMS handler -#from .ProcCNFGMM import * -#from .HdlrUESMS import * +from .ProcCNFGMM import * +from .ProcCNFGSM import * +from .HdlrUESMS import * class UEFGMMd(SigStack): @@ -55,17 +55,27 @@ class UEFGMMd(SigStack): # additional time for letting background task happen in priority _WAIT_ADD = 0.005 - # list of 5GMM message types that do not require NAS security to be - # activated to be processed - SEC_NOTNEED = {} - # to disable completely the check for secured NAS message - SEC_DISABLED = False + + #--------------------------------------------------------------------------# + # FGMM timers + #--------------------------------------------------------------------------# + + # MT Deregistration + T3522 = 1 + # Registration + T3550 = 1 + # UE Config Update + T3555 = 2 + # AKA, SMC + T3560 = 2 + # Identification + T3570 = 1 + # NSSAI Auth + T3575 = 2 - - def _log(self, logtype, msg): self.NG._log(logtype, '[5GMM] %s' % msg) @@ -89,13 +99,11 @@ class UEFGMMd(SigStack): """process a NAS 5GMM message (NasRx) sent by the UE, and return a list (possibly empty) of NGAP procedure(s) to be sent back to the gNB - - NasRx has 2 additional attributes (_sec [bool], _ulcnt [uint]) """ # TODO return [] - def init_proc(self, ProcClass, encod=None, fgmm_preempt=False, sec=True): + def init_proc(self, ProcClass, encod=None, fgmm_preempt=False): """initialize a CN-initiated 5GMM procedure of class `ProcClass' and given encoder(s), and return the procedure """ @@ -133,7 +141,7 @@ class UEFGMMd(SigStack): else: return True - def run_proc(self, ProcClass, sec=True, **IEs): + def run_proc(self, ProcClass, **IEs): """run a network-initiated procedure ProcClass in the context of the 5GMM stack, after setting the given IEs in the NAS message to be sent to the UE @@ -147,7 +155,7 @@ class UEFGMMd(SigStack): if not self._net_init_con(): return False, None # - Proc = self.init_proc(ProcClass, encod={ProcClass.Init: IEs}, fgmm_preempt=True, sec=sec) + Proc = self.init_proc(ProcClass, encod={ProcClass.Init: IEs}, fgmm_preempt=True) try: NgapTxProc = Proc.output() except Exception: @@ -190,15 +198,6 @@ class UEFGSMd(SigStack): # to bypass the process() server loop with a custom NAS PDU handler RX_HOOK = None - # list of ESM message types that do not require NAS security to be - # activated to be processed - SEC_NOTNEED = { - } - # to disable completely the check for secured NAS message - SEC_DISABLED = False - - - def _log(self, logtype, msg): self.NG._log(logtype, '[5GSM] %s' % msg) @@ -224,8 +223,6 @@ class UEFGSMd(SigStack): and return a list (possibly empty) of NGAP procedure(s) to be sent back to the gNB - NasRx has 2 additional attributes (_sec [bool], _ulcnt [uint]) - FGMMProc [FMMSigProc or None], indicates if the NAS FGSM message is handled in the context of an FGMM procedure """ @@ -361,6 +358,7 @@ class UENGd(SigStack): self.reset_sec_ctx() # self.connected = Event() + self.nasinit = Event() # state for initial NAS message if gnbd is not None: self.set_ran(gnbd) else: @@ -369,12 +367,13 @@ class UENGd(SigStack): # init 5GMM and 5GSM sig stacks self.FGMM = UEFGMMd(ued, self) self.FGSM = UEFGSMd(ued, self) - #self.SMS = UESMSd(ued, self) + self.SMS = UESMSd(ued, self) def set_ran(self, gnbd): self.SEC['KSI'] = None self.GNB = gnbd self.connected.set() + self.nasinit.set() def unset_ran(self): self.GNB.unset_ue_ng(self.CtxId) @@ -382,6 +381,7 @@ class UENGd(SigStack): self.SEC['KSI'] = None self.clear() self.connected.clear() + self.nasinit.clear() def set_ran_unconnected(self, gnbd): # required for paging @@ -575,13 +575,15 @@ class UENGd(SigStack): return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause':err}, sec=False)) # # 5GS NAS security handling + + sh, pd = NasRxSec[0]['SecHdr'].get_val(), NasRxSec[0]['EPD'].get_val() if sh == 0: # clear-text NAS message NasRxSec._sec = False NasRxSec._ulcnt = 0 - if self.UE.TRACE_NAS_EPS: - self._log('TRACE_NAS_5G_UL', '\n' + NasRxSec.show()) + if self.UE.TRACE_NAS_5GS: + self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show()) if pd == 126: NgapTxProc = self.FGMM.process(NasRxSec) else: @@ -707,7 +709,6 @@ class UENGd(SigStack): else: return [] - def _ngap_nas_sec_err(self): # TODO: maybe release the NG-UE link ? return [] @@ -716,6 +717,7 @@ class UENGd(SigStack): # clears all NAS EPS procedures self.FGMM.clear() self.FGSM.clear() + self.SMS.clear() #--------------------------------------------------------------------------# # network-initiated method (fg task, to be used from the interpreter) diff --git a/pycrate_corenet/HdlrUES1.py b/pycrate_corenet/HdlrUES1.py index fc017a4..188c756 100644 --- a/pycrate_corenet/HdlrUES1.py +++ b/pycrate_corenet/HdlrUES1.py @@ -68,17 +68,18 @@ class UEEMMd(SigStack): # list of EMM message types that do not require NAS security to be # activated to be processed - SEC_NOTNEED = {'EMMAttachRequest', - 'EMMIdentityResponse', # only for IMSI - 'EMMAuthenticationResponse', - 'EMMAuthenticationFailure', - 'EMMSecurityModeReject', - 'EMMDetachRequestMO', # if sent before security activation - 'EMMDetachAccept', - 'EMMTrackingAreaUpdateRequest', - 'EMMServiceRequest', - 'EMMExtServiceRequest' - } + SEC_NOTNEED = { + 'EMMAttachRequest', + 'EMMIdentityResponse', # only for IMSI + 'EMMAuthenticationResponse', + 'EMMAuthenticationFailure', + 'EMMSecurityModeReject', + 'EMMDetachRequestMO', # if sent before security activation + 'EMMDetachAccept', + 'EMMTrackingAreaUpdateRequest', + 'EMMServiceRequest', + 'EMMExtServiceRequest' + } # to disable completely the check for secured NAS message SEC_DISABLED = False diff --git a/pycrate_corenet/ProcCNFGMM.py b/pycrate_corenet/ProcCNFGMM.py new file mode 100644 index 0000000..99d2015 --- /dev/null +++ b/pycrate_corenet/ProcCNFGMM.py @@ -0,0 +1,794 @@ +# -*- coding: UTF-8 -*- +#/** +# * Software Name : pycrate +# * Version : 0.4 +# * +# * Copyright 2021. Benoit Michau. P1Sec. +# * +# * This library is free software; you can redistribute it and/or +# * modify it under the terms of the GNU Lesser General Public +# * License as published by the Free Software Foundation; either +# * version 2.1 of the License, or (at your option) any later version. +# * +# * This library is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# * Lesser General Public License for more details. +# * +# * You should have received a copy of the GNU Lesser General Public +# * License along with this library; if not, write to the Free Software +# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# * MA 02110-1301 USA +# * +# *-------------------------------------------------------- +# * File Name : pycrate_corenet/ProcCNFGMM.py +# * Created : 2021-04-27 +# * Authors : Benoit Michau +# *-------------------------------------------------------- +#*/ + +__all__ = [ + 'FGMMSigProc', + # + 'FGMMPrimAKA', + 'FGMMSecurityModeControl', + 'FGMMIdentification', + 'FGMMGenericUEConfigUpdate', + 'FGMMMONASTransport', + 'FGMMMTNASTransport', + 'FGMMNSSAA', + # + 'FGMMRegistration', + 'FGMMMODeregistration', + 'FGMMMTDeregistration', + 'FGMMServiceRequest', + 'FGMMCtrlPlaneServiceRequest', + # + 'FGMMProcUeDispatcher', + 'FGMMProcUeDispatcherStr', + 'FGMMProcCnDispatcher', + 'FGMMProcCnDispatcherStr' + ] + + +from .utils import * +from .ProcProto import * +from .ProcCNNgap import * + + +TESTING = False + +#------------------------------------------------------------------------------# +# NAS 5GS Mobility Management signalling procedure +# TS 24.501, version h21 +# Core Network side +#------------------------------------------------------------------------------# + +class FGMMSigProc(NASSigProc): + """5GS Mobility Management signalling procedure handler + + instance attributes: + - Name : procedure name + - FGMM : reference to the UEFGMMd instance running this procedure + - NG : reference to the UENGd instance connecting the UE + - Cont : 2-tuple of CN-initiated NAS message(s) and UE-initiated NAS + message(s) + - Timer: timer in sec. for this procedure + - Encod: custom NAS message encoders with fixed values + - Decod: custom NAS message decoders with transform functions + """ + + # tacking all exchanged NAS message within the procedure + TRACK_PDU = True + + # potential timer + Timer = None + TimerDefault = 4 + + # network initiator message id + Init = None + + if TESTING: + def __init__(self, encod=None): + self._prepare(encod) + self._log('DBG', 'instantiating procedure') + + def _log(self, logtype, msg): + log('[TESTING] [%s] [FGMMSigProc] [%s] %s' % (logtype, self.Name, msg)) + + else: + def __init__(self, fgmmd, encod=None, fgmm_preempt=False, sec=True): + self._prepare(encod) + self.FGMM = fgmmd + self.NG = fgmmd.NG + self.UE = fgmmd.UE + self._fgmm_preempt = fgmm_preempt + if fgmm_preempt: + self.FGMM.ready.clear() + self._log('DBG', 'instantiating procedure') + + def _log(self, logtype, msg): + self.FGMM._log(logtype, '[%s] %s' % (self.Name, msg)) + + def output(self): + self._log('ERR', 'output() not implemented') + return [] + + def process(self, pdu): + if self.TRACK_PDU: + self._pdu.append( (time(), 'UL', pdu) ) + self.UEInfo = {} + self.decode_msg(pdu, self.UEInfo) + # + self._log('ERR', 'process() not implemented') + return [] + + def postprocess(self, Proc=None): + self._log('ERR', 'postprocess() not implemented') + self.rm_from_fgmm_stack() + return [] + + def abort(self): + # abort this procedure, and all procedures started within this one + ind = self.FGMM.Proc.index(self) + if ind >= 0: + for p in self.FGMM.Proc[ind+1:]: + p.abort() + del self.FGMM.Proc[ind:] + if self._fgmm_preempt: + # release the FGMM stack + self.FGMM.ready.set() + self._log('INF', 'aborting') + + def rm_from_fgmm_stack(self): + # remove the procedure from the FGMM stack of procedures + try: + if self.FGMM.Proc[-1] == self: + del self.FGMM.Proc[-1] + except Exception: + self._log('WNG', 'FGMM stack corrupted') + else: + if self._fgmm_preempt: + # release the FGMM stack + self.FGMM.ready.set() + + def init_timer(self): + if self.Timer is not None: + self.TimerValue = getattr(self.FGMM, self.Timer, self.TimerDefault) + self.TimerStart = time() + self.TimerStop = self.TimerStart + self.TimerValue + + def get_timer(self): + if self.Timer is None: + return None + else: + return getattr(self.FGMM, self.Timer) + + def fgmm_preempt(self): + self._fgmm_preempt = True + self.FGMM.ready.clear() + + #--------------------------------------------------------------------------# + # common helpers + #--------------------------------------------------------------------------# + # None yet + + +#------------------------------------------------------------------------------# +# 5GMM common procedures: TS 24.501, section 5.4 +#------------------------------------------------------------------------------# + +class FGMMPrimAKA(FGMMSigProc): + """Primary authentication and key agreement procedure: TS 24.501, section 5.4.1 + + CN-initiated + + CN message: + 5GMMAuthenticationRequest (PD 126, Type 86), IEs: + - Type1V : spare + - Type1V : NAS_KSI + - Type4LV : ABBA + - Type3TV : RAND (T: 33) + - Type4TLV : AUTN (T: 32) + - Type6TLVE : EAPMsg (T: 120) + + 5GMMAuthenticationReject (PD 126, Type 88), IEs: + - Type6TLVE : EAPMsg (T: 120) + + 5GMMAuthenticationResult (PD 126, Type 90), IEs: + - Type1V : spare + - Type1V : NAS_KSI + - Type6LVE : EAPMsg + - Type4TLV : ABBA (T: 56) + + UE message: + 5GMMAuthenticationResponse (PD 126, Type 87), IEs: + - Type4TLV : RES (T: 45) + - Type6TLVE : EAPMsg (T: 120) + + 5GMMAuthenticationFailure (PD 126, Type 89), IEs: + - Type3V : 5GMMCause + - Type4TLV : AUTS (T: 48) + """ + + Cont = ( + (TS24501_FGMM.FGMMAuthenticationRequest, TS24501_FGMM.FGMMAuthenticationReject), + (TS24501_FGMM.FGMMAuthenticationResponse, TS24501_FGMM.FGMMAuthenticationFailure) + ) + + Init = (126, 86) + Timer = 'T3560' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +class FGMMSecurityModeControl(FGMMSigProc): + """Security mode control procedure: TS 24.501, section 5.4.2 + + CN-initiated + + CM message: + 5GMMSecurityModeCommand (PD 126, Type 93), IEs: + - Type3V : NASSecAlgo + - Type1V : spare + - Type1V : NAS_KSI + - Type4LV : UESecCap + - Type1TV : IMEISVReq (T: 14) + - Type3TV : EPSNASSecAlgo (T: 87) + - Type4TLV : Add5GSecInfo (T: 54) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : ABBA (T: 56) + - Type4TLV : S1UESecCap (T: 25) + + UE message: + 5GMMSecurityModeComplete (PD 126, Type 94), IEs: + - Type6TLVE : IMEISV (T: 119) + - Type6TLVE : NASContainer (T: 113) + - Type6TLVE : PEI (T: 120) + + 5GMMSecurityModeReject (PD 126, Type 95), IEs: + - Type3V : 5GMMCause + """ + + Cont = ( + (TS24501_FGMM.FGMMSecurityModeCommand, ), + (TS24501_FGMM.FGMMSecurityModeComplete, TS24501_FGMM.FGMMSecurityModeReject) + ) + + Init = (126, 93) + Timer = 'T3560' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +class FGMMIdentification(FGMMSigProc): + """Identification procedure: TS 24.501, section 5.4.3 + + CN-initiated + + CN message: + 5GMMIdentityRequest (PD 126, Type 91), IEs: + - Type1V : spare + - Type1V : 5GSIDType + + UE message: + 5GMMIdentityResponse (PD 126, Type 92), IEs: + - Type6LVE : 5GSID + """ + + Cont = ( + (NAS.FGMMIdentityRequest, ), + (NAS.FGMMIdentityResponse, ) + ) + + Init = (126, 91) + Timer = 'T3570' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +class FGMMGenericUEConfigUpdate(FGMMSigProc): + """Generic UE configuration update procedure: TS 24.501, section 5.4.4 + + CN-initiated + + CN message: + 5GMMConfigurationUpdateCommand (PD 126, Type 84), IEs: + - Type1TV : ConfigUpdateInd (T: 13) + - Type6TLVE : GUTI (T: 119) + - Type4TLV : 5GSTAIList (T: 84) + - Type4TLV : AllowedNSSAI (T: 21) + - Type4TLV : SAList (T: 39) + - Type4TLV : NetFullName (T: 67) + - Type4TLV : NetShortName (T: 69) + - Type3TV : LocalTimeZone (T: 70) + - Type3TV : UnivTimeAndTimeZone (T: 71) + - Type4TLV : DLSavingTime (T: 73) + - Type6TLVE : LADNInfo (T: 121) + - Type1TV : MICOInd (T: 11) + - Type1TV : NetSlicingInd (T: 9) + - Type4TLV : ConfiguredNSSAI (T: 49) + - Type4TLV : RejectedNSSAI (T: 17) + - Type6TLVE : OperatorAccessCatDefs (T: 118) + - Type1TV : SMSInd (T: 15) + - Type4TLV : T3447 (T: 108) + - Type6TLVE : CAGInfoList (T: 117) + - Type4TLV : UERadioCapID (T: 103) + - Type1TV : UERadioCapIDDelInd (T: 10) + - Type4TLV : 5GSRegResult (T: 68) + - Type4TLV : Trunc5GSTMSIConfig (T: 27) + - Type1TV : AddConfigInd (T: 12) + + UE message: + 5GMMConfigurationUpdateComplete (PD 126, Type 85), IEs: + None + """ + + Cont = ( + (NAS.FGMMConfigurationUpdateCommand, ), + (NAS.FGMMConfigurationUpdateComplete, ) + ) + + Init = (126, 84) + Timer = 'T3555' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +class FGMMMONASTransport(FGMMSigProc): + """UE-initiated NAS transport procedure: TS 24.501, section 5.4.5.2 + + UE-initiated + + CN message: + None + + UE message: + 5GMMULNASTransport (PD 126, Type 103), IEs: + - Type1V : spare + - Type1V : PayloadContainerType + - Type6LVE : PayloadContainer + - Type3TV : PDUSessID (T: 18) + - Type3TV : OldPDUSessID (T: 89) + - Type1TV : RequestType (T: 8) + - Type4TLV : SNSSAI (T: 34) + - Type4TLV : DNN (T: 37) + - Type4TLV : AddInfo (T: 36) + - Type1TV : MAPDUSessInfo (T: 10) + - Type1TV : ReleaseAssistInd (T: 15) + """ + + Cont = ( + None, + (NAS.FGMMULNASTransport,) + ) + + Init = (126, 103) + + '''TODO + def process(self, pdu): + return [] + ''' + + +class FGMMMTNASTransport(FGMMSigProc): + """Network-initiated NAS transport procedure: TS 24.501, section 5.4.5.3 + + CN-initiated + + CN message: + 5GMMDLNASTransport (PD 126, Type 104), IEs: + - Type1V : spare + - Type1V : PayloadContainerType + - Type6LVE : PayloadContainer + - Type3TV : PDUSessID (T: 18) + - Type4TLV : AddInfo (T: 36) + - Type3TV : 5GMMCause (T: 88) + - Type4TLV : BackOffTimer (T: 55) + + UE message: + None + """ + + Cont = ( + (NAS.FGMMDLNASTransport, ), + None + ) + + Init = (126, 104) + + '''TODO + def output(self): + return [] + ''' + + +class FGMMNSSAA(FGMMSigProc): + """Network slice-specific authentication and authorization procedure: TS 24.501, section 5.4.7 + + CN-initiated + + CN message: + 5GMMNetworkSliceSpecAuthCommand (PD 126, Type 80), IEs: + - Type4LV : SNSSAI + - Type6LVE : EAPMsg + + 5GMMNetworkSliceSpecAuthResult (PD 126, Type 82), IEs: + - Type4LV : SNSSAI + - Type6LVE : EAPMsg + + UE message: + 5GMMNetworkSliceSpecAuthComplete (PD 126, Type 81), IEs: + - Type4LV : SNSSAI + - Type6LVE : EAPMsg + """ + + Cont = ( + (NAS.FGMMNetworkSliceSpecAuthCommand, NAS.FGMMNetworkSliceSpecAuthResult), + (NAS.FGMMNetworkSliceSpecAuthComplete, ) + ) + + Init = (126, 80) + Timer = 'T3575' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +#------------------------------------------------------------------------------# +# 5GMM specific procedures: TS 24.501, section 5.5 +#------------------------------------------------------------------------------# + +class FGMMRegistration(FGMMSigProc): + """Registration procedure: TS 24.501, section 5.5.1 + + UE-initiated + + CN message: + 5GMMRegistrationAccept (PD 126, Type 66), IEs: + - Type4LV : 5GSRegResult + - Type6TLVE : GUTI (T: 119) + - Type4TLV : EquivPLMNList (T: 74) + - Type4TLV : 5GSTAIList (T: 84) + - Type4TLV : AllowedNSSAI (T: 21) + - Type4TLV : RejectedNSSAI (T: 17) + - Type4TLV : ConfiguredNSSAI (T: 49) + - Type4TLV : 5GSNetFeat (T: 33) + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : PDUSessReactResult (T: 38) + - Type6TLVE : PDUSessReactResultErr (T: 114) + - Type6TLVE : LADNInfo (T: 121) + - Type1TV : MICOInd (T: 11) + - Type1TV : NetSlicingInd (T: 9) + - Type4TLV : SAList (T: 39) + - Type4TLV : T3512 (T: 94) + - Type4TLV : Non3GPPDeregTimer (T: 93) + - Type4TLV : T3502 (T: 22) + - Type4TLV : EmergNumList (T: 52) + - Type6TLVE : ExtEmergNumList (T: 122) + - Type6TLVE : SORTransContainer (T: 115) + - Type6TLVE : EAPMsg (T: 120) + - Type1TV : NSSAIInclMode (T: 10) + - Type6TLVE : OperatorAccessCatDefs (T: 118) + - Type4TLV : 5GSDRXParam (T: 81) + - Type1TV : Non3GPPNWProvPol (T: 13) + - Type4TLV : EPSBearerCtxtStat (T: 96) + - Type4TLV : ExtDRXParam (T: 110) + - Type4TLV : T3447 (T: 108) + - Type4TLV : T3448 (T: 107) + - Type4TLV : T3324 (T: 106) + - Type4TLV : UERadioCapID (T: 103) + - Type4TLV : PendingNSSAI (T: 57) + - Type6TLVE : CipheringKeyData (T: 116) + - Type6TLVE : CAGInfoList (T: 117) + - Type4TLV : Trunc5GSTMSIConfig (T: 27) + - Type4TLV : WUSAssistInfo (T: 26) + - Type4TLV : NBN1ModeDRXParam (T: 41) + + 5GMMRegistrationReject (PD 126, Type 68), IEs: + - Type3V : 5GMMCause + - Type4TLV : T3346 (T: 95) + - Type4TLV : T3502 (T: 22) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : RejectedNSSAI (T: 105) + - Type6TLVE : CAGInfoList (T: 117) + + UE message: + 5GMMRegistrationRequest (PD 126, Type 65), IEs: + - Type1V : NAS_KSI + - Type1V : 5GSRegType + - Type6LVE : 5GSID + - Type1TV : NonCurrentNativeNAS_KSI (T: 12) + - Type4TLV : 5GMMCap (T: 16) + - Type4TLV : UESecCap (T: 46) + - Type4TLV : NSSAI (T: 47) + - Type3TV : TAI (T: 82) + - Type4TLV : EPSUENetCap (T: 23) + - Type4TLV : ULDataStat (T: 64) + - Type4TLV : PDUSessStat (T: 80) + - Type1TV : MICOInd (T: 11) + - Type4TLV : UEStatus (T: 43) + - Type6TLVE : AddGUTI (T: 119) + - Type4TLV : AllowedPDUSessStat (T: 37) + - Type4TLV : UEUsage (T: 24) + - Type4TLV : 5GSDRXParam (T: 81) + - Type6TLVE : EPSNASContainer (T: 112) + - Type6TLVE : LADNInd (T: 116) + - Type1TV : PayloadContainerType (T: 8) + - Type6TLVE : PayloadContainer (T: 123) + - Type1TV : NetSlicingInd (T: 9) + - Type4TLV : 5GSUpdateType (T: 83) + - Type4TLV : MSCm2 (T: 65) + - Type4TLV : SuppCodecs (T: 66) + - Type6TLVE : NASContainer (T: 113) + - Type4TLV : EPSBearerCtxtStat (T: 96) + - Type4TLV : ExtDRXParam (T: 110) + - Type4TLV : T3324 (T: 106) + - Type4TLV : UERadioCapID (T: 103) + - Type4TLV : MappedNSSAI (T: 53) + - Type4TLV : AddInfoReq (T: 72) + - Type4TLV : WUSAssistInfo (T: 26) + - Type2 : N5GCInd (T: 10) + - Type4TLV : NBN1ModeDRXParam (T: 48) + + 5GMMRegistrationComplete (PD 126, Type 67), IEs: + - Type6TLVE : SORTransContainer (T: 115) + """ + + Cont = ( + (NAS.FGMMRegistrationAccept, NAS.FGMMRegistrationReject), + (NAS.FGMMRegistrationRequest, NAS.FGMMRegistrationComplete) + ) + + Init = (126, 65) + Timer = 'T3550' + + '''TODO + def process(self, pdu): + return [] + + def output(self): + return [] + ''' + + +class FGMMMODeregistration(FGMMSigProc): + """UE-initiated de-registration procedure: TS 24.501, section 5.5.2.2 + + UE-initiated + + CN message: + 5GMMMODeregistrationAccept (PD 126, Type 70), IEs: + None + + UE message: + 5GMMMODeregistrationRequest (PD 126, Type 69), IEs: + - Type1V : NAS_KSI + - Type1V : DeregistrationType + - Type6LVE : 5GSID + """ + + Cont = ( + (NAS.FGMMMODeregistrationAccept, ), + (NAS.FGMMMODeregistrationRequest, ) + ) + + Init = (126, 69) + + '''TODO + def process(self, pdu): + return [] + + def output(self): + return [] + ''' + + +class FGMMMTDeregistration(FGMMSigProc): + """Network-initiated de-registration procedure: TS 24.501, section 5.5.2.3 + + CN-initiated + + CN message: + 5GMMMTDeregistrationRequest (PD 126, Type 71), IEs: + - Type1V : spare + - Type1V : DeregistrationType + - Type3TV : 5GMMCause (T: 88) + - Type4TLV : T3346 (T: 95) + - Type4TLV : RejectedNSSAI (T: 109) + + UE message: + 5GMMMTDeregistrationAccept (PD 126, Type 71), IEs: + None + """ + + Cont = ( + (NAS.FGMMMTDeregistrationRequest, ), + (NAS.FGMMMTDeregistrationAccept, ) + ) + + Init = (126, 71) + Timer = 'T3522' + + '''TODO + def output(self): + return [] + + def process(self, pdu): + return [] + ''' + + +#------------------------------------------------------------------------------# +# 5GMM connection management procedures: TS 24.501, section 5.6 +#------------------------------------------------------------------------------# + +class FGMMServiceRequest(FGMMSigProc): + """Service request procedure: TS 24.501, section 5.6.1 + + UE-initiated + + CN message: + 5GMMServiceAccept (PD 126, Type 78), IEs: + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : PDUSessReactResult (T: 38) + - Type6TLVE : PDUSessReactResultErr (T: 114) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : T3448 (T: 107) + + 5GMMServiceAccept (PD 126, Type 77), IEs: + - Type3V : 5GMMCause + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : T3346 (T: 95) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : T3448 (T: 107) + - Type6TLVE : CAGInfoList (T: 117) + + UE message: + 5GMMServiceRequest (PD 126, Type 76), IEs: + - Type1V : ServiceType + - Type1V : NAS_KSI + - Type6LVE : 5GSID + - Type4TLV : ULDataStat (T: 64) + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : AllowedPDUSessStat (T: 37) + - Type6TLVE : NASContainer (T: 113) + """ + + Cont = ( + (NAS.FGMMServiceAccept, NAS.FGMMServiceReject), + (NAS.FGMMServiceRequest, ) + ) + + Init = (126, 76) + + '''TODO + def process(self, pdu): + return [] + + def output(self): + return [] + ''' + + +class FGMMCtrlPlaneServiceRequest(FGMMSigProc): + """Service request procedure: TS 24.501, section 5.6.1 + + UE-initiated + + CN message: + 5GMMServiceAccept (PD 126, Type 78), IEs: + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : PDUSessReactResult (T: 38) + - Type6TLVE : PDUSessReactResultErr (T: 114) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : T3448 (T: 107) + + 5GMMServiceAccept (PD 126, Type 77), IEs: + - Type3V : 5GMMCause + - Type4TLV : PDUSessStat (T: 80) + - Type4TLV : T3346 (T: 95) + - Type6TLVE : EAPMsg (T: 120) + - Type4TLV : T3448 (T: 107) + - Type6TLVE : CAGInfoList (T: 117) + + UE message: + 5GMMControlPlaneServiceRequest (PD 126, Type 79), IEs: + - Type1V : NAS_KSI + - Type1V : CtrlPlaneServiceType + - Type4TLV : CIoTSmallDataContainer (T: 111) + - Type1TV : PayloadContainerType (T: 8) + - Type6TLVE : PayloadContainer (T: 123) + - Type3TV : PDUSessID (T: 18) + - Type4TLV : PDUSessStat (T: 80) + - Type1TV : ReleaseAssistInd (T: 15) + - Type4TLV : ULDataStat (T: 64) + - Type6TLVE : NASContainer (T: 113) + - Type4TLV : AddInfo (T: 36) + """ + + Cont = ( + (NAS.FGMMServiceAccept, NAS.FGMMServiceReject), + (NAS.FGMMControlPlaneServiceRequest, ) + ) + + Init = (126, 79) + + '''TODO + def process(self, pdu): + return [] + + def output(self): + return [] + ''' + + + +FGMMPrimAKA.init(filter_init=1) +FGMMSecurityModeControl.init(filter_init=1) +FGMMIdentification.init(filter_init=1) +FGMMGenericUEConfigUpdate.init(filter_init=1) +FGMMMONASTransport.init(filter_init=1) +FGMMMTNASTransport.init(filter_init=1) +FGMMNSSAA.init(filter_init=1) +FGMMRegistration.init(filter_init=1) +FGMMMODeregistration.init(filter_init=1) +FGMMMTDeregistration.init(filter_init=1) +FGMMServiceRequest.init(filter_init=1) +FGMMCtrlPlaneServiceRequest.init(filter_init=1) + +# 5G MM UE-initiated procedures dispatcher +FGMMProcUeDispatcher = { + 103 : FGMMMONASTransport, + 65 : FGMMRegistration, + 69 : FGMMMODeregistration, + 76 : FGMMServiceRequest, + 79 : FGMMCtrlPlaneServiceRequest, + } + +FGMMProcUeDispatcherStr = {ProcClass.Cont[1][0]()._name: ProcClass \ + for ProcClass in FGMMProcUeDispatcher.values()} + +# 5G MM CN-initiated procedures dispatcher +FGMMProcCnDispatcher = { + 86 : FGMMPrimAKA, + 93 : FGMMSecurityModeControl, + 91 : FGMMIdentification, + 84 : FGMMGenericUEConfigUpdate, + 104 : FGMMMTNASTransport, + 80 : FGMMNSSAA, + 71 : FGMMMTDeregistration, + } + +FGMMProcCnDispatcherStr = {ProcClass.Cont[0][0]()._name: ProcClass \ + for ProcClass in FGMMProcCnDispatcher.values()} + diff --git a/pycrate_corenet/ProcCNFGSM.py b/pycrate_corenet/ProcCNFGSM.py new file mode 100644 index 0000000..c2982b7 --- /dev/null +++ b/pycrate_corenet/ProcCNFGSM.py @@ -0,0 +1,179 @@ +# -*- coding: UTF-8 -*- +#/** +# * Software Name : pycrate +# * Version : 0.4 +# * +# * Copyright 2021. Benoit Michau. P1Sec. +# * +# * This library is free software; you can redistribute it and/or +# * modify it under the terms of the GNU Lesser General Public +# * License as published by the Free Software Foundation; either +# * version 2.1 of the License, or (at your option) any later version. +# * +# * This library is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# * Lesser General Public License for more details. +# * +# * You should have received a copy of the GNU Lesser General Public +# * License along with this library; if not, write to the Free Software +# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# * MA 02110-1301 USA +# * +# *-------------------------------------------------------- +# * File Name : pycrate_corenet/ProcCNFGSM.py +# * Created : 2021-04-27 +# * Authors : Benoit Michau +# *-------------------------------------------------------- +#*/ + +__all__ = [ + 'FGSMSigProc', + ] + + +from .utils import * +from .ProcProto import * +from .ProcCNNgap import * + + +TESTING = False + +#------------------------------------------------------------------------------# +# NAS 5GS Session Management signalling procedure +# TS 24.501, version h21 +# Core Network side +#------------------------------------------------------------------------------# + +class FGSMSigProc(NASSigProc): + """5GS Session Management signalling procedure handler + + instance attributes: + - Name : procedure name + - FGSM : reference to the UEFGSMd instance running this procedure + - NG : reference to the UENGd instance connecting the UE + - Cont : 2-tuple of CN-initiated NAS message(s) and UE-initiated NAS + message(s) + - Timer: timer in sec. for this procedure + - Encod: custom NAS message encoders with fixed values + - Decod: custom NAS message decoders with transform functions + """ + + # tacking all exchanged NAS message within the procedure + TRACK_PDU = True + + # potential timer + Timer = None + TimerDefault = 2 + + if TESTING: + def __init__(self, encod=None): + self._prepare(encod) + self._log('DBG', 'instantiating procedure') + + def _log(self, logtype, msg): + log('[TESTING] [%s] [FGSMSigProc] [%s] %s' % (logtype, self.Name, msg)) + + else: + def __init__(self, fgsmd, encod=None, sec=True, ebi=0, FGMMProc=None): + self._prepare(encod) + self.FGSM = fgsmd + self.NG = fgsmd.NG + self.UE = fgsmd.UE + self._ebi = ebi + self._FGMMProc = FGMMProc + self._log('DBG', 'instantiating procedure') + + def _log(self, logtype, msg): + self.FGSM._log(logtype, '[%s [%i]] %s' % (self.Name, self._ebi, msg)) + + def decode_msg(self, msg, ret): + NASSigProc.decode_msg(self, msg, ret) + # add PDUSessionID and PTI into ret + ret['PDUSessID'] = msg[0][1].get_val() + ret['PTI'] = msg[0][2].get_val() + + def set_msg(self, pd, typ, **kw): + """prepare a specific encoder dict for a given NAS message + """ + # select the encoder and duplicate it + try: + Encod = self.Encod[(pd, typ)] + except Exception: + return + FGSMHeader = {} + if 'PDUSessID' in kw: + FGSMHeader['PDUSessID'] = kw['PDUSessID'] + del kw['PDUSessID'] + if 'PTI' in kw: + FGSMHeader['PTI'] = kw['PTI'] + del kw['PTI'] + if FGSMHeader: + kw['FGSMHeader'] = FGSMHeader + Encod.update(kw) + + def output(self): + self._log('ERR', 'output() not implemented') + return None + + def process(self, pdu): + if self.TRACK_PDU: + self._pdu.append( (time(), 'UL', pdu) ) + self.UEInfo = {} + self.decode_msg(pdu, self.UEInfo) + # + self._log('ERR', 'process() not implemented') + return None + + def postprocess(self, Proc=None): + self._log('ERR', 'postprocess() not implemented') + self.rm_from_fgsm_stack() + return None + + def abort(self): + # abort this procedure, and all procedures started within this one + ProcStack = self.FGSM.Proc[self._ebi] + ind = ProcStack.index(self) + if ind >= 0: + for p in ProcStack[ind+1:]: + p.abort() + del ProcStack[ind:] + self._log('INF', 'aborting') + + def rm_from_fgsm_stack(self): + # remove the procedure from the FGSM stack of procedures + try: + ProcStack = self.FGSM.Proc[self._ebi] + if ProcStack[-1] == self: + del ProcStack[-1] + except Exception: + self._log('WNG', 'FGSM stack corrupted') + + def init_timer(self): + if self.Timer is not None: + self.TimerValue = getattr(self.FGSM, self.Timer, self.TimerDefault) + self.TimerStart = time() + self.TimerStop = self.TimerStart + self.TimerValue + + def get_timer(self): + if self.Timer is None: + return None + else: + return getattr(self.FGSM, self.Timer) + + #--------------------------------------------------------------------------# + # common helpers + #--------------------------------------------------------------------------# + # None yet + + +#------------------------------------------------------------------------------# +# Network-requested 5G SM procedures: TS 24.501, section 6.3 +#------------------------------------------------------------------------------# + + +#------------------------------------------------------------------------------# +# UE-requested 5G SM procedures: TS 24.501, section 6.4 +#------------------------------------------------------------------------------# + + diff --git a/pycrate_corenet/ProcProto.py b/pycrate_corenet/ProcProto.py index 64f84cc..4022d43 100644 --- a/pycrate_corenet/ProcProto.py +++ b/pycrate_corenet/ProcProto.py @@ -489,8 +489,10 @@ class NASSigProc(SigProc): mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) - else: + elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) + else: + mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) mies = msg[1:] ContLUT[mid] = (0, i) if mid not in cls.Encod: @@ -520,8 +522,10 @@ class NASSigProc(SigProc): mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) - else: + elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) + else: + mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) mies = msg[1:] ContLUT[mid] = (1, i) if mid not in cls.Encod: @@ -553,8 +557,10 @@ class NASSigProc(SigProc): mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) - else: + elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) + else: + mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) Filter.add(mid) FilterStr.add(msg._name) if Filter: diff --git a/pycrate_corenet/Server.py b/pycrate_corenet/Server.py index a586fb8..b6a153a 100644 --- a/pycrate_corenet/Server.py +++ b/pycrate_corenet/Server.py @@ -342,10 +342,11 @@ class CorenetServer(object): # UE, indexed by TMSI when the IMSI is unknown (at attachment), # and their UEd handler instance are set in ._UEpre, created at init # - # TMSI / P-TMSI / M-TMSI to IMSI conversion - TMSI = {} - PTMSI = {} - MTMSI = {} + # TMSI / P-TMSI / M-TMSI / 5G-TMSI to IMSI conversion + TMSI = {} + PTMSI = {} + MTMSI = {} + FGTMSI = {} # # This is a filter which enables the potential attachment of non-preconfigured # UE to the CorenetServer @@ -1315,7 +1316,7 @@ class CorenetServer(object): def get_ued(self, **kw): """return a UEd instance or None, according to the UE identity provided - kw: imsi (digit-str), tmsi (uint32), ptmsi (uint32) or mtmsi (uint32) + kw: imsi (digit-str), tmsi (uint32), ptmsi (uint32), mtmsi (uint32) or fgtmsi (uint32) If an imsi is provided, returns the UEd instance in case the IMSI is allowed If a tmsi or ptmsi is provided, returns @@ -1333,7 +1334,7 @@ class CorenetServer(object): return self.UE[imsi] elif self.UE_ATTACH_FILTER and re.match(self.UE_ATTACH_FILTER, imsi) and \ '*' in self.ConfigUE: - self._log('WNG', 'attaching an UE without dedicated configuration, IMSI %s' % imsi) + self._log('WNG', 'attaching a UE without dedicated configuration, IMSI %s' % imsi) self.UE[imsi] = UEd(self, imsi, config=self.ConfigUE['*']) return self.UE[imsi] else: @@ -1359,6 +1360,13 @@ class CorenetServer(object): else: # creating a UEd instance which will request IMSI return self.create_dummy_ue(mtmsi=mtmsi) + elif 'fgtmsi' in kw: + fgtmsi = kw['fgtmsi'] + if fgtmsi in self.FGTMSI: + return self.UE[self.FGTMSI[fgtmsi]] + else: + # creating a UEd instance which will request SUPI + return self.create_dummy_ue(fgtmsi=fgtmsi) return None def create_dummy_ue(self, **kw): @@ -1390,18 +1398,20 @@ class CorenetServer(object): # go over all UE and abort() NAS signalling procedures in timeout T = time() for ue in self.UE.values(): + if ue.IuCS is not None: + if ue.IuCS.MM.Proc: for P in ue.IuCS.MM.Proc: if hasattr(P, 'TimerStop') and T > P.TimerStop: P._log('WNG', 'timeout: aborting') P.abort() - #if ue.IuCS.CC.Proc: - # for P in ue.IuCS.CC.Proc.values(): - # if hasattr(P, 'TimerStop') and T > P.TimerStop: - # P._log('WNG', 'timeout: aborting') - # P.abort() + if ue.IuCS.CC.Proc: + for P in ue.IuCS.CC.Proc.values(): + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() if ue.IuCS.SMS.Proc: for P in tuple(ue.IuCS.SMS.Proc.values()): @@ -1409,11 +1419,11 @@ class CorenetServer(object): P._log('WNG', 'timeout: aborting') P.abort() - #if ue.IuCS.SS.Proc: - # for P in ue.IuCS.SS.Proc.values(): - # if hasattr(P, 'TimerStop') and T > P.TimerStop: - # P._log('WNG', 'timeout: aborting') - # P.abort() + if ue.IuCS.SS.Proc: + for P in ue.IuCS.SS.Proc.values(): + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() if ue.IuPS is not None: @@ -1424,11 +1434,11 @@ class CorenetServer(object): P._log('WNG', 'timeout: aborting') P.abort() - #if ue.IuPS.SM.Proc: - # for P in tuple(ue.IuPS.SM.Proc.values()): - # if hasattr(P, 'TimerStop') and T > P.TimerStop: - # P._log('WNG', 'timeout: aborting') - # P.abort() + if ue.IuPS.SM.Proc: + for P in tuple(ue.IuPS.SM.Proc.values()): + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() if ue.S1 is not None: @@ -1449,6 +1459,26 @@ class CorenetServer(object): if hasattr(P, 'TimerStop') and T > P.TimerStop: P._log('WNG', 'timeout: aborting') P.abort() + + if ue.NG is not None: + + if ue.NG.FGMM.Proc: + for P in ue.NG.FGMM.Proc: + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() + + if ue.NG.FGSM.Proc: + for P in tuple(ue.NG.FGSM.Proc.values()): + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() + + if ue.NG.SMS.Proc: + for P in tuple(ue.NG.SMS.Proc.values()): + if hasattr(P, 'TimerStop') and T > P.TimerStop: + P._log('WNG', 'timeout: aborting') + P.abort() def get_gtp_teid(self): if self._GTP_TEID_UL > 4294967294: diff --git a/pycrate_corenet/utils.py b/pycrate_corenet/utils.py index 8a828ee..73bbe92 100644 --- a/pycrate_corenet/utils.py +++ b/pycrate_corenet/utils.py @@ -475,32 +475,52 @@ def print_pduies(desc): print(' None') -def print_nasies(nasmsg): +def print_nasies(nasmsg, indent=''): # go after the header (last field: Type), and print IE type, tag if defined, # and name # WNG: Type1V (Uint4), Type2 (Uint8), Type3V(Buf) are not wrapped hdr = nasmsg[0] if 'ProtDisc' in hdr._by_name: pd = hdr['ProtDisc'].get_val() + elif 'EPD' in hdr._by_name: + pd = hdr['EPD'].get_val() else: pd = hdr[0]['ProtDisc'].get_val() typ = hdr['Type'].get_val() - print('%s (PD %i, Type %i), IEs:' % (nasmsg._name, pd, typ)) + print('%s%s (PD %i, Type %i), IEs:' % (indent, nasmsg._name, pd, typ)) # if len(nasmsg._content) == 1: - print(' None') + print('%s None' % indent) else: for ie in nasmsg._content[1:]: if ie.get_trans(): # optional IE - print('- %-9s : %s (T: %i)'\ - % (ie.__class__.__name__, ie._name, ie[0].get_val())) + print('%s- %-9s : %s (T: %i)'\ + % (indent, ie.__class__.__name__, ie._name, ie[0].get_val())) elif isinstance(ie, TS24007.IE): # mandatory IE - print('- %-9s : %s' % (ie.__class__.__name__, ie._name)) + print('%s- %-9s : %s' % (indent, ie.__class__.__name__, ie._name)) elif ie.get_bl() == 4: # uint / spare bits - print('- %-9s : %s' % ('Type1V', 'spare')) + print('%s- %-9s : %s' % (indent, 'Type1V', 'spare')) else: assert() + +def print_nasproc_docs(nasproc): + msgcn, msgue = nasproc.Cont + print('CN message:') + if msgcn is None: + print(' None') + else: + for m in msgcn: + print_nasies(m(), indent=' ') + print(' ') + print('UE message:') + if msgue is None: + print(' None') + else: + for m in msgue: + print_nasies(m(), indent=' ') + print(' ') +