corenet: 1st release with working LTE attach / tau / default PDN setup / serv req

This commit is contained in:
mitshell 2018-01-14 14:20:58 +01:00
parent 35d5e6fd62
commit e565ef0d01
12 changed files with 1993 additions and 552 deletions

View File

@ -53,7 +53,7 @@ class UEd(SigStack):
# to log UE NAS GMM / SM for all UE
TRACE_NAS_PS = False
# to log UE LTE NAS (potentially) encrypted EMM / ESM for all UE
TRACE_NAS_EPS_ENC = False
TRACE_NAS_EPS_SEC = False
# to log UE LTE NAS clear-text EMM / ESM for all UE
TRACE_NAS_EPS = False
# to log UE LTE NAS containing SMS for all UE
@ -132,24 +132,28 @@ class UEd(SigStack):
self.MSISDN = config['MSISDN']
self.USIM = config['USIM']
#
self.IuPS.SM.PDP = {}
self.IuPS.SM.PDPConfig = {}
# cpdict(self.IuPS.SM.__class__.PDPConfig)
# TODO: handle config for PDP networks
#
self.S1.ESM.PDN = {}
for apn, pdntype, ipaddr in config['PDN']:
#Server.ConfigPDN provides the DNS servers for each APN
#UE.S1.ESM.PDNConfig provides the default QoS for each APN
self.S1.ESM.PDNConfig = {}
for pdnconfig in config['PDN']:
apn, pdnaddr, apncfg = pdnconfig[0], pdnconfig[1:], {}
# Server.ConfigPDN provides the DNS servers for each APN (and some
# more common parameters)
# UE.S1.ESM.RABConfig provides the default RAB QoS for each APN
if apn not in self.Server.ConfigPDN:
self._log('WNG', 'unable to configure PDN connectivity for APN %s, no DNS servers'\
% apn)
self._log('WNG', 'unable to configure PDN connectivity for APN %s, '\
'no DNS servers' % apn)
elif apn not in self.S1.ESM.RABConfig:
self._log('WNG', 'unable to configure PDN connectivity for APN %s, '\
'no S1 QoS parameters' % apn)
else:
self.S1.ESM.PDN = {apn: {'IP' : (pdntype, ipaddr),
'DNS': self.Server.ConfigPDN[apn]}}
if apn not in self.S1.ESM.PDNConfig:
self._log('WNG', 'unable to configure PDN connectivity for APN %s, '\
'no S1 QoS parameters' % apn)
else:
self.S1.ESM.PDN[apn].update( self.S1.ESM.PDNConfig[apn] )
apncfg = cpdict(self.Server.ConfigPDN[apn])
apncfg['PDNAddr'] = pdnaddr
apncfg['RAB'] = cpdict(self.S1.ESM.RABConfig[apn])
apncfg['RAB']['QCI'] = apncfg['QCI']
self.S1.ESM.PDNConfig[apn] = apncfg
def set_ran(self, ran, ctx_id, sid=None):
# UE going connected

View File

@ -98,6 +98,7 @@ class UEIuSigStack(SigStack):
def unset_ran(self):
del self.RNC
self.SEC['CKSN'] = None
self.clear()
self.connected.clear()
def set_ran_unconnected(self, rncd):
@ -257,7 +258,7 @@ class UEIuSigStack(SigStack):
def clear(self):
# clears all running RANAP CS/PS procedures
for Proc in self.Proc.values():
for Proc in list(self.Proc.values()):
Proc.abort()
#--------------------------------------------------------------------------#

View File

@ -459,8 +459,15 @@ class UESMd(SigStack):
'*' : {},
'corenet': {}
}
# PDP UE config, per APN, built at runtime
PDP = {}
# when the UE 1st attach it gets a specific PDPConfig dict with a copy of this content
# plus specific content from the CorenetServer.ConfigPDP and CorenetServer.ConfigUE
# Protocol config option with authentication
# if bypass enabled, the PAP / CHAP authentication will not be checked against
# the CorenetServer.PDPConfig and always return authentication success
PDN_PAP_BYPASS = True
PDN_CHAP_BYPASS = True
def _log(self, logtype, msg):
self.Iu._log(logtype, '[SM] %s' % msg)
@ -471,6 +478,8 @@ class UESMd(SigStack):
#
# dict of ongoing SM procedures (indexed by NSAPI)
self.Proc = {}
# dict of activated PDP config per PS bearer ID
self.PDP = {}
# dict of ongoing ESM transactions IEs
self.Trans = {}
# list of tracked procedures (requires TRACK_PROC = True)

File diff suppressed because it is too large Load Diff

View File

@ -152,7 +152,8 @@ class EMMSigProc(NASSigProc):
if Cap == 'UENetCap':
setseccap = True
if setseccap:
self.EMM.set_sec_ue_cap()
self._log('DBG', 'setting UE security capabilities')
self.EMM.set_sec_cap()
def _chk_imsi(self):
# arriving here means the UE's IMSI was unknown at first
@ -204,6 +205,78 @@ class EMMSigProc(NASSigProc):
ksi = (ksi[0]<<3) + ksi[1]
NasProc._set_ksi(ksi, emerg)
return NasProc.output()
def _act_bear(self):
# reactivate all PDN connections
erablist, ebilist, brdl, brul = [], [], 0, 0
for ebi, pdncfg in self.S1.ESM.PDN.items():
if 'RAB' in pdncfg:
rabcfg = pdncfg['RAB']
ebilist.append(ebi)
# erab ext IE can be Correlation-ID and/or BearerType
erablist.append({
'id': 52,
'criticality': 'reject',
'value': ('E-RABToBeSetupItemCtxtSUReq', {
'e-RAB-ID': ebi,
'e-RABlevelQoSParameters': rabcfg['E-RABlevelQoSParameters'],
'transportLayerAddress': (bytes_to_uint(inet_aton(rabcfg['SGW-TLA']), 32), 32),
'gTP-TEID': uint_to_bytes(rabcfg['SGW-GTP-TEID'], 32),
#'iE-Extensions': [],
})
})
brdl += rabcfg['BitrateDL']
brul += rabcfg['BitrateUL']
if not erablist:
# no PDN connection to reactivate
return None
#
# get the current sec context to setup the eNB security layer
secctx = self.S1.get_sec_ctx()
if secctx and 'UESecCap' in self.UE.Cap:
# create the KeNB
self._log('DBG', 'NAS UL count for Kenb derivation, %i' % secctx['UL_enb'])
Kenb, UESecCap = conv_A3(secctx['Kasme'], secctx['UL_enb']), self.UE.Cap['UESecCap'][1]
secctx['Kenb'] = Kenb
secctx['NCC'] = 0
secctx['NH'] = conv_A4(secctx['Kasme'], Kenb)
else:
self._log('WNG', 'no active NAS security context, using the null AS security context')
Kenb, UESecCap = self.S1.SECAS_NULL_CTX
#
IEs = {
'E_RABToBeSetupListCtxtSUReq': erablist,
'UEAggregateMaximumBitrate': {
'uEaggregateMaximumBitRateDL': brdl,
'uEaggregateMaximumBitRateUL': brul
},
'SecurityKey': (bytes_to_uint(Kenb, 256), 256),
'UESecurityCapabilities': {
'encryptionAlgorithms': ((UESecCap[1].get_val()<<15) + \
(UESecCap[2].get_val()<<14) + \
(UESecCap[3].get_val()<<13), 16),
'integrityProtectionAlgorithms': ((UESecCap[9].get_val()<<15) + \
(UESecCap[10].get_val()<<14) + \
(UESecCap[11].get_val()<<13), 16)
}
}
#
if self.S1.ICS_RADCAP_INCL and 'UERadioCap' in self.UE.Cap:
IEs['UERadioCapability'] = self.UE.Cap['UERadioCap'][0]
if self.S1.ICS_GUMMEI_INCL:
IEs['GUMMEI'] = gummei_to_asn(self.UE.Server.PLMN,
self.UE.Server.MME_GID,
self.UE.Server.MME_CODE)
if self.S1.ICS_TRACE_ACT:
IEs['TraceActivation'] = self.S1.ICS_TRACE_ACT
#
S1apProc = self.S1.init_s1ap_proc(S1APInitialContextSetup, **IEs)
if S1apProc:
# pass the info required for setting the GTPU tunnel
S1apProc._gtp_add_mobile_ebi = ebilist
return S1apProc
else:
return None
#------------------------------------------------------------------------------#
@ -310,7 +383,7 @@ class EMMAuthentication(EMMSigProc):
}
def output(self):
# get a new KSI (0..15)
# get a new KSI (0..6)
ksi = self.EMM.get_new_ksi()
# in case a RAND is configured as a class encoder, we use it for
# generating the auth vector
@ -513,7 +586,7 @@ class EMMSecurityModeControl(EMMSigProc):
# prepare IEs for the SMC
IEs = {'NASSecAlgo': {'CiphAlgo': self.secctx['EEA'], 'IntegAlgo': self.secctx['EIA']},
'NAS_KSI' : (self.ksi>>3, self.ksi&0x7),
'UESecCap' : self.EMM.get_sec_ue_cap(),
'UESecCap' : self.EMM.get_sec_cap(),
}
if self.UE.IMEISV is None and self.EMM.SMC_IMEISV_REQ:
IEs['IMEISVReq'] = 1
@ -521,6 +594,8 @@ class EMMSecurityModeControl(EMMSigProc):
#
self.set_msg(7, 93, **IEs)
self.encode_msg(7, 93)
if not self._sec:
self._nas_tx._sec = False
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
self.init_timer()
@ -531,6 +606,7 @@ class EMMSecurityModeControl(EMMSigProc):
self._pdu.append( (time(), 'UL', pdu) )
self.UEInfo = {}
self.decode_msg(pdu, self.UEInfo)
self.rm_from_emm_stack()
if pdu._name == 'EMMSecurityModeComplete':
self.success = True
if 'IMEISV' in self.UEInfo:
@ -575,7 +651,8 @@ class EMMIdentification(EMMSigProc):
# build the Id Request msg, Id type has to be set by the caller
self.set_msg(7, 85)
self.encode_msg(7, 85)
# log the NAS msg
if not self._sec:
self._nas_tx._sec = False
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
#
@ -730,10 +807,7 @@ class EMMAttach(EMMSigProc):
# AttachComplete
self.errcause, self.CompInfo = None, {}
self.decode_msg(pdu, self.CompInfo)
if self.mtmsi_realloc >= 0:
self.UE.set_mtmsi(self.mtmsi_realloc)
self._log('INF', 'new M-TMSI set, 0x%.8x' % self.mtmsi_realloc)
return self._end()
return self._process_comp()
def _process_req(self):
#
@ -755,11 +829,11 @@ class EMMAttach(EMMSigProc):
# jump directly to the smc with EEA0 / EIA0
self.EMM.set_sec_ctx_emerg()
# emergency ctx has always ksi 0
return self._ret_smc(0, emerg=True)
return self._ret_smc((0, 0), emerg=True)
#
# check local ID
elif self.UE.IMSI is None:
# UEd was created based on a MTMSI provided at the RRC layer
# UEd was created based on a S-TMSI provided at the RRC layer
# -> we need to get its IMSI before continuing
try:
del self.UE.Server._UEpre[self.UE.MTMSI]
@ -778,30 +852,48 @@ class EMMAttach(EMMSigProc):
#
if self.EMM.require_auth(self, ksi=self.UEInfo['NAS_KSI']):
return self._ret_auth()
elif self.EMM.require_smc(self):
# no auth procedure, ksi submitted by the UE is valid
return self._ret_smc(self.UEInfo['NAS_KSI'])
else:
# otherwise, go directly to postprocess
return self.postprocess()
# no auth procedure, ksi submitted by the UE is valid
# set UL NAS count for further KeNB derivation
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self):
return self._ret_smc(self.UEInfo['NAS_KSI'])
else:
# otherwise, go directly to postprocess
return self.postprocess()
def _process_comp(self):
#
if self.mtmsi_realloc >= 0:
self.UE.set_mtmsi(self.mtmsi_realloc)
self._log('INF', 'new M-TMSI set, 0x%.8x' % self.mtmsi_realloc)
#
# transfer to the ESM stack, which will terminate the ongoing ESM procedure
# and shoud return an empty list
ret = self.S1.ESM.process_buf(self.CompInfo['ESMContainer'].get_val(),
sec=self._nas_rx._sec)
ret.extend(self._end())
return ret
def postprocess(self, Proc=None):
if isinstance(Proc, EMMIdentification):
if Proc.IDType == NAS.IDTYPE_IMSI:
# got the UE's IMSI, check if it's allowed
if self.UE.IMSI is None:
# UE did actually not responded with its IMSI, this is bad !
# error 96: invalid mandatory info
self.errcause = 96
return self.output()
elif not self._chk_imsi():
return self.output()
else:
assert()
assert( Proc.IDType == NAS.IDTYPE_IMSI)
# got the UE's IMSI, check if it's allowed
if self.UE.IMSI is None:
# UE did actually not responded with its IMSI, this is bad !
# error 96: invalid mandatory info
self.errcause = 96
return self.output()
elif not self._chk_imsi():
return self.output()
if self.EMM.require_auth(self, ksi=self.UEInfo['NAS_KSI']):
return self._ret_auth()
elif self.EMM.require_smc(self):
return self._ret_smc(self.UEInfo['NAS_KSI'])
else:
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self):
return self._ret_smc(self.UEInfo['NAS_KSI'])
#
elif isinstance(Proc, EMMAuthentication):
if not Proc.success:
@ -834,7 +926,7 @@ class EMMAttach(EMMSigProc):
else:
self.set_msg(7, 68, EMMCause=self.errcause)
self.encode_msg(7, 68)
if not self._sec:
if not self._nas_rx._sec:
self._nas_tx._sec = False
self.mtmsi_realloc = -1
self._log('INF', 'reject, %r' % self._nas_tx['EMMCause'])
@ -876,17 +968,32 @@ class EMMAttach(EMMSigProc):
#
if self.EMM.ATT_EPS_NETFEAT_SUPP:
IEs['EPSNetFeat'] = self.EMM.ATT_EPS_NETFEAT_SUPP
if self.EMM.ATT_T3412_EXT:
IEs['T3412Ext'] = self.EMM.ATT_T3412_EXT
if self.EMM.ATT_EXTDRX:
IEs['ExtDRXParam'] = self.EMM.ATT_EXTDRX
#
# power saving mode
if 'MSNetFeatSupp' in self.UEInfo and self.UEInfo['MSNetFeatSupp'][1].get_val():
if self.EMM.ATT_T3412_EXT:
IEs['T3412Ext'] = self.EMM.ATT_T3412_EXT
elif 'T3412Ext' in self.UEInfo:
IEs['T3412Ext'] = self.UEInfo['T3412Ext'].get_val()
if 'T3324' in self.UEInfo:
if self.EMM.ATT_T3324:
IEs['T3324'] = self.EMM.ATT_T3324
else:
IEs['T3324'] = self.UEInfo['T3324'].get_val()
#
if 'ExtDRXParam' in self.UEInfo:
if self.EMM.ATT_EXTDRX:
IEs['ExtDRXParam'] = self.EMM.ATT_EXTDRX
else:
IEs['ExtDRXParam'] = self['ExtDRXParam'].get_val()
#
if self.EMM.ATT_SMS_SERV_STAT:
IEs['SMSServStat'] = self.EMM.ATT_SMS_SERV_STAT
#
self.set_msg(7, 66, **IEs)
self.encode_msg(7, 66)
#
# transfer to the ESM stack, which will populare the ESMContainer in
# transfer to the ESM stack, which will populate the ESMContainer in
# the AttachAccept and setup the proper S1AP procedure
ret = self.S1.ESM.process_buf(self.UEInfo['ESMContainer'].get_val(),
sec=self._sec,
@ -896,11 +1003,10 @@ class EMMAttach(EMMSigProc):
self.init_timer()
return ret
def _end(self, nas_tx):
def _end(self):
ret = []
if self.EMM.ATT_S1REL:
# trigger an S1 UE ctxt release with Cause NAS normal-release (83)
S1apProcRel = self.S1.init_s1ap_proc(S1APUEContextRelease, Cause=('nAS', 83))
S1apProcRel = self.S1.init_s1ap_proc(S1APUEContextRelease, Cause=('nas', 'normal-release'))
if S1apProcRel:
ret.append(S1apProcRel)
self.rm_from_emm_stack()
@ -929,13 +1035,27 @@ class EMMDetachUE(EMMSigProc):
)
def _detach(self):
# set EMM state
self.EMM.state = 'INACTIVE'
self._log('INF', 'detaching')
#
self.rm_from_emm_stack()
# abort all ongoing PS procedures
self.S1.clear_nas_proc()
if self.S1.SEC['KSI'] is not None and not self._nas_rx._sec:
# security is activated, by the detach request is not protected
# this is not acceptable
self._log('WNG', 'invalid detach request with no security layer')
else:
# set EMM state
self.EMM.state = 'INACTIVE'
self._log('INF', 'detaching')
#
self.rm_from_emm_stack()
# abort all ongoing EPS procedures and PDN ctxt
self.S1.clear_nas_proc()
self.S1.ESM.pdn_clear()
#
if self.UE.IMSI is None:
# UEd was created based on a S-TMSI provided at the RRC layer
# just delete it
try:
del self.UE.Server._UEpre[self.UE.MTMSI]
except:
pass
def process(self, pdu):
# preempt the EMM stack
@ -944,10 +1064,13 @@ class EMMDetachUE(EMMSigProc):
self._pdu.append( (time(), 'UL', pdu) )
self.UEInfo = {}
self.decode_msg(pdu, self.UEInfo)
#
if self.UEInfo['EPSDetachTypeMO']['SwitchOff'].get_val():
# if UE is to power-off, procedure ends here
ret = self.output(poff=True)
else:
# TODO: implement require_auth() / require_smc()
#
ret = self.output(poff=False)
self._detach()
return ret
@ -959,7 +1082,7 @@ class EMMDetachUE(EMMSigProc):
# set a S1ap direct transfer to transport the DetachAccept
#self.set_msg(7, 70)
self.encode_msg(7, 70)
if not self._sec:
if not self._nas_rx._sec:
self._nas_tx._sec = False
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
@ -1075,12 +1198,230 @@ class EMMTrackingAreaUpdate(EMMSigProc):
(TS24301_EMM.EMMTrackingAreaUpdateRequest, TS24301_EMM.EMMTrackingAreaUpdateComplete)
)
Decod = {
(7, 72): {
'NAS_KSI' : lambda x: (x[0].get_val(), x[1].get_val()),
'OldGUTI' : lambda x: x[1].decode(),
'OldTAI' : lambda x: x[1].decode(),
'OldLAI' : lambda x: x[1].decode(),
},
}
Cap = ('UENetCap', 'DRXParam', 'MSNetCap', 'MSCm2', 'MSCm3', 'SuppCodecs',
'VoiceDomPref', 'DeviceProp', 'MSNetFeatSupp', 'ExtDRXParam')
Timer = 'T3450'
def process(self, pdu):
pass
# preempt the EMM stack
self.emm_preempt()
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
#
if pdu._name == 'EMMTrackingAreaUpdateRequest':
self.errcause, self.UEInfo = None, {}
self.decode_msg(pdu, self.UEInfo)
return self._process_req()
else:
# EMMTrackingAreaUpdateComplete
self.errcause, self.CompInfo = None, {}
self.decode_msg(pdu, self.CompInfo)
return self._process_comp()
def _process_req(self):
#
upd_type = self.UEInfo['EPSUpdateType']
self.upd_type = upd_type.get_val()
self._log('INF', upd_type.repr())
# collect capabilities
self._collect_cap()
#
# check local ID
if self.UE.IMSI is None:
# UEd was created based on a S-TMSI provided at the RRC layer
# -> we need to get its IMSI before continuing
try:
del self.UE.Server._UEpre[self.UE.MTMSI]
except:
pass
# need to request the IMSI, prepare an id request procedure
return self._ret_req_imsi()
#
elif self.EMM.require_auth(self, ksi=self.UEInfo['NAS_KSI']):
return self._ret_auth()
else:
# no auth procedure, ksi submitted by the UE is valid
# set UL NAS count for further KeNB derivation
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self):
return self._ret_smc(self.UEInfo['NAS_KSI'])
else:
# otherwise, go directly to postprocess
return self.postprocess()
def _process_comp(self):
if self.mtmsi_realloc >= 0:
self.UE.set_mtmsi(self.mtmsi_realloc)
self._log('INF', 'new M-TMSI set, 0x%.8x' % self.mtmsi_realloc)
return self._end()
def postprocess(self, Proc=None):
if isinstance(Proc, EMMIdentification):
assert( Proc.IDType == NAS.IDTYPE_IMSI)
# got the UE's IMSI, check if it's allowed
if self.UE.IMSI is None:
# UE did actually not responded with its IMSI, this is bad !
# error 96: invalid mandatory info
self.errcause = 96
return self.output()
elif not self._chk_imsi():
return self.output()
if self.EMM.require_auth(self, ksi=self.UEInfo['NAS_KSI']):
return self._ret_auth()
else:
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self):
return self._ret_smc(self.UEInfo['NAS_KSI'])
#
elif isinstance(Proc, EMMAuthentication):
if not Proc.success:
self.abort()
return []
elif self.EMM.require_smc(self):
# ksi established during the auth procedure
return self._ret_smc(Proc.ksi)
#
elif isinstance(Proc, EMMSecurityModeControl):
if not Proc.success:
self.abort()
return []
#
elif Proc != self and Proc is not None:
self._err = Proc
assert()
#
return self.output()
def output(self):
pass
if self.errcause:
# prepare TAUReject IE
self.set_msg(7, 75, EMMCause=self.errcause)
self.encode_msg(7, 75)
if not self._nas_rx._sec:
self._nas_tx._sec = False
self.mtmsi_realloc = -1
self._log('INF', 'reject, %r' % self._nas_tx['EMMCause'])
ret = self.S1.ret_s1ap_dnt(self._nas_tx)
ret.extend(self._end())
else:
# prepare TAU Accept IEs
IEs = {'EPSUpdateResult': (0, 0)} # spare, value
#
# check EPSBearerCtxtStat, and deactivate PDN not enabled at the UE
if 'EPSBearerCtxtStat' in self.UEInfo:
EBCtxtStat = self.UEInfo['EPSBearerCtxtStat']
EBCtxtStatResp = []
for Stat in EBCtxtStat:
uestat = Stat.get_val()
ebi = int(Stat._name[4:])
if uestat == 1 and ebi not in self.S1.ESM.PDN:
self._log('WNG', 'EPS bearer %i activated in the UE but not the network' % ebi)
EBCtxtStatResp.append(0)
elif uestat == 0 and ebi in self.S1.ESM.PDN:
self._log('INF', 'EPS bearer %i activated in the network but not the UE' % ebi)
pdncfg = self.S1.ESM.PDN[ebi]
if pdncfg['state'] == 1:
self.UE.Server.GTPUd.rem_mobile(pdncfg['RAB']['SGW-GTP-TEID'])
del self.S1.ESM.PDN[ebi]
EBCtxtStatResp.append(0)
else:
EBCtxtStatResp.append(uestat)
IEs['EPSBearerCtxtStat'] = EBCtxtStatResp
#
# check UERACapUpdateNeed, and delete UERadCap if needed
if 'UERACapUpdateNeed' in self.UEInfo \
and self.UEInfo['UERACapUpdateNeed'].get_val() \
and 'UERadioCap' in self.UE.Cap:
del self.UE.Cap['UERadioCap']
#
if self.EMM.TAU_T3402 is not None:
IEs['T3402'] = self.EMM.TAU_T3402
if self.EMM.TAU_T3412 is not None:
IEs['T3412'] = self.EMM.TAU_T3412
#
# in case we want to realloc a GUTI, we start a GUTIRealloc,
# but don't forward its output
if self.EMM.TAU_GUTI_REALLOC:
NasProc = self.EMM.init_proc(EMMGUTIReallocation)
NasProc.output(embedded=True)
IEs['GUTI'] = {'type': NAS.IDTYPE_GUTI, 'ident': NasProc.guti}
self.mtmsi_realloc = NasProc.mtmsi
else:
self.mtmsi_realloc = -1
#
# power saving mode
if 'MSNetFeatSupp' in self.UEInfo and self.UEInfo['MSNetFeatSupp'][1].get_val():
if self.EMM.ATT_T3412_EXT:
IEs['T3412Ext'] = self.EMM.ATT_T3412_EXT
elif 'T3412Ext' in self.UEInfo:
IEs['T3412Ext'] = self.UEInfo['T3412Ext'].get_val()
if 'T3324' in self.UEInfo:
if self.EMM.ATT_T3324:
IEs['T3324'] = self.EMM.ATT_T3324
else:
IEs['T3324'] = self.UEInfo['T3324'].get_val()
#
if 'ExtDRXParam' in self.UEInfo:
if self.EMM.ATT_EXTDRX:
IEs['ExtDRXParam'] = self.EMM.ATT_EXTDRX
else:
IEs['ExtDRXParam'] = self['ExtDRXParam'].get_val()
#
if self.S1.Config['EquivPLMNList'] is not None:
IEs['EquivPLMNList'] = self.S1.Config['EquivPLMNList']
if isinstance(self.S1.Config['EmergNumList'], bytes_types):
IEs['EmergNumList'] = self.S1.Config['EmergNumList']
elif self.S1.Config['EmergNumList'] is not None:
IEs['EmergNumList'] = [{'ServiceCat': uint_to_bitlist(cat), 'Num': num} for \
(cat, num) in self.S1.Config['EmergNumList']]
if self.EMM.TAU_EPS_NETFEAT_SUPP:
IEs['EPSNetFeat'] = self.EMM.TAU_EPS_NETFEAT_SUPP
if self.EMM.TAU_SMS_SERV_STAT:
IEs['SMSServStat'] = self.EMM.TAU_SMS_SERV_STAT
#
self._IEs = IEs
self.set_msg(7, 73, **IEs)
self.encode_msg(7, 73)
#
if self.upd_type[0]:
# reactivate EPS bearers
self.BearActProc = self._act_bear()
else:
self.BearActProc = None
#
ret = self.S1.ret_s1ap_dnt(self._nas_tx)
if self.BearActProc:
ret.append(self.BearActProc)
if self.mtmsi_realloc >= 0:
self.init_timer()
else:
ret.extend(self._end())
#
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
return ret
def _end(self):
ret = []
if self.EMM.TAU_S1REL and not self.BearActProc:
S1apProcRel = self.S1.init_s1ap_proc(S1APUEContextRelease, Cause=('nas', 'normal-release'))
if S1apProcRel:
ret.append(S1apProcRel)
self.rm_from_emm_stack()
return ret
#------------------------------------------------------------------------------#
# EMM connection management procedures (S1 mode only): TS 24.301, section 5.6
@ -1105,11 +1446,92 @@ class EMMServiceRequest(EMMSigProc):
)
def process(self, pdu):
pass
# preempt the EMM stack
self.emm_preempt()
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
self.errcause, self.UEInfo = None, {}
self.decode_msg(pdu, self.UEInfo)
#
# check local ID
if self.UE.IMSI is None:
# UEd was created based on a S-TMSI provided at the RRC layer
# -> we need to get its IMSI before continuing
try:
del self.UE.Server._UEpre[self.UE.MTMSI]
except:
pass
# need to request the IMSI, prepare an id request procedure
return self._ret_req_imsi()
#
elif self.EMM.require_auth(self, ksi=(0, self.UEInfo['KSI'].get_val())):
return self._ret_auth()
else:
# no auth procedure, ksi submitted by the UE is valid
# set UL NAS count for further KeNB derivation
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self) and self.EMM.SER_SMC_ALW:
return self._ret_smc((0, self.UEInfo['KSI'].get_val()))
else:
# otherwise, go directly to postprocess
return self.postprocess()
def postprocess(self, Proc=None):
if isinstance(Proc, EMMIdentification):
assert( Proc.IDType == NAS.IDTYPE_IMSI)
# got the UE's IMSI, check if it's allowed
if self.UE.IMSI is None:
# UE did actually not responded with its IMSI, this is bad !
# error 96: invalid mandatory info
self.errcause = 96
return self.output()
elif not self._chk_imsi():
return self.output()
if self.EMM.require_auth(self, ksi=(0, self.UEInfo['KSI'].get_val())):
return self._ret_auth()
else:
secctx = self.S1.SEC[self.S1.SEC['KSI']]
secctx['UL_enb'] = self._nas_rx._ulcnt
if self.EMM.require_smc(self) and self.EMM.SER_SMC_ALW:
return self._ret_smc((0, self.UEInfo['KSI'].get_val()))
#
if isinstance(Proc, EMMAuthentication):
if not Proc.success:
self.abort()
return []
elif self.EMM.require_smc(self):
# ksi established during the auth procedure
return self._ret_smc(Proc.ksi)
#
elif isinstance(Proc, EMMSecurityModeControl):
if not Proc.success:
self.abort()
return []
#
elif Proc != self and Proc is not None:
self._err = Proc
assert()
#
return self.output()
def output(self):
if self.errcause:
self._nas_tx = NAS.EMMStatus(EMMCause=self.errcause)
return self.S1.ret_s1ap_dnt(self._nas_tx)
#
else:
# reactivate all PDN connections
S1apProc = self._act_bear()
self.rm_from_emm_stack()
if S1apProc:
return [S1apProc]
else:
return []
class EMMExtServiceRequest(EMMSigProc):
"""Service request procedure: TS 24.301, section 5.6.1
"""Extended service request procedure: TS 24.301, section 5.6.1
UE-initiated
@ -1132,38 +1554,17 @@ class EMMExtServiceRequest(EMMSigProc):
)
def process(self, pdu):
pass
class EMMExtServiceRequest(EMMSigProc):
"""Service request procedure: TS 24.301, section 5.6.1
UE-initiated
CN message:
None
UE message:
EMMExtServiceRequest (PD 7, Type 76), IEs:
- Type1V : NAS_KSI
- Type1V : ServiceType
- Type4LV : MTMSI
- Type1TV : CSFBResponse
- Type4TLV : EPSBearerCtxtStat
- Type1TV : DeviceProp
"""
Cont = (
None,
(TS24301_EMM.EMMExtServiceRequest, )
)
def process(self, pdu):
pass
# preempt the EMM stack
self.emm_preempt()
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
self.UEInfo = {}
self.decode_msg(pdu, self.UEInfo)
# TODO
class EMMCPServiceRequest(EMMSigProc):
"""Service request procedure: TS 24.301, section 5.6.1
"""Control-Plane service request procedure: TS 24.301, section 5.6.1
UE-initiated
@ -1186,7 +1587,13 @@ class EMMCPServiceRequest(EMMSigProc):
)
def process(self, pdu):
pass
# preempt the EMM stack
self.emm_preempt()
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
self.UEInfo = {}
self.decode_msg(pdu, self.UEInfo)
# TODO
class EMMDLNASTransport(EMMSigProc):

View File

@ -216,6 +216,106 @@ class ESMDefaultEPSBearerCtxtAct(ESMSigProc):
)
Timer = 'T3485'
def output(self):
self.encode_msg(2, 193)
if not self._sec:
self._nas_tx._sec = False
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
NasTx = self.ESM.output_nas_esm(self._nas_tx, self._EMMProc)
if not NasTx:
return []
#
pdncfg = self.ESM.PDN[self._ebi]
if 'RAB' in pdncfg:
# only a single E-RAB can will be established.
# -> check if we would have to support the establishment of
# several E-RABs with this single procedure
#
NasTxSec = self.S1.output_nas_sec(NasTx)
if not NasTxSec:
return self.S1._s1ap_nas_sec_err()
#
rabcfg = pdncfg['RAB']
# set the E-RAB list parameters for S1 with the single (default) RAB
# embedding the NAS ESM msg into the 1st E-RAB item
# erab ext IE can be Correlation-ID and/or BearerType
erablist = [{
'id': 52,
'criticality': 'reject',
'value': ('E-RABToBeSetupItemCtxtSUReq', {
'e-RAB-ID': self._ebi,
'e-RABlevelQoSParameters': rabcfg['E-RABlevelQoSParameters'],
'transportLayerAddress': (bytes_to_uint(inet_aton(rabcfg['SGW-TLA']), 32), 32),
'gTP-TEID': uint_to_bytes(rabcfg['SGW-GTP-TEID'], 32),
'nAS-PDU': NasTxSec,
#'iE-Extensions': [],
})
}]
#
# get the current sec context to setup the eNB security layer
secctx = self.S1.get_sec_ctx()
if secctx and 'UESecCap' in self.UE.Cap:
# create the KeNB
self._log('DBG', 'NAS UL count for Kenb derivation, %i' % secctx['UL_enb'])
Kenb, UESecCap = conv_A3(secctx['Kasme'], secctx['UL_enb']), self.UE.Cap['UESecCap'][1]
secctx['Kenb'] = Kenb
secctx['NCC'] = 0
secctx['NH'] = conv_A4(secctx['Kasme'], Kenb)
else:
self._log('WNG', 'no active NAS security context, using the null AS security context')
Kenb, UESecCap = self.S1.SECAS_NULL_CTX
#
IEs = {
'E_RABToBeSetupListCtxtSUReq': erablist,
'UEAggregateMaximumBitrate': {
'uEaggregateMaximumBitRateDL': rabcfg['BitrateDL'],
'uEaggregateMaximumBitRateUL': rabcfg['BitrateUL']
},
'SecurityKey': (bytes_to_uint(Kenb, 256), 256),
'UESecurityCapabilities': {
'encryptionAlgorithms': ((UESecCap[1].get_val()<<15) + \
(UESecCap[2].get_val()<<14) + \
(UESecCap[3].get_val()<<13), 16),
'integrityProtectionAlgorithms': ((UESecCap[9].get_val()<<15) + \
(UESecCap[10].get_val()<<14) + \
(UESecCap[11].get_val()<<13), 16)
}
}
#
if self.S1.ICS_RADCAP_INCL and 'UERadioCap' in self.UE.Cap:
IEs['UERadioCapability'] = self.UE.Cap['UERadioCap'][0]
if self.S1.ICS_GUMMEI_INCL:
IEs['GUMMEI'] = gummei_to_asn(self.UE.Server.PLMN,
self.UE.Server.MME_GID,
self.UE.Server.MME_CODE)
if self.S1.ICS_TRACE_ACT:
IEs['TraceActivation'] = self.S1.ICS_TRACE_ACT
#
S1apProc = self.S1.init_s1ap_proc(S1APInitialContextSetup, **IEs)
if S1apProc:
# pass the info required for setting the GTPU tunnel
S1apProc._gtp_add_mobile_ebi = [self._ebi]
self.init_timer()
return [S1apProc]
else:
return []
else:
# send the NAS ESM into a NAS direct transfer
ret = self.S1.ret_s1ap_dnt(NasTx)
if ret:
self.init_timer()
return ret
def process(self, pdu):
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
self.UEInfo = {}
self.decode_msg(pdu, self.UEInfo)
#
self.rm_from_esm_stack()
return []
class ESMDedicatedEPSBearerCtxtAct(ESMSigProc):
@ -372,6 +472,8 @@ class ESMPDNConnectivityRequest(ESMSigProc):
},
}
Cap = ('DeviceProp', )
def process(self, pdu):
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu) )
@ -389,7 +491,6 @@ class ESMPDNConnectivityRequest(ESMSigProc):
return NasProc.output()
#
else:
self.postprocess()
def postprocess(self, Proc=None):
@ -406,14 +507,38 @@ class ESMPDNConnectivityRequest(ESMSigProc):
def output(self):
# process the whole transaction request
ret, self.errcause = self.ESM.process_trans(self.UEInfo['PTI'])
NasProc, self.errcause = self.ESM.process_trans(self.UEInfo['PTI'])
# deny request or start an ESNDefaultEPSBearerCtxtAct
if self.errcause:
self.set_msg(2, 209, ESMCause=self.errcause)
self.set_msg(2, 209, EPSBearerId=self.UEInfo['EPSBearerId'],
PTI=self.UEInfo['PTI'],
ESMCause=self.errcause)
self.encode_msg(2, 209)
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
self.rm_from_esm_stack()
#
NasTx = self.ESM.output_nas_esm(self._nas_tx, self._EMMProc)
if not NasTx:
return []
else:
return self.S1.ret_s1ap_dnt(NasTx)
# TODO: infinite loop here !!!
#
else:
# TODO
pass
# associate the new ESM procedure to the EMM procedure associated to self
NasProc._EMMProc = self._EMMProc
NasProc.set_msg(2, 193, EPSBearerId=NasProc._ebi,
PTI=self.UEInfo['PTI'])
self.rm_from_esm_stack()
return NasProc.output()
class ESMPDNDisconnectRequest(ESMSigProc):
@ -543,9 +668,9 @@ class ESMInfoRequest(ESMSigProc):
self._nas_tx._sec = False
if self.TRACK_PDU:
self._pdu.append( (time(), 'DL', self._nas_tx) )
nas_tx = self.ESM.output_nas_esm(self._nas_tx, self._EMMProc)
NasTx = self.ESM.output_nas_esm(self._nas_tx, self._EMMProc)
self.init_timer()
return self.S1.ret_s1ap_dnt(nas_tx)
return self.S1.ret_s1ap_dnt(NasTx)
def process(self, pdu):
if self.TRACK_PDU:

View File

@ -357,7 +357,7 @@ class RANAPIuRelease(RANAPSigProc):
else:
if self.Iu.GMM.state != 'INACTIVE':
self.Iu.GMM.state = 'IDLE'
self._log('INF', 'UE disconnected')
self._log('INF', 'UE disconnected, cause %r' % (self._NetInfo['Cause'], ))
#
# disconnect the Iu interface to the RNC for the UE
self.Iu.unset_ran()

View File

@ -421,6 +421,88 @@ class S1APInitialContextSetup(S1APSigProc):
'suc': ({}, {}),
'uns': ({}, {})
}
def send(self):
self._enable_gtpu()
return self._send()
def _enable_gtpu(self):
if hasattr(self, '_gtp_add_mobile_ebi'):
for erab in self._gtp_add_mobile_ebi:
pdncfg = self.S1.ESM.PDN[erab]
pdncfg['state'] = 1
self.UE.Server.GTPUd.add_mobile(
pdncfg['RAB']['SGW-GTP-TEID'], # teid_ul
pdncfg['PDNAddr'], # mobile_addr
pdncfg['RAB']['ENB-TLA'], # ran_ip (maybe None)
pdncfg['RAB']['ENB-GTP-TEID']) # teid_dl (maybe None)
else:
self._log('WNG', 'enable_gtpu: no GTP mobile info provided')
def _disable_gtpu(self):
if hasattr(self, '_gtp_rem_mobile_ebi'):
for erab in self._gtp_rem_mobile_ebi:
pdncfg = self.S1.ESM.PDN[erab]
self.Server.GTPUd.rem_mobile(pdncfg['RAB']['SGW-GTP-TEID'])
pdncfg['state'] = 0
else:
self._log('WNG', 'disable_gtpu: no GTP mobile info provided')
def recv(self, pdu):
self._recv(pdu)
try:
del self.S1.Proc[self.Code]
except:
pass
#
if self.errcause:
self._log('WNG', 'error in the response decoding')
self.success = False
if hasattr(self, '_gtp_add_mobile_ebi'):
self._gtp_rem_mobile_ebi = self._gtp_add_mobile_ebi
self._disable_gtpu()
#
elif pdu[0] == 'unsuccessfulOutcome':
try:
self._log('WNG', 'failure, rejected with cause %r' % (self.UEInfo['Cause'], ))
except:
self._log('WNG', 'failure, rejected without cause')
self.success = False
if hasattr(self, '_gtp_add_mobile_ebi'):
self._gtp_rem_mobile_ebi = self._gtp_add_mobile_ebi
self._disable_gtpu()
#
else:
self.success = True
# E-RAB successfully established, to be completed with eNB IP and TEID
for erabsetupitem in self.UEInfo['E_RABSetupListCtxtSURes']:
erabsetupitem = erabsetupitem['value'][1]
erab = erabsetupitem['e-RAB-ID']
if erab in self._gtp_add_mobile_ebi:
pdncfg = self.S1.ESM.PDN[erab]
pdncfg['ENB-TLA'] = inet_ntoa(uint_to_bytes(*erabsetupitem['transportLayerAddress']))
pdncfg['ENB-GTP-TEID'] = bytes_to_uint(erabsetupitem['gTP-TEID'], 32)
self.Server.GTPUd.set_mobile_dl(
pdncfg['RAB']['SGW-GTP-TEID'], # teid_ul
ran_ip=pdncfg['ENB-TLA'],
teid_dl=pdncfg['ENB-GTP-TEID'])
# E-RAB failed to established
if 'E_RABList' in self.UEInfo:
self._gtp_rem_mobile_ebi = []
for erabitem in self.UEInfo['E_RABList']:
erabitem = erabitem['value'][1]
erab = erabitem['e-RAB-ID']
if erab in self._gtp_add_mobile_ebi:
self._gtp_rem_mobile_ebi.append(erab)
self._log('INF', 'unable to establish E-RAB %i, cause %r'\
% (erab, erabitem['cause']))
self._disable_gtpu()
def abort(self):
S1APSigProc.abort(self)
if hasattr(self, '_gtp_add_mobile_ebi'):
self._gtp_rem_mobile_ebi = self._gtp_add_mobile_ebi
self._disable_gtpu()
class S1APUEContextReleaseRequest(S1APSigProc):
@ -455,8 +537,7 @@ class S1APUEContextReleaseRequest(S1APSigProc):
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
recv = S1APSigProc._recv
def trigger(self):
# copy the cause signaled by the eNB
@ -512,10 +593,12 @@ class S1APUEContextRelease(S1APSigProc):
send = S1APSigProc._send
def _release_s1(self):
# suspend all RAB
self.S1.ESM.pdn_suspend()
# update mobility state
if self.S1.EMM.state != 'INACTIVE':
self.S1.EMM.state = 'IDLE'
self._log('INF', 'UE disconnected')
self._log('INF', 'UE disconnected, cause %r' % (self._NetInfo['Cause'], ))
#
# disconnect the S1 interface to the eNB for the UE
self.S1.unset_ran()
@ -1723,7 +1806,7 @@ class S1APS1Setup(S1APNonUESigProc):
self.ENBInfo['Global_ENB_ID']['eNB-ID'][1])
# prepare the S1SetupResponse
IEs = cpdict(self.Server.ConfigS1)
IEs['ServedGUMMEIs'] = [gummei_to_asn(gummei) for gummei in IEs['GUMMEIs']]
IEs['ServedGUMMEIs'] = [served_gummei_to_asn(gummei) for gummei in IEs['GUMMEIs']]
del IEs['GUMMEIs']
self.encode_pdu('suc', **IEs)
self._log('INF', 'eNB S1 setup successfully')
@ -1989,7 +2072,15 @@ class S1APUECapabilityInfoInd(S1APSigProc):
'suc': None,
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
# set the UERadioCapability in UE.Cap
ueradcap, uecapinfo = decode_ue_rad_cap(self.UEInfo['UERadioCapability'])
self.UE.Cap['UERadioCap'] = (self.UEInfo['UERadioCapability'], ueradcap, uecapinfo)
if 'UERadioCapabilityForPaging' in self.UEInfo:
self.UE.Cap['UERadioCapPaging'] = self.UEInfo['UERadioCapabilityForPaging']
#------------------------------------------------------------------------------#
# Trace Procedures

View File

@ -273,8 +273,7 @@ class LinkSigProc(SigProc):
"""
# 1) select the correct PDU and content
Cont, IEs, Extensions, mand = self.Cont[ptype]
Encod = self.Encod[ptype]
pdu_ies, pdu_exts = [], []
self._NetInfo, Encod, pdu_ies, pdu_exts = kw.copy(), self.Encod[ptype], [], []
#
# 2) encode the list of IEs' values
if IEs is not None:
@ -305,6 +304,8 @@ class LinkSigProc(SigProc):
'value': val})
elif ident in mand:
self._log('WNG', 'encode_pdu: missing mandatory IE, ident %i' % ident)
# sort pdu_ies in order according to 'id'
pdu_ies.sort(key=lambda x:x['id'])
#
# 3) encode the list of Extensions' values
if Extensions is not None:
@ -335,6 +336,8 @@ class LinkSigProc(SigProc):
'extensionValue': val})
elif ident in mand:
self._log('WNG', 'encode_pdu: missing mandatory Ext, ident %i' % ident)
# sort pdu_exts in order according to 'id'
pdu_exts.sort(key=lambda x:x['id'])
#
# 4) enable also undefined buffer values passed at runtime to be encoded
for name in kw:

View File

@ -43,12 +43,13 @@ from .HdlrHNB import HNBd
from .HdlrENB import ENBd
from .HdlrUE import UEd
from .ServerAuC import AuC
from .ServerGTPU import GTPUd
from .ServerGTPU import ARPd, GTPUd, BLACKHOLE_LAN, BLACKHOLE_WAN
# to log all the SCTP socket send() / recv() calls
DEBUG_SK = False
# global HNB debug level
HNBd.DEBUG = ('ERR', 'WNG', 'INF') #, 'DBG')
HNBd.TRACE_ASN_HNBAP = False
@ -64,7 +65,7 @@ UEd.TRACE_RANAP_PS = False
UEd.TRACE_NAS_CS = False
UEd.TRACE_NAS_PS = False
UEd.TRACE_S1AP = True
UEd.TRACE_NAS_EPS_ENC = True
UEd.TRACE_NAS_EPS_SEC = False
UEd.TRACE_NAS_EPS = True
UEd.TRACE_NAS_EPS_SMS = True
@ -94,7 +95,7 @@ class CorenetServer(object):
'port' : 29169,
'MAXCLI': SERVER_MAXCLI,
'errclo': True}
#SERVER_HNB = {} # disabling HNB server
SERVER_HNB = {} # disabling HNB server
# S1AP server
SERVER_ENB = {'INET' : socket.AF_INET,
'IP' : '127.0.1.100',
@ -139,7 +140,7 @@ class CorenetServer(object):
EQUIV_PLMN = None
# emergency number lists
# None or list of 2-tuple [(number_category, number), ...]
# number_category is a 5 bit uint set of flags (Police, Ambulance, Fire, Marine, Mountain)
# number_category is an uint5 set of flags (Police, Ambulance, Fire, Marine, Mountain)
# number is a digits string
EMERG_NUMS = None
#
@ -196,17 +197,18 @@ class CorenetServer(object):
#
# UE configuration parameters
ConfigUE = {
# $IMSI: {'PDN' : [($APN -str-, $PDNType -1..3-, $IPAddr -str-), ...],
# $IMSI: {'PDN' : [($APN -str-, $PDNType -1..3-, $IPAddr -str-, ...), ...],
# 'MSISDN': $phone_num -str-,
# 'USIM' : $milenage_supported -bool-}
# PDN type: 1:IPv4, 2:IPv6, 3:IPv4v6
# PDN type: 1:IPv4, 2:IPv6 /64 prefix, 3:IPv4v6 (-> 1 IPv4 + 1 IPv6 /64 prefix)
'*': {'PDP' : [],
'PDN' : [('*', 1, '192.168.132.199')],
'MSISDN': '0123456789',
'USIM' : True
},
'208691664001001': {'PDP' : [],
'PDN' : [('*', 1, '192.168.132.201')],
'PDN' : [('*', 3, '192.168.132.201', '2001:7a8:1161:132'),
('corenet', 1, '192.168.132.201')],
'MSISDN': '16641001',
'USIM' : True
}
@ -217,9 +219,27 @@ class CorenetServer(object):
}
#
# Packet Data Network config for EPC, per APN
# config elements available:
# DNS : tuple of DNS server addr (in PDNAddr format style:
# 1->IPv4 2-tuple addr, 2->IPv6 2-tuple addr)
# PAP : dict of {peerid : passwd}
# CHAP: dict of ???
# MTU : 2-tuple (IPv4 link MTU, non-IP link MTU)
ConfigPDN = {
'*' : {'DNS': ('192.168.253.1', '192.168.253.2')},
'corenet': {'DNS': ('192.168.253.1', '192.168.253.2')},
'*': {
'QCI': 9,
'DNS': ((1, '192.168.253.1'),
(1, '192.168.253.2'),
(2, '2001:4860:4860::8888'), # Google DNS server
(2, '2001:4860:4860::8844')),
'MTU': (None, None),
},
'corenet': {
'QCI': 9,
'DNS': ((1, '192.168.253.1'),
(1, '192.168.253.2')),
'MTU': (None, None),
},
}
#
# UE, indexed by IMSI, and their UEd handler instance
@ -303,10 +323,14 @@ class CorenetServer(object):
# (with a dummy thread, which will be overridden at runtime)
self._clean_ue_proc = threadit( lambda: 1 )
#
# clear LAI, RAI, TAI dict
self.LAI.clear()
self.RAI.clear()
self.TAI.clear()
#
# initialize GTP TEID UL counter
self.GTP_TEID_UL = randint(1, 200000)
#
# start sub-servers
if self.AUCd:
self.AUCd = self.__class__.AUCd()
@ -1046,7 +1070,7 @@ class CorenetServer(object):
if hasattr(P, 'TimerStop') and T > P.TimerStop:
P._log('WNG', 'timeout: aborting')
P.abort()
#if ue.IuCS.CC.Proc:
# for P in ue.IuCS.CC.Proc.values():
# if hasattr(P, 'TimerStop') and T > P.TimerStop:
@ -1058,7 +1082,7 @@ class CorenetServer(object):
# if hasattr(P, 'TimerStop') and T > P.TimerStop:
# P._log('WNG', 'timeout: aborting')
# P.abort()
#if ue.IuCS.SS.Proc:
# for P in ue.IuCS.SS.Proc.values():
# if hasattr(P, 'TimerStop') and T > P.TimerStop:
@ -1080,11 +1104,23 @@ class CorenetServer(object):
# P._log('WNG', 'timeout: aborting')
# P.abort()
#if ue.S1 is not None:
#
#if ue.S1.EMM.Proc:
# pass
if ue.S1 is not None:
if ue.S1.EMM.Proc:
for P in ue.S1.EMM.Proc:
if hasattr(P, 'TimerStop') and T > P.TimerStop:
P._log('WNG', 'timeout: aborting')
P.abort()
#if ue.S1.ESM.Proc:
# pass
def get_sgw_addr(self):
return self.GTPUd.GTP_IP
def get_gtp_teid(self):
if self.GTP_TEID_UL > 4294967294:
self.GTP_TEID_UL = randint(1, 200000)
self.GTP_TEID_UL += 1
return self.GTP_TEID_UL

View File

@ -44,13 +44,12 @@ APRd.ROUTER_MAC_ADDR = 'f4:00:00:01:02:03', the LAN router (1st IP hop) MAC addr
APRd.ROUTER_IP_ADDR = '192.168.1.1', the LAN router (1st IP hop) IP address
-> some internal network parameters (toward RNC / eNodeB)
GTPUd.INT_IP = '10.1.1.1', IP address exposed on the RAN side
GTPUd.INT_PORT = 2152, GTPU UDP port to be used by RAN equipments
GTPUd.GTP_IP = '10.1.1.1', IP address exposed on the RAN side
GTPUd.GTP_PORT = 2152, GTPU UDP port to be used by RAN equipments
-> some mobiles parameters
APRd.IP_POOL = {'192.168.1.201', '192.168.1.202'}, the pool of IP addresses to be used by our set of mobiles
GTPUd.BLACKHOLING = 0, BLACKHOLE_LAN, BLACKHOLE_WAN or BLACKHOLE_LAN|BLACKHOLE_WAN,
to filter out all the mobile trafic, no trafic at all, or IP packets to external network only
GTPUd.WL_ACTIVE = True or False, to allow specific IP packets to be forwarded to the external network,
bypassing the BLACKHOLING directive
GTPUd.WL_PORTS = [('UDP', 53), ('UDP', 123)], to specify to list of IP protocol / port to allow in case WL_ACTIVE is True
@ -62,13 +61,14 @@ GTPUd.DPI = True or False, to store packet statistics (protocol / port
>>> gsn = GTPUd()
-> to start forwarding IP packets between the external interface and the GTP tunnel
if you want to let the GTPUd manage the attribution of TEID_to_rnc (GTPUd.GTP_TEID_EXT = False)
>>> teid_to_ran = gsn.add_mobile(mobile_ip='192.168.1.201', ran_ip='10.1.1.2', teid_from_ran=0x1)
if you want to manage teid_to_rnc by yourself and just provide its value to GTPUd (GTPUd.GTP_TEID_EXT = True)
>>> gsn.add_mobile(self, mobile_ip='192.168.1.201', ran_ip='10.1.1.2', teid_from_rnc=0x1, teid_to_rnc=0x2)
You need to set at least the TEID for uplink and mobile address (2-tuple, with address type: 1 for IPv4, 2 for IPv6):
>>> gsn.add_mobile(self, teid_ul=0x1, mobile_addr=(1, '192.168.1.201'), ran_ip=None, teid_dl=None)
You will need to set the downlink parameters in order to relay IP packets from the external interface over GTP:
>>> gsn.set_mobile_dl(teid_ul=0x1, ran_ip='10.1.1.2', teid_dl=0x3)
You can also set all parameters when calling add_mobile() method
-> to stop forwading IP packets
>>> gsn.rem_mobile(mobile_ip='192.168.1.201')
>>> gsn.rem_mobile(teid_ul=0x1)
-> modules that act on GTPU packets can be added to the GTPUd instance, they must be put in the MOD attribute
Two example modules DNSRESP and TCPSYNACK are provided.
@ -175,23 +175,23 @@ class ARPd(object):
#
# verbosity level: list of log types to display when calling
# self._log(logtype, msg)
DEBUG = ('ERR', 'WNG', 'INF', 'DBG')
DEBUG = ('ERR', 'WNG', 'INF', 'DBG')
#
# recv() buffer length
BUFLEN = 2048
BUFLEN = 2048
# select() timeout and wait period
SELECT_TO = 0.1
SELECT_SLEEP = 0.05
SELECT_TO = 0.1
SELECT_SLEEP = 0.05
#
# all Gi interface parameters
# Our GGSN ethernet parameters (IF, MAC and IP addresses)
# (and also the MAC address to be used for any mobiles through our GGSN)
GGSN_ETH_IF = 'eth0'
GGSN_MAC_ADDR = '08:00:00:01:02:03'
GGSN_IP_ADDR = '192.168.1.100'
GGSN_ETH_IF = 'eth0'
GGSN_MAC_ADDR = '08:00:00:01:02:03'
GGSN_IP_ADDR = '192.168.1.100'
#
# the set of IP address to be used by our mobiles
IP_POOL = {'192.168.1.201', '192.168.1.202', '192.168.1.203'}
IP_POOL = {'192.168.1.201', '192.168.1.202', '192.168.1.203'}
#
# network parameters:
# subnet prefix
@ -301,7 +301,7 @@ class ARPd(object):
sleep(self.SELECT_SLEEP)
self._log('INF', 'ARP resolver stopped')
def _process_arpbuf(self, buf=bytes()):
def _process_arpbuf(self, buf):
# this is an ARP request or response:
arpop = ord(buf[21:22])
# 1) check if it requests for one of our IP
@ -311,15 +311,14 @@ class ARPd(object):
# reply to it with our MAC ADDR
try:
self.sk_arp.sendto(
b''.join((buf[6:12], self.GGSN_MAC_BUF, # Ethernet hdr
b'\x08\x06\0\x01\x08\0\x06\x04\0\x02',
self.GGSN_MAC_BUF, buf[38:42], # ARP sender
buf[6:12], buf[28:32], # ARP target
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'))
(self.GGSN_ETH_IF, 0x0806))
b''.join((buf[6:12], self.GGSN_MAC_BUF, # Ethernet hdr
b'\x08\x06\0\x01\x08\0\x06\x04\0\x02',
self.GGSN_MAC_BUF, buf[38:42], # ARP sender
buf[6:12], buf[28:32], # ARP target
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0')),
(self.GGSN_ETH_IF, 0x0806))
except Exception as err:
self._log('ERR', 'external network error (sendto) on ARP '\
'response: %s' % err)
self._log('ERR', 'external network error (sendto) on ARP response: %s' % err)
else:
self._log('DBG', 'ARP response sent for IP: %s' % ipreq)
# 2) check if it responses something useful for us
@ -332,7 +331,7 @@ class ARPd(object):
self.ARP_RESOLV_TABLE[ipres] = buf[22:28]
self._log('DBG', 'got ARP response for new local IP: %s' % ipres)
def _process_ipbuf(self, buf=b''):
def _process_ipbuf(self, buf):
# this is an random IPv4 packet incoming into our interface:
# check if src IP is in our subnet and not already resolved,
# then store the Ethernet MAC address
@ -345,7 +344,7 @@ class ARPd(object):
self.ARP_RESOLV_TABLE[ipsrc] = buf[6:12]
self._log('DBG', 'got MAC address from IPv4 packet for new local IP: %s' % ipsrc)
def resolve(self, ip='192.168.1.2'):
def resolve(self, ip):
# check if already resolved
if ip in self.ARP_RESOLV_TABLE:
return self.ARP_RESOLV_TABLE[ip]
@ -358,12 +357,12 @@ class ARPd(object):
else:
try:
self.sk_arp.sendto(
b''.join((self.ROUTER_MAC_BUF, self.GGSN_MAC_BUF, # Ethernet hdr
b'\x08\x06\0\x01\x08\0\x06\x04\0\x01',
self.GGSN_MAC_BUF, self.GGSN_IP_BUF, # ARP sender
b'\0\0\0\0\0\0', inet_aton(ip), # ARP target
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'))
(self.GGSN_ETH_IF, 0x0806))
b''.join((self.ROUTER_MAC_BUF, self.GGSN_MAC_BUF, # Ethernet hdr
b'\x08\x06\0\x01\x08\0\x06\x04\0\x01',
self.GGSN_MAC_BUF, self.GGSN_IP_BUF, # ARP sender
b'\0\0\0\0\0\0', inet_aton(ip), # ARP target
b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0')),
(self.GGSN_ETH_IF, 0x0806))
except Exception as err:
self._log('ERR', 'external network error (sendto) on ARP request: %s' % err)
else:
@ -393,12 +392,12 @@ class GTPUd(object):
GTP-U forwarder
bridges Ethernet to GTP-U to handle IPv4 data traffic of connected UE.
This is to be instanciated as a unique handler for all GTP-U tunnels
This is to be instantiated as a unique handler for all GTP-U tunnels
in the corenet mobile core network.
Then, it is possible to add or remove GTP tunnel endpoints at will,
for each mobile with methods:
.add_mobile(mobile_ip, rnc_ip, teid_from_rnc)
-> returns teid_to_rnc for the given mobile
To add or remove GTP tunnel endpoints at will for each mobile,
use the methods:
.add_mobile(mobile_ip, rnc_ip, teid_ul[, teid_dl])
-> returns teid_dl for the given mobile, if not set through the method call
.rem_mobile(mobile_ip)
-> returns None
@ -427,28 +426,20 @@ class GTPUd(object):
#
# verbosity level: list of log types to display when calling
# self._log(logtype, msg)
DEBUG = ('ERR', 'WNG', 'INF', 'DBG')
DEBUG = ('ERR', 'WNG', 'INF', 'DBG')
#
# packet buffer space (over MTU...)
BUFLEN = 1536
BUFLEN = 2048
# select loop settings
SELECT_TO = 0.2
SELECT_TO = 0.1
#
# Gi interface, with GGSN ethernet IF and mobile IP address
EXT_IF = ARPd.GGSN_ETH_IF
GGSN_MAC_ADDR = ARPd.GGSN_MAC_ADDR
# IPv4 protocol only, to be forwarded
EXT_PROT = 0x0800
#
# internal IP interface, for handling GTP-U packets from RNC / eNB
INT_IP = '127.0.1.100'
INT_PORT = 2152
#
# GTP TEID toward RNC / eNodeBs (for DL traffic)
GTP_TEID = 0
GTP_TEID_MAX = 2**32 - 1
# in case the GTP TEID is assigned by an external entity
GTP_TEID_EXT = True
GTP_IP = '127.0.1.100'
GTP_PORT = 2152
#
# BLACKHOLING feature
# to enable the whole traffic: 0
@ -463,10 +454,11 @@ class GTPUd(object):
# in case we want to generate traffic statistics (then available in .stats)
DPI = True
#
# in case we want to check and drop spoofed IPv4 source address in incoming
# GTP-U packet
# in case we want to check and drop spoofed IPv4/v6 source address
# in incoming GTP-U packet
DROP_SPOOF = True
#
# in case we want to stop the listener when typing CTRL+C
CATCH_SIGINT = False
def __init__(self):
@ -474,15 +466,13 @@ class GTPUd(object):
self.GGSN_MAC_BUF = mac_aton(self.GGSN_MAC_ADDR)
#
# 2 dict for handling mobile GTP-U packets transfers:
# key: mobile IPv4 addr (buf)
# value: (ran_ip (asc), teid_from_ran (int), teid_to_ran (int))
self._mobiles_ip = {}
# key: teid_from_ran (int)
# value: mobile IPv4 addr (buf)
# key: mobile IPv4 or v6 addr (4 or 16 bytes)
# value: teid_ul (uint)
self._mobiles_addr = {}
# key: teid_ul (uint)
# value: [ran_ip (asc), teid_dl (uint), ipaddr, ctx_num (uint)]
# ipaddr: ipv4 (4 bytes), ipv6pref (8 bytes) or ipv6 (16 bytes)
self._mobiles_teid = {}
# global TEID to RAN value, to be incremented from here
if not self.GTP_TEID_EXT:
self.GTP_TEID = randint(0, 200000)
#
# initialize the traffic statistics
self.stats = {}
@ -490,25 +480,25 @@ class GTPUd(object):
# initialize the list of modules that can act on GTP-U payloads
self.MOD = []
#
# create a RAW PF_PACKET socket on the `Internet` side
# python is not convinient to configure dest mac addr
# when using SOCK_DGRAM (or I missed something...),
# so we use SOCK_RAW and build our own ethernet header:
self.sk_ext = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ntohs(self.EXT_PROT))
# configure timeouting and interface binding
self.sk_ext.settimeout(0.001)
#self.sk_ext.setblocking(0)
self.sk_ext.bind((self.EXT_IF, self.EXT_PROT))
# put the interface in promiscuous mode
set_promisc(self.sk_ext, self.EXT_IF, 1)
# create two RAW PF_PACKET sockets on the `Internet` side (1 for IPv4, 1 for IPv6)
self.sk_ext_v4 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
self.sk_ext_v4.settimeout(0.001)
#self.sk_ext_v4.setblocking(0)
self.sk_ext_v4.bind((self.EXT_IF, 0x0800))
set_promisc(self.sk_ext_v4, self.EXT_IF, 1)
#
# create an UDP socket on the RNC / eNB side, on port 2152
self.sk_ext_v6 = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ntohs(0x86dd))
self.sk_ext_v6.settimeout(0.001)
#self.sk_ext_v6.setblocking(0)
self.sk_ext_v6.bind((self.EXT_IF, 0x86dd))
set_promisc(self.sk_ext_v6, self.EXT_IF, 1)
#
# create an UDP socket on the RNC / eNB side
self.sk_int = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# configure timeout, binding and rebinding on same address
self.sk_int.settimeout(0.001)
#self.sk_int.setblocking(0)
self.sk_int.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sk_int.bind((self.INT_IP, self.INT_PORT))
self.sk_int.bind((self.GTP_IP, self.GTP_PORT))
#
# interrupt handler
if self.CATCH_SIGINT:
@ -519,7 +509,7 @@ class GTPUd(object):
signal.signal(signal.SIGINT, sigint_handler)
#
# and start listening and transferring packets in background
self.sk_list = (self.sk_ext, self.sk_int)
self.sk_list = (self.sk_ext_v4, self.sk_ext_v6, self.sk_int)
self._listening = True
self._listener_t = threadit(self.listen)
self._log('INF', 'GTP-U tunnels handler started')
@ -533,15 +523,17 @@ class GTPUd(object):
log('[%s] [GTPUd] %s' % (logtype, msg))
def init_stats(self, ip):
self.stats[ip] = {
'DNS': [], # IP of DNS servers requested
'NTP': [], # IP of NTP servers requested
'resolved': [], # domain name resolved
'ICMP': [], # ICMP endpoint (IP) contacted
'TCP': [], # TCP endpoint (IP, port) contacted
'UDP': [], # UDP endpoint (IP, port) contacted
'alien': [], # other protocol endpoint contacted
stats = {
'DNS' : set(), # IP of DNS servers requested
'NTP' : set(), # IP of NTP servers requested
'resolved': set(), # domain name resolved
'ICMP' : set(), # ICMP endpoint (IP) contacted
'TCP' : set(), # TCP endpoint (IP, port) contacted
'UDP' : set(), # UDP endpoint (IP, port) contacted
'alien' : set(), # other protocol packets
}
self.stats[ip] = stats
return stats
def stop(self):
# stop ARP resolver
@ -551,11 +543,12 @@ class GTPUd(object):
self._listening = False
sleep(self.SELECT_TO * 2)
try:
# unset promiscuous mode
set_promisc(self.sk_ext, self.EXT_IF, 0)
set_promisc(self.sk_ext_v4, self.EXT_IF, 0)
set_promisc(self.sk_ext_v6, self.EXT_IF, 0)
# closing sockets
self.sk_int.close()
self.sk_ext.close()
self.sk_ext_v4.close()
self.sk_ext_v6.close()
except Exception as err:
self._log('ERR', 'socket error: %s' % err)
@ -565,148 +558,248 @@ class GTPUd(object):
r = select(self.sk_list, [], [], self.SELECT_TO)[0]
# read ext and int sockets until they are empty
for sk in r:
if sk != self.sk_int:
# sk == self.sk_ext
#
if sk == self.sk_ext_v4:
# DL IPv4
try:
buf = sk.recvfrom(self.BUFLEN)[0]
#except timeout:
# # nothing to read anymore
# buf = b''
# pass
except Exception as err:
self._log('ERR', 'external network IF error (recvfrom): %s' % err)
#buf = b''
self._log('ERR', 'sk_ext_v4 IF error (recvfrom): %s' % err)
else:
if len(buf) >= 34 and \
buf[:6] == self.GGSN_MAC_BUF and \
buf[12:14] == b'\x08\0' and \
buf[16:20] in self._mobiles_ip:
# transferring over GTP-U after removing the Ethernet header
self.transfer_to_int(buf[14:])
#threadit(self.transfer_to_int, buf[14:])
#self._log('DBG', 'sk_ext_v4, recvfrom()')
if len(buf) >= 34 and buf[:6] == self.GGSN_MAC_BUF \
and buf[30:34] in self._mobiles_addr:
# IPv4 of a mobile, transfer over GTP-U
# after removing the Ethernet header
self.transfer_v4_to_int(buf[14:])
#threadit(self.transfer_v4_to_int, buf[14:])
#
elif sk == self.sk_ext_v6:
# DL IPv6
try:
buf = sk.recvfrom(self.BUFLEN)[0]
#except timeout:
# pass
except Exception as err:
self._log('ERR', 'sk_ext_v6 IF error (recvfrom): %s' % err)
else:
#self._log('DBG', 'sk_ext_v6, recvfrom()')
if len(buf) >= 54 and buf[:6] == self.GGSN_MAC_BUF \
and buf[38:54] in self._mobiles_addr:
# IPv6 of a mobile, transfer over GTP-U
# after removing the Ethernet header
self.transfer_v6_to_int(buf[14:])
#threadit(self.transfer_v6_to_int, buf[14:])
#
else:
# sk == self.int_sk
#sk == self.sk_int
# UL, both IPv4 and IPv6 packets
try:
buf = sk.recv(self.BUFLEN)
#except timeout:
# # nothing to read anymore
# buf = b''
# pass
except Exception as err:
self._log('ERR', 'internal network IF error (recv): %s' % err)
#buf = b''
self._log('ERR', 'sk_int IF error (recv): %s' % err)
else:
self.transfer_to_ext(buf)
#threadit(self.transfer_to_ext, buf)
#
self._log('INF', 'GTPU handler stopped')
def transfer_to_ext(self, buf):
# if GTP-U TEID in self._mobiles_teid, just forward...
# in this direction, there is no reason to filter
# except to avoid IP spoofing from malicious mobile
# (damned ! Would it be possible ?!?)
#
# extract the GTP header
try:
flags, msgtype, msglen, teid_from_ran = unpack('>BBHI', buf[:8])
except:
self._log('WNG', 'invalid GTP packet from RAN')
return
#
# in case GTP TEID is not correct, drop it
try:
mobile_ip = self._mobiles_teid[teid_from_ran]
except:
self._log('WNG', 'unknown GTP TEID from RAN: 0x%.8x' % teid_from_ran)
return
#
# in case GTP does not contain UP data, drop it
if msgtype != 0xff:
self._log('WNG', 'unsupported GTP type from RAN: 0x%.2x' % msgtype)
return
#
# get the IP packet: use the length in GTPv1 header to cut the buffer
if flags & 0x04:
# GTP header extended
msglen -= 4
ipbuf = buf[-msglen:]
#
# drop dummy IP packets
if len(ipbuf) < 24:
self._log('WNG', 'dummy packet from mobile dropped: %s' % hexlify(ipbuf).decode('ascii'))
return
#
# drop packet other than IPv4
ipver = ord(ipbuf[0:1]) >> 4
if ipver != 4:
self._log('WNG', 'unsupported IPv%i packet from UE' % ipver)
return
#
# drop spoofed IP packet
if self.DROP_SPOOF and ipbuf[12:16] != mobile_ip:
self._log('WNG', 'spoofed IPv4 source address from UE: %s' % inet_ntoa(ipbuf[12:16]))
return
#
ipsrc = inet_ntoa(ipbuf[12:16])
ipdst = inet_ntoa(ipbuf[16:20])
#
# analyze the packet content for statistics
if self.DPI:
self._analyze(ipsrc, ipbuf)
#
# possibly process the UL GTP-U payload within modules
if self.MOD:
try:
for mod in self.MOD:
if mod.TYPE == 0:
ipbuf = mod.handle_ul(ipbuf)
else:
mod.handle_ul(ipbuf)
except Exception as err:
self._log('ERR', 'MOD error: %s' % err)
#
# resolve the dest MAC addr
macdst = self.arpd.resolve(ipdst)
#
# possibly bypass blackholing rule for allowed ports
# check if PROT / PORT is allowed in the whilelist
if self.BLACKHOLING:
if self.WL_ACTIVE:
dst, prot, pay = DPI.get_ip_dst_pay(ipbuf)
# TCP:6, UDP:17
if prot in (6, 17) and pay:
port = DPI.get_port(pay)
if (self._prot_dict[prot], port) in self.WL_PORTS:
self._transfer_to_ext(macdst, ipbuf)
else:
return
else:
return
elif macdst != self.arpd.ROUTER_MAC_BUF:
if self.BLACKHOLING & BLACKHOLE_LAN:
return
else:
self._transfer_to_ext(macdst, ipbuf)
else:
if self.BLACKHOLING & BLACKHOLE_WAN:
return
else:
self._transfer_to_ext(macdst, ipbuf)
def resolve_mac(self, ipdst):
if len(ipdst) == 4:
return self.arpd.resolve(inet_ntoa(ipdst))
else:
self._transfer_to_ext(macdst, ipbuf)
# TODO: implement a minimal IPv6 NDP service
return self.ROUTER_MAC_BUF
def _transfer_to_ext(self, macdst, ipbuf):
#--------------------------------------------------------------------------#
# UL transfer
#--------------------------------------------------------------------------#
def transfer_to_ext(self, buf):
try:
# extract the GTP header
flags, msgtype, msglen, teid_ul = unpack('>BBHI', buf[:8])
# in case GTP TEID is not correct, drop it
ran_ip, teid_dl, ipaddr, ctx_num = self._mobiles_teid[teid_ul]
# TODO: handle GTP ECHO
if msgtype != 0xff:
self._log('WNG', 'unsupported GTP type from RAN: 0x%.2x' % msgtype)
return
# get the IP packet: use the length in the GTP header to cut the buffer
if flags & 0x04:
# GTP header extended
msglen -= 4
ipbuf = buf[-msglen:]
# get the IP version
ipvers = ord(ipbuf[0:1])>>4
if ipvers == 4:
ipsrc = ipbuf[12:16]
ipdst = ipbuf[16:20]
elif ipvers == 6:
ipsrc = ipbuf[8:24]
ipdst = ipbuf[24:40]
else:
self._log('WNG', 'invalid IP packet from UE, dropping it')
return
except:
self._log('WNG', 'invalid GTP / IP packet from RAN / UE, dropping it')
return
#
if ipvers == 4:
if self.DROP_SPOOF and ipsrc != ipaddr:
self._log('WNG', 'spoofed IPv4 src addr, teid_ul 0x%.8x' % teid_ul)
return
if self.DPI:
self._analyze(ipvers, inet_ntoa(ipsrc), ipbuf)
if self.MOD:
try:
for mod in self.MOD:
if mod.TYPE == 0:
ipbuf = mod.handle_ul(ipbuf)
else:
mod.handle_ul(ipbuf)
except Exception as err:
self._log('ERR', 'MOD error: %s' % err)
# resolve the dest MAC addr
macdst = self.resolve_mac(ipdst)
# apply blackholing
if self.BLACKHOLING:
if macdst != self.arpd.ROUTER_MAC_BUF:
if self.BLACKHOLING & BLACKHOLE_LAN:
drop = True
else:
drop = False
else:
if self.BLACKHOLING & BLACKHOLE_WAN:
drop = True
else:
drop = False
if drop and self.WL_ACTIVE:
ipdst, prot, pay = DPIv4.get_ip_info(ipbuf)
if prot in (6, 17) and pay:
# UDP / TCP
port = DPIv4.get_port(pay)
if (self._prot_dict[prot], port) in self.WL_PORTS:
self._transfer_to_ext_v4(macdst, ipbuf)
else:
return
else:
self._transfer_to_ext_v4(macdst, ipbuf)
#
else:
#ipvers == 6
if len(ipaddr) == 8:
# we only have the IPv6 prefix, need to store the full ipv6 addr
if self.DROP_SPOOF and ipsrc[:8] != ipaddr:
self._log('WNG', 'spoofed IPv6 src prefix, teid_ul 0x%.8x' % teid_ul)
return
# update local db with the full IPv6
self._mobiles_teid[teid_ul][2] = ipsrc
self._mobiles_addr[ipsrc] = teid_ul
elif self.DROP_SPOOF and ipsrc != ipaddr:
self._log('WNG', 'spoofed IPv6 src addr, teid_ul 0x%.8x' % teid_ul)
return
if self.DPI:
self._analyze(ipvers, inet_pton(AF_INET6, ipsrc), ipbuf)
if self.MOD:
try:
for mod in self.MOD:
if mod.TYPE == 0:
ipbuf = mod.handle_ul(ipbuf)
else:
mod.handle_ul(ipbuf)
except Exception as err:
self._log('ERR', 'MOD error: %s' % err)
# resolve the dest MAC addr
macdst = self.resolve_mac(ipdst)
# apply blackholing
if self.BLACKHOLING:
if macdst != self.arpd.ROUTER_MAC_BUF:
if self.BLACKHOLING & BLACKHOLE_LAN:
drop = True
else:
drop = False
else:
if self.BLACKHOLING & BLACKHOLE_WAN:
drop = True
else:
drop = False
if drop and self.WL_ACTIVE:
ipdst, prot, pay = DPIv6.get_ip_info(ipbuf)
if prot in (6, 17) and pay:
# UDP / TCP
port = DPIv6.get_port(pay)
if (self._prot_dict[prot], port) in self.WL_PORTS:
self._transfer_to_ext_v6(macdst, ipbuf)
else:
return
else:
self._transfer_to_ext_v6(macdst, ipbuf)
def _transfer_to_ext_v4(self, macdst, ipbuf):
# forward to the external PF_PACKET socket, over the Gi interface
try:
self.ext_sk.sendto(b''.join((macdst, self.GGSN_MAC_BUF, b'\x08\0', ipbuf))
(self.EXT_IF, self.EXT_PROT))
self.sk_ext_v4.sendto(b''.join((macdst, self.GGSN_MAC_BUF, b'\x08\0', ipbuf)),
(self.EXT_IF, 0x0800))
except Exception as err:
self._log('ERR', 'external network IF error (sendto): %s' % err)
#else:
# self._log('DBG', 'buffer transferred from GTPU to RAW')
self._log('ERR', 'sk_ext_v4 IF error (sendto): %s' % err)
def transfer_to_int(self, buf):
# from .listen():
# buf length is guaranteed >= 20 and ipdst in self._mobiles_ip
def _transfer_to_ext_v6(self, macdst, ipbuf):
# forward to the external PF_PACKET socket, over the Gi interface
try:
self.sk_ext_v6.sendto(b''.join((macdst, self.GGSN_MAC_BUF, b'\x86\xdd', ipbuf)),
(self.EXT_IF, 0x86dd))
except Exception as err:
self._log('ERR', 'sk_ext_v6 IF error (sendto): %s' % err)
def _analyze(self, ipvers, ipsrc, ipbuf):
#
try:
stats = self.stats[ipsrc]
except:
stats = self.init_stats(ipsrc)
#
if ipvers == 4:
dst, prot, pay = DPIv4.get_ip_info(ipbuf)
DPI = DPIv4
else:
dst, prot, pay = DPIv6.get_ip_info(ipbuf)
DPI = DPIv6
#
# UDP
if prot == 17 and pay:
port = DPI.get_port(pay)
stats['UDP'].add((dst, port))
# DNS
if port == 53:
stats['DNS'].add(dst)
name = DPI.get_dn_req(pay[8:])
stats['resolved'].add(name)
elif port == 123:
stats['NTP'].add(dst)
# TCP
elif prot == 6 and pay:
port = DPI.get_port(pay)
stats['TCP'].add((dst, port))
# ICMP
elif prot == 1 and pay:
stats['ICMP'].add(dst)
# alien
else:
stats['alien'].add(hexlify(ipbuf))
#--------------------------------------------------------------------------#
# DL transfer
#--------------------------------------------------------------------------#
def transfer_v4_to_int(self, buf):
#self._log('DBG', 'transfer_v4_to_int()')
# buf length is guaranteed >= 20 and ipdst in self._mobiles_addr
#
if self.MOD:
# possibly process the DL GTP-U payload within modules
@ -719,112 +812,133 @@ class GTPUd(object):
except Exception as err:
self._log('ERR', 'MOD error: %s' % err)
#
ran_ip, teid_from_ran, teid_to_ran = self._mobiles_ip[buf[16:20]]
teid_ul = self._mobiles_addr[buf[16:20]]
ran_ip, teid_dl, ipaddr, ctx_num = self._mobiles_teid[teid_ul]
#
# prepend GTP header and forward to the RAN IP
gtphdr = pack('>BBHI', 0x30, 0xff, len(buf), teid_to_ran)
#
try:
ret = self.int_sk.sendto(gtphdr + buf, (ran_ip, self.INT_PORT))
except Exception as err:
self._log('ERR', 'internal network IF error (sendto): %s' % err)
#else:
# self._log('DBG', '%i bytes transferred from RAW to GTPU' % ret)
###
# Now we can add and remove (mobile_IP, TEID_from/to_RAN),
# to configure filters and really start forwading packets over GTP
def add_mobile(self, mobile_ip='192.168.1.201', ran_ip='10.1.1.1',
teid_from_ran=0x1, teid_to_ran=0x1):
try:
ip = inet_aton(mobile_ip)
except Exception as err:
self._log('ERR', 'mobile_ip (%r) has not the correct format: '\
'cannot configure the GTPU handler' % mobile_ip)
return
if not self.GTP_TEID_EXT:
teid_to_ran = self.get_teid_to_ran()
self._mobiles_ip[ip] = (ran_ip, teid_from_ran, teid_to_ran)
self._mobiles_teid[teid_from_ran] = ip
self._log('INF', 'setting GTP tunnel for mobile with IP %s' % mobile_ip)
return teid_to_ran
def rem_mobile(self, mobile_ip='192.168.1.201'):
try:
ip = inet_aton(mobile_ip)
except Exception as err:
self._log('ERR', 'mobile_ip (%r) has not the correct format: '\
'cannot configure the GTPU handler' % mobile_ip)
return
if ip in self._mobiles_ip:
self._log('INF', 'unsetting GTP tunnel for mobile with IP %s' % mobile_ip)
ran_ip, teid_from_ran, teid_to_ran = self._mobiles_ip[ip]
del self._mobiles_ip[ip]
if teid_from_ran in self._mobiles_teid:
del self._mobiles_teid[teid_from_ran]
def get_teid_to_ran(self):
if self.GTP_TEID >= self.GTP_TEID_MAX:
self.GTP_TEID = randint(0, 200000)
self.GTP_TEID += 1
return self.GTP_TEID
def _analyze(self, ipsrc, ipbuf):
#
try:
stats = self.stats[ipsrc]
except:
self.init_stats(ipsrc)
stats = self.stats[ipsrc]
#
dst, prot, pay = DPI.get_ip_dst_pay(ipbuf)
# UDP
if prot == 17 and pay:
port = DPI.get_port(pay)
if (dst, port) not in stats['UDP']:
stats['UDP'].append((dst, port))
# DNS
if port == 53:
if dst not in stats['DNS']:
stats['DNS'].append(dst)
name = DPI.get_dn_req(pay[8:])
if name not in stats['resolved']:
stats['resolved'].append(name)
elif port == 123 and dst not in stats['NTP']:
stats['NTP'].append(dst)
# TCP
elif prot == 6 and pay:
port = DPI.get_port(pay)
if (dst, port) not in stats['TCP']:
stats['TCP'].append((dst, port))
# ICMP
elif prot == 1 and pay and dst not in stats['ICMP']:
stats['ICMP'].append(dst)
# alien
if ran_ip and teid_dl is not None:
gtphdr = pack('>BBHI', 0x30, 0xff, len(buf), teid_dl)
try:
ret = self.sk_int.sendto(gtphdr + buf, (ran_ip, self.GTP_PORT))
except Exception as err:
self._log('ERR', 'sk_int IF error (sendto): %s' % err)
else:
stats['alien'].append(hexlify(ipbuf))
class DPI:
self._log('WNG', 'teid_ul 0x%.8x, downlink GTP parameters not set' % teid_ul)
@staticmethod
def get_ip_dst_pay(ipbuf):
# returns a 3-tuple: dst IP, protocol, payload buffer
# get IP header length
l = (ord(ipbuf[0:1]) & 0x0F) * 4
# get dst IP
dst = inet_ntoa(ipbuf[16:20])
# get protocol
prot = ord(ipbuf[9:10])
def transfer_v6_to_int(self, buf):
#self._log('DBG', 'transfer_v6_to_int()')
# buf length is guaranteed >= 40 and ipdst in self._mobiles_addr
#
return (dst, prot, ipbuf[l:])
if self.MOD:
# possibly process the DL GTP-U payload within modules
try:
for mod in self.MOD:
if mod.TYPE == 0:
buf = mod.handle_dl(buf)
else:
mod.handle_dl(buf)
except Exception as err:
self._log('ERR', 'MOD error: %s' % err)
#
teid_ul = self._mobiles_addr[buf[24:40]]
ran_ip, teid_dl, ipaddr, ctx_num = self._mobiles_teid[teid_ul]
#
# prepend GTP header and forward to the RAN IP
if ran_ip and teid_dl is not None:
gtphdr = pack('>BBHI', 0x30, 0xff, len(buf), teid_dl)
try:
ret = self.sk_int.sendto(gtphdr + buf, (ran_ip, self.GTP_PORT))
except Exception as err:
self._log('ERR', 'sk_int IF error (sendto): %s' % err)
else:
self._log('WNG', 'teid_ul 0x%.8x, downlink GTP parameters not set' % teid_ul)
#--------------------------------------------------------------------------#
# UE management
#--------------------------------------------------------------------------#
def add_mobile(self, teid_ul, mobile_addr, ran_ip, teid_dl):
if mobile_addr[0] == 1:
# IPv4
ipbuf = inet_aton_cn(*mobile_addr)
if len(ipbuf) != 4:
self._log('ERR', 'invalid mobile addr %r' % (mobile_addr, ))
return
elif mobile_addr[0] == 2:
# IPv6 prefix (8 bytes) or IPv6
ipbuf = inet_aton_cn(*mobile_addr)
if len(ipbuf) not in (8, 16):
self._log('ERR', 'invalid mobile addr %r' % (mobile_addr, ))
return
elif mobile_addr[0] == 3:
# IPv4v6
self.add_mobile(teid_ul, (1, mobile_addr[1]), ran_ip, teid_dl)
self.add_mobile(teid_ul, (2, mobile_addr[2]), ran_ip, teid_dl)
return
else:
self._log('ERR', 'invalid mobile addr %r' % (mobile_addr, ))
#
if teid_ul in self._mobiles_teid:
# just increment the ctx_num
self._mobiles_teid[teid_ul][3] += 1
else:
# insert a new context
self._mobiles_teid[teid_ul] = [ran_ip, teid_dl, ipbuf, 1]
if len(ipbuf) in (4, 16):
self._mobiles_addr[ipbuf] = teid_ul
self._log('INF', 'setting GTP-U context for UE with IP %s, teid_ul 0x%.8x'\
% (mobile_addr[1], teid_ul))
def set_mobile_dl(self, teid_ul, ran_ip=None, teid_dl=None):
# enables to reconfigure the DL parameters (RAN IP, DL TEID)
try:
ran_ip_ori, teid_dl_ori, ipbuf, ctx_num = self._mobiles_teid[teid_ul]
except Exception as err:
self._log('ERR', 'invalid teid_ul 0x%.8x' % teid_ul)
return
else:
if ran_ip is None:
ran_ip = ran_ip_ori
if teid_dl is None:
teid_dl = teid_dl_ori
self._mobiles_teid[teid_ul] = [ran_ip, teid_dl, ipbuf, ctx_num]
def rem_mobile(self, teid_ul):
if teid_ul in self._mobiles_teid:
mobile_ctx = self._mobiles_teid[teid_ul]
if mobile_ctx[-1] > 1:
# decrement the number of GTP contexts
mobile_ctx[-1] -= 1
else:
# delete the mobile context
del self._mobiles_teid[teid_ul]
ipbuf, ipaddr = mobile_ctx[2], None
if len(ipbuf) in (4, 16):
try:
del self._mobiles_addr[ipbuf]
except:
pass
else:
if len(ipbuf) == 4:
ipaddr = inet_ntoa(ipbuf)
else:
ipaddr = inet_pton(AF_INET6, ipbuf)
self._log('INF', 'deleting GTP-U context for UE with IP %s, teid_ul 0x%.8x'\
% (ipaddr, teid_ul))
class _DPI(object):
@staticmethod
def get_port(pay):
"""return the port TCP / UDP number
"""
return unpack('!H', pay[2:4])[0]
@staticmethod
def get_dn_req(req):
def __get_dn_req_py2(req):
"""return the DNS name requested
"""
# remove fixed DNS header and Type / Class
s = req[12:-4]
n = []
@ -833,6 +947,98 @@ class DPI:
n.append( s[1:1+l] )
s = s[1+l:]
return b'.'.join(n)
@staticmethod
def __get_dn_req_py3(req):
"""return the DNS name requested
"""
# remove fixed DNS header and Type / Class
s = req[12:-4]
n = []
while len(s) > 1:
l = s[0]
n.append( s[1:1+l] )
s = s[1+l:]
return b'.'.join(n)
if python_version < 3:
get_dn_req = __get_dn_req_py2
else:
get_dn_req = __get_dn_req_py3
class DPIv4(_DPI):
@staticmethod
def __get_ip_info_py2(ipbuf):
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
"""
# returns a 3-tuple: dst IP, protocol, payload buffer
# get IP header length
l = (ord(ipbuf[0]) & 0x0F) * 4
# get dst IP
dst = inet_ntoa(ipbuf[16:20])
# get protocol
prot = ord(ipbuf[9])
#
return (dst, prot, ipbuf[l:])
@staticmethod
def __get_ip_info_py3(ipbuf):
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
"""
# returns a 3-tuple: dst IP, protocol, payload buffer
# get IP header length
l = (ipbuf[0] & 0x0F) * 4
# get dst IP
dst = inet_ntoa(ipbuf[16:20])
# get protocol
prot = ipbuf[9]
#
return (dst, prot, ipbuf[l:])
if python_version < 3:
get_ip_info = __get_ip_info_py2
else:
get_ip_info = __get_ip_info_py3
class DPIv6(_DPI):
@staticmethod
def __get_ip_info_py2(ipbuf):
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
"""
# returns a 3-tuple: dst IP, protocol, payload buffer
# get payload length
pl = unpack('>H', ipbuf[4:6])[0]
# get dst IP
dst = inet_ntop(AF_INET6, ipbuf[24:40])
# get protocol
# TODO: unstack IPv6 opts
prot = ord(ipbuf[6])
#
return (dst, prot, ipbuf[-pl:])
@staticmethod
def __get_ip_info_py3(ipbuf):
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
"""
# returns a 3-tuple: dst IP, protocol, payload buffer
# get payload length
pl = unpack('>H', ipbuf[4:6])[0]
# get dst IP
dst = inet_ntop(AF_INET6, ipbuf[24:40])
# get protocol
# TODO: unstack IPv6 opts
prot = ipbuf[6]
#
return (dst, prot, ipbuf[-pl:])
if python_version < 3:
get_ip_info = __get_ip_info_py2
else:
get_ip_info = __get_ip_info_py3
class MOD(object):

View File

@ -40,7 +40,8 @@ from time import time, sleep
from datetime import datetime
from binascii import hexlify, unhexlify
from struct import pack, unpack
from socket import ntohl, htonl, ntohs, htons, inet_aton, inet_ntoa
from socket import AF_INET, AF_INET6, AF_PACKET, ntohl, htonl, ntohs, htons, \
inet_aton, inet_ntoa, inet_pton, inet_ntop
# SCTP support for S1AP / RUA interfaces
try:
@ -75,6 +76,8 @@ from pycrate_asn1dir import RANAP
# to decode UE 3G and LTE radio capability
from pycrate_asn1dir import RRC3G
from pycrate_asn1dir import RRCLTE
#
from pycrate_asn1rt.utils import get_val_at
# to drive 3G UE
from pycrate_mobile import TS24007
@ -188,6 +191,69 @@ def asn_ranap_acquire():
def asn_ranap_release():
ASN_READY_RANAP.set()
def decode_ue_rad_cap(buf):
UERadCap = RRCLTE.EUTRA_InterNodeDefinitions.UERadioAccessCapabilityInformation
try:
UERadCap.from_uper(buf)
except:
return None
uecapinfo = {}
try:
# ue-RadioAccessCapabilityInfo (OCTET STRING) contains UECapabilityInformation (SEQUENCE)
radcapinfo = get_val_at(UERadCap, ('criticalExtensions',
'c1',
'ueRadioAccessCapabilityInformation-r8',
'ue-RadioAccessCapabilityInfo',
'UECapabilityInformation',
'criticalExtensions',
'c1',
'ueCapabilityInformation-r8'))
except:
UERadCap._val, uecapinfo
# decode each ueCapabilityRAT-Container
for caprat in radcapinfo['ue-CapabilityRAT-ContainerList']:
rattype = caprat['rat-Type'] # eutra, utra, geran-cs, geran-ps, cdma2000-1XRTT
if rattype == 'eutra':
UEEUTRACap = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
try:
UEEUTRACap.from_uper(caprat['ueCapabilityRAT-Container'])
except:
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
else:
uecapinfo[rattype] = UEEUTRACap._val
elif rattype == 'utra':
UEUTRACap = RRC3G.PDU_definitions.InterRATHandoverInfo
try:
UEUTRACap.from_uper(caprat['ueCapabilityRAT-Container'])
except:
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
else:
uecapinfo[rattype] = UEUTRACap._val
elif rattype == 'geran-cs':
m2, m3 = NAS.MSCm2(), NAS.Classmark_3_Value_part.clone()
# MSCm2 || MSCm3
try:
m2.from_bytes(caprat['ueCapabilityRAT-Container'])
m3.from_bytes(caprat['ueCapabilityRAT-Container'][m2.get_len():])
except:
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
else:
uecapinfo[rattype] = (m2, m3)
elif rattype == 'geran-ps':
mrc = NAS.MS_RA_capability_value_part.clone()
try:
mrc.from_bytes(caprat['ueCapabilityRAT-Container'])
except:
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
else:
uecapinfo[rattype] = mrc
else:
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
return UERadCap._val, uecapinfo
#------------------------------------------------------------------------------#
# logging facilities
#------------------------------------------------------------------------------#
@ -307,6 +373,15 @@ def imsi_str_to_buf(s):
return __IMSI.to_bytes()
def get_ueseccap_null_alg():
seccap = NAS.UESecCap(val={'EEA0': 1, 'EIA0': 1, 'UEA0': 1})
return seccap
def get_ueseccap_null_alg_lte():
seccap = NAS.UESecCap(val={'EEA0': 1, 'EIA0': 1})
seccap.disable_from('UEA0')
return seccap
def cellid_bstr_to_str(bstr):
# 20 or 28 bits
return hexlify(int_to_bytes(*bstr)).decode('ascii')[:-1]
@ -322,15 +397,62 @@ def supptas_to_hum(seqof):
'tAC': bytes_to_uint(sta['tAC'], 16)} for sta in seqof]
def gummei_to_asn(val):
def gummei_to_asn(plmnid, mmegid, mmec):
return {'pLMN-Identity': plmn_str_to_buf(plmnid),
'mME-Group-ID' : uint_to_bytes(mmegid, 16),
'mME-Code' : uint_to_bytes(mmec, 8)}
def served_gummei_to_asn(val):
return {'servedGroupIDs': [uint_to_bytes(gid, 16) for gid in val['GroupIDs']],
'servedMMECs': [uint_to_bytes(mmec, 8) for mmec in val['MMECs']],
'servedPLMNs': [plmn_str_to_buf(plmn) for plmn in val['PLMNs']]}
'servedMMECs' : [uint_to_bytes(mmec, 8) for mmec in val['MMECs']],
'servedPLMNs' : [plmn_str_to_buf(plmn) for plmn in val['PLMNs']]}
def mac_aton(mac='00:00:00:00:00:00'):
return unhexlify(mac.replace(':', ''))
def inet_aton_cn(*pdnaddr):
if pdnaddr[0] == 1:
# IPv4 address
try:
return inet_aton(pdnaddr[1])
except:
return pdnaddr[1]
elif pdnaddr[0] == 2:
# accept 64-bit IPv6 prefix / subnet or full 128-bit IPv6 address
ipaddr = pdnaddr[1]
if ipaddr.count(':') == 3:
# IPv6 prefix / subnet only
return pack('>HHHH', *map(lambda x:int(x, 16), ipaddr.split(':')))
else:
try:
return inet_pton(AF_INET6, ipaddr)
except:
return ipaddr
elif pdnaddr[0] == 3:
# IPv4v6 addresses
try:
return inet_aton(pdnaddr[1]) + inet_aton_cn(2, pdnaddr[2])
except:
return pdnaddr[1]
else:
# unknown address type
return pdnaddr[1]
def inet_ntoa_cn(pdntype, buf):
if pdntype == 1:
# IPv4 address
return (1, inet_ntoa(buf))
elif pdntype == 2:
# accept 64-bit IPv6 prefix / subnet or full 128-bit IPv6 address
if len(buf) == 8:
return (2, '%x:%x:%x:%x' % unpack('>HHHH', buf))
else:
return (2, inet_ntop(AF_INET6, buf))
elif pdntype == 3:
return (3, inet_ntoa(buf[:4]), inet_ntoa_cn(2, buf[4:])[1])
else:
return (pdntype, buf)
#------------------------------------------------------------------------------#
# ASN.1 object handling facilities