# pycrate/pycrate_corenet/HdlrUES1.py (2207 lines, 88 KiB, Python)

# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2017. Benoit Michau. ANSSI.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/HdlrUES1.py
# * Created : 2017-07-11
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
from .utils import *
from .ProcCNS1ap import *
from .ProcCNEMM import *
from .ProcCNESM import *
from .HdlrUESMS import *
# WNG: all procedures that call .require_smc() method need to be set in this LUT
# short procedure abbreviations, looked up by EMM procedure class name
# (those abbreviations are the values expected in UEEMMd.SMC_DISABLED_PROC)
ProcAbbrLUT = dict([
    ('EMMAttach', 'ATT'),
    ('EMMTrackingAreaUpdate', 'TAU'),
    ('EMMDetachUE', 'DET'),
])
# the 3 flavours of service request share a single abbreviation
ProcAbbrLUT.update(dict.fromkeys(
    ('EMMServiceRequest', 'EMMExtServiceRequest', 'EMMCPServiceRequest'),
    'SER'))
class UEEMMd(SigStack):
    """UE EMM handler within a UES1d instance

    responsible for EPS Mobility Management signalling procedures
    """

    TRACK_PROC = True

    # reference to the UEd
    UE = None
    # reference to the UES1d
    S1 = None

    # state: INACTIVE (cannot be paged) <-> ACTIVE <-> IDLE
    state = 'INACTIVE'

    # to bypass the process() server loop with a custom NAS PDU handler
    RX_HOOK = None

    # additional time for letting background task happen in priority
    _WAIT_ADD = 0.005

    # list of EMM message types that do not require NAS security to be
    # activated to be processed
    SEC_NOTNEED = {
        'EMMAttachRequest',
        'EMMIdentityResponse', # only for IMSI
        'EMMAuthenticationResponse',
        'EMMAuthenticationFailure',
        'EMMSecurityModeReject',
        'EMMDetachRequestMO', # if sent before security activation
        'EMMDetachAccept',
        'EMMTrackingAreaUpdateRequest',
        'EMMServiceRequest',
        'EMMExtServiceRequest'
        }
    # to disable completely the check for secured NAS message
    SEC_DISABLED = False

    #--------------------------------------------------------------------------#
    # EMM common parameters
    #--------------------------------------------------------------------------#
    # T3412, periodic TAU timer: dict {'Unit': uint3, 'Value': uint5}
    # Unit: 0: 2s, 1: 1mn, 2: 6mn, 7: deactivated
    _T3412 = {'Unit': 2, 'Value': 2} # 12mn
    #_T3412 = {'Unit': 7, 'Value': 0} # deactivated
    #
    # Reattach attempt after a failure timer: dict {'Unit': uint3, 'Value': uint5}
    # Unit: 0: 2s, 1: 1mn, 2: 6mn, 7: deactivated
    _T3402 = {'Unit': 1, 'Value': 2} # 2mn
    #
    # T3412Ext, power saving mode, TAU extended timer: None or dict {'Unit': uint3, 'Value': uint5}
    # Unit: 0: 10mn, 1: 1h, 2: 10h, 3: 2s, 4: 30s, 5: 1mn, 6: 320h, 7: deactivated
    _T3412_EXT = None
    #
    # T3324, power saving mode, time the UE stays active after idle mode following
    # Attach or TAU: None or dict {'Unit': uint3, 'Value': uint5}
    # Unit: 0: 2s, 1: 1mn, 2: 6mn, 7: deactivated
    _T3324 = None
    #
    # EPS Network features support: if None, not sent, otherwise dict
    # {'CP_CIoT': uint1, 'ERwoPDN': uint1, 'ESR_PS': uint1, 'CS_LCS': uint2,
    # 'EPC_LCS': uint1, 'EMC_BS': uint1, 'IMS_VoPS': uint1, 'EPCO': uint1,
    # 'HC_CP_CIoT': uint1, 'S1U_Data': uint1, 'UP_CIoT': uint1}
    _EPS_NETFEAT_SUPP = None
    #
    # Extended DRX support: if None and sent by the UE, the value returned is
    # the one from the UE ; otherwise dict {'PTX': uint4, 'eDRX': uint4}
    _EXTDRX = None
    #
    # SMS service status: if defined (status cause 0 to 3), denies SMS service
    # for EPS-only attach
    _SMS_SERV_STAT = None

    #--------------------------------------------------------------------------#
    # EMMStatus policy
    #--------------------------------------------------------------------------#
    # behaviour when receiving EMM STATUS
    # 0: do nothing,
    # 1: abort the top-level EMM procedure,
    # 2: abort the whole stack of EMM procedures
    STAT_CLEAR = 2

    #--------------------------------------------------------------------------#
    # EMMGUTIReallocation policy
    #--------------------------------------------------------------------------#
    # EMM procedure timer
    T3450 = 4

    #--------------------------------------------------------------------------#
    # EMMAuthentication policy
    #--------------------------------------------------------------------------#
    # this will systematically bypass all authentication procedures
    AUTH_DISABLED = False
    # EMM procedure timer for auth and smc
    T3460 = 4
    # Authentication Management Field
    AUTH_AMF = b'\x80\0'
    # if AUTH_PLMN is not None, it will be used for building the 4G auth vector
    # otherwise the main Corenet PLMN will be used
    AUTH_PLMN = None
    # this is to force a 2G or 3G authentication instead of a 4G one
    AUTH_2G = False
    AUTH_3G = False
    # this is to extend AUTN with arbitrary data
    AUTH_AUTN_EXT = None
    #
    # re-authentication policy:
    # this forces an auth procedure every X EMM TAU / (Ext/CP) SER procedures
    # even if a valid KSI is provided by the UE
    AUTH_TAU = 1
    AUTH_SER = 5
    AUTH_DET = 1 # only applied to Detach without UE power off

    #--------------------------------------------------------------------------#
    # EMMSecurityModeControl policy
    #--------------------------------------------------------------------------#
    # this will systematically bypass all NAS SMC procedures during UE signalling
    SMC_DISABLED = False
    # this will bypass the NAS SMC procedure into specific UE signalling procedure
    # set proc abbreviation in the list: 'ATT', 'TAU', 'SER'
    SMC_DISABLED_PROC = []
    # list of algorithm priorities
    #SMC_EEA_PRIO = [2, 1, 0]
    SMC_EEA_PRIO = [0]
    SMC_EIA_PRIO = [2, 1]
    #
    # UE security capabilities: add dummy 3G sec cap if GPRS sec cap available
    SMC_SECCAP_W2G = False
    # UE default algorithm identifier, when everything else is failing...
    SMC_EEA_DEF = 0
    SMC_EIA_DEF = 1
    # request IMEISV during a NAS SMC when IMEISV is unknown
    SMC_IMEISV_REQ = True
    #
    # dummy security cap / context when security is disabled
    # the SMC_EMERG_USE will change the output of self.require_smc() to still
    # trigger an SMC even if there is no security context
    # then the SMC procedure will use the SMC_DUMMY_CTX
    SMC_EMERG_USE = False
    SMC_DUMMY_SECCAP = NAS.UESecCap(val={'EEA0':1, 'EEA1_128':1, 'EEA2_128':1,
        'EIA0':1, 'EIA1_128':1, 'EIA2_128':1}).to_bytes()[:2]

    #--------------------------------------------------------------------------#
    # EMMIdentification policy
    #--------------------------------------------------------------------------#
    # EMM procedure timer
    T3470 = 2
    #
    # potential reject causes:
    # 2: 'IMSI unknown in HLR', -> kill the cellular connectivity until SIM card is removed
    # 3: 'Illegal MS', -> maybe same as 2
    # 4: 'IMSI unknown in VLR',
    # 5: 'IMEI not accepted', -> maybe same as 2
    # 6: 'Illegal ME',
    # 11: 'PLMN not allowed',
    # 12: 'Location Area not allowed',
    # 13: 'Roaming not allowed in this location area',
    # 15: 'No Suitable Cells In Location Area',
    # 17: 'Network failure',
    # 22: 'Congestion'
    # ...
    IDENT_IMSI_NOT_ALLOWED = 11
    IDENT_IMEI_NOT_ALLOWED = 5

    #--------------------------------------------------------------------------#
    # EMMAttach policy
    #--------------------------------------------------------------------------#
    ATT_T3412 = _T3412
    ATT_T3402 = _T3402
    ATT_T3412_EXT = _T3412_EXT
    ATT_T3324 = _T3324
    ATT_EPS_NETFEAT_SUPP = _EPS_NETFEAT_SUPP
    ATT_EXTDRX = _EXTDRX
    ATT_SMS_SERV_STAT = _SMS_SERV_STAT
    # if 0, enable IMSI attach from EPS; if > 0, use it as error code
    # e.g. 18: CS domain not available
    ATT_IMSI = 0
    # if 0, enable emergency attach; if > 0, use it as error code
    # e.g. 8: EPS services and non-EPS services not allowed)
    ATT_EMERG = 0
    # if we want to run a GUTI Reallocation within the EMM Attach Accept
    ATT_GUTI_REALLOC = True
    # if we want to release the S1 ue context after the procedure ends
    ATT_S1REL = False
    # if we want to release the S1 ue context after the procedure fails
    ATT_S1REL_ONERR = True
    #
    # when a UEd with MTMSI was created, that in fact corresponds to a UE
    # already set in Server.UE, we need to reject it after updating Server.MTMSI
    ATT_IMSI_PROV_REJECT = 17
    # timer within Attach Reject, Unit: 0: 2s, 1: 1mn, 2: 6mn, 7: deactivated
    ATT_T3346 = {'Unit': 0, 'Value': 2}

    #--------------------------------------------------------------------------#
    # EMMTrackingAreaUpdate policy
    #--------------------------------------------------------------------------#
    TAU_T3412 = _T3412
    TAU_T3402 = _T3402
    TAU_T3412_EXT = _T3412_EXT
    TAU_T3324 = _T3324
    TAU_EPS_NETFEAT_SUPP = _EPS_NETFEAT_SUPP
    TAU_EXTDRX = _EXTDRX
    TAU_SMS_SERV_STAT = _SMS_SERV_STAT
    # if we want to run a GUTI Reallocation within the EMM TAU Accept
    TAU_GUTI_REALLOC = True
    # if we want to release the S1 ue context after the procedure ends
    TAU_S1REL = True

    #--------------------------------------------------------------------------#
    # EMMServiceRequest policy
    #--------------------------------------------------------------------------#
    # to always start an SMC after a service request, even if no auth happened
    SER_SMC_ALW = False
    # to create a default PDN config in case it was deleted
    SER_PDN_ALW = False
    # to never setup a radio bearer
    SER_RAB_NEVER = False

    def _log(self, logtype, msg):
        """forward the log message to the UES1d logger, prefixed with [EMM]
        """
        self.S1._log(logtype, '[EMM] %s' % msg)

    def __init__(self, ued, ues1d):
        """bind the handler to its UEd (ued) and UES1d (ues1d) and initialize
        the empty procedure stack
        """
        self.UE = ued
        self.set_s1(ues1d)
        #
        # ready event, used by foreground tasks (network / interpreter initiated)
        self.ready = Event()
        self.ready.set()
        # stack of ongoing EMM procedures (i.e. common procedures can be run
        # within specific procedure)
        self.Proc = []
        # list of tracked procedures (requires TRACK_PROC = True)
        self._proc = []

    def set_s1(self, ues1d):
        """set the reference to the UES1d handler
        """
        self.S1 = ues1d

    def process(self, NasRx):
        """process a NAS EMM message (NasRx) sent by the UE,
        and return a list (possibly empty) of S1AP procedure(s) to be sent back
        to the eNB

        NasRx has 2 additional attributes (_sec [bool], _ulcnt [uint])

        if RX_HOOK is set, NasRx is handed over to it and its result is
        returned directly
        """
        if self.RX_HOOK is not None:
            return self.RX_HOOK(NasRx)
        #
        name = NasRx._name
        # 1) in case sec check failed, see if request is still to be accepted
        if not NasRx._sec and not self.SEC_DISABLED and name not in self.SEC_NOTNEED:
            # discard the msg
            self._log('INF', 'discarding %s message, failed security check' % name)
            return []
        #
        # 2) check if it is a Detach Request
        if name == 'EMMDetachRequestMO':
            Proc = EMMDetachUE(self)
            self.Proc.append( Proc )
            if self.TRACK_PROC:
                self._proc.append(Proc)
            # EMMDetachUE.process() will abort every other ongoing NAS procedures
            # for the PS domain
            return Proc.process(NasRx)
        #
        # 3) check if there is any ongoing EMM procedure
        elif self.Proc:
            # 3.1) in case of STATUS, disable ongoing procedure(s)
            if name == 'EMMStatus':
                self._log('WNG', 'STATUS received with %r' % NasRx['EMMCause'])
                if self.STAT_CLEAR == 1:
                    #self._log('WNG', 'STATUS, disabling %r' % self.Proc[-1])
                    self.Proc[-1].abort()
                elif self.STAT_CLEAR == 2:
                    #self._log('WNG', 'STATUS, disabling %r' % self.Proc)
                    self.clear()
                return []
            #
            # 3.2) in case of expected response
            elif name in self.Proc[-1].FilterStr:
                Proc = self.Proc[-1]
                S1apTxProc = Proc.process(NasRx)
                while self.Proc and not S1apTxProc:
                    # while the top-level NAS procedure has nothing to respond and terminates,
                    # we postprocess() lower-level NAS procedure(s) until we have something
                    # to send, or the stack is empty
                    ProcLower = self.Proc[-1]
                    S1apTxProc = ProcLower.postprocess(Proc)
                    Proc = ProcLower
                return S1apTxProc
            #
            # 3.3) in case of unexpected NasRx
            else:
                self._log('WNG', 'unexpected %s message, sending STATUS 98' % name)
                # cause 98: Message type not compatible with the protocol state
                return self.S1.ret_s1ap_dnt(NAS.EMMStatus(val={'EMMCause':98}, sec=NasRx._sec))
        #
        # 4) start a new UE-initiated procedure
        elif name in EMMProcUeDispatcherStr:
            Proc = EMMProcUeDispatcherStr[name](self)
            self.Proc.append( Proc )
            if self.TRACK_PROC:
                self._proc.append(Proc)
            return Proc.process(NasRx)
        #
        # 5) unexpected NasRx
        else:
            self._log('WNG', 'unexpected %s message, sending STATUS 96' % name)
            # cause 96: Invalid mandatory information
            return self.S1.ret_s1ap_dnt(NAS.EMMStatus(val={'EMMCause':96}, sec=NasRx._sec))

    def init_proc(self, ProcClass, encod=None, emm_preempt=False, sec=True):
        """initialize a CN-initiated EMM procedure of class `ProcClass' and
        given encoder(s), and return the procedure

        the procedure is pushed on top of the EMM procedure stack, and tracked
        in self._proc when TRACK_PROC is set
        """
        Proc = ProcClass(self, encod=encod, emm_preempt=emm_preempt, sec=sec)
        self.Proc.append( Proc )
        if self.TRACK_PROC:
            self._proc.append( Proc )
        return Proc

    def clear(self):
        """abort all running procedures
        """
        # iterate over a reversed copy: top-level procedures are aborted first
        for Proc in self.Proc[::-1]:
            Proc.abort()

    #--------------------------------------------------------------------------#
    # SMC and security-related methods
    #--------------------------------------------------------------------------#

    def _require_auth_pol(self, pol, period, ksi):
        """increment the S1.SEC['POL'][pol] counter and check it against the
        re-authentication period

        returns True (and unsets S1.SEC['KSI']) every `period' calls,
        otherwise selects ksi as the current KSI and returns False ;
        a period of 0 or None never forces a re-authentication
        """
        self.S1.SEC['POL'][pol] += 1
        if period and self.S1.SEC['POL'][pol] % period == 0:
            self.S1.SEC['KSI'] = None
            return True
        else:
            self.S1.SEC['KSI'] = ksi
            return False

    def require_auth(self, Proc, ksi=None):
        """check if an EMMAuthentication procedure is required for the EMM
        procedure Proc

        ksi is None or a 2-tuple (TSC 0..1, Value 0..7), as provided by the UE

        returns True if an authentication is required, False otherwise ;
        S1.SEC['KSI'] is updated as a side effect, except when security is
        disabled or when Proc is an EMMAttach with a known ksi
        """
        if self.S1.SECNAS_DISABLED or self.AUTH_DISABLED:
            return False
        elif ksi is None or ksi[1] == 7:
            # no KSI available at the UE side
            self.S1.SEC['KSI'] = None
            return True
        #
        ksi = (ksi[0]<<3) + ksi[1]
        if ksi not in self.S1.SEC:
            # no security context stored for this KSI
            self.S1.SEC['KSI'] = None
            return True
        #
        # auth policy per EMM procedure
        if isinstance(Proc, EMMAttach):
            # always authenticate within an Attach
            return True
        elif isinstance(Proc, EMMTrackingAreaUpdate):
            return self._require_auth_pol('TAU', self.AUTH_TAU, ksi)
        elif isinstance(Proc, EMMDetachUE):
            # the DET counter is the one to be checked against AUTH_DET
            return self._require_auth_pol('DET', self.AUTH_DET, ksi)
        elif isinstance(Proc, (EMMServiceRequest, EMMExtServiceRequest, EMMCPServiceRequest)):
            return self._require_auth_pol('SER', self.AUTH_SER, ksi)
        else:
            # auth not required, use the UE-provided cksn in use
            self.S1.SEC['KSI'] = ksi
            return False

    def require_smc(self, Proc):
        """check if an EMMSecurityModeControl procedure is required for the
        EMM procedure Proc

        Proc.Name must be a key of ProcAbbrLUT, otherwise a KeyError is raised
        """
        if self.S1.SECNAS_DISABLED or self.SMC_DISABLED:
            return False
        #
        elif ProcAbbrLUT[Proc.Name] in self.SMC_DISABLED_PROC:
            return False
        #
        elif not self.SMC_EMERG_USE and \
        (self.S1.SEC['KSI'] is None or self.S1.SEC['KSI'] not in self.S1.SEC):
            # no security context established, cannot run an smc
            self._log('WNG', 'require_smc: no KSI set, unable to run an SMC')
            return False
        #
        else:
            return True

    def get_any_ksi(self):
        """select a KSI for which a security context is stored in S1.SEC, set
        it as the current one in S1.SEC['KSI'] and return it, or return None

        the current KSI is preferred, then values 0..6, then values 8..14
        (logged as mapped KSI)
        """
        cur = self.S1.SEC['KSI']
        if cur is not None:
            if cur in self.S1.SEC:
                return cur
            else:
                # current KSI has no context anymore
                self.S1.SEC['KSI'] = None
        #
        for i in range(0, 7):
            if i in self.S1.SEC:
                self.S1.SEC['KSI'] = i
                return i
        for i in range(8, 15):
            if i in self.S1.SEC:
                self._log('INF', 'selecting a mapped KSI %i' % i)
                self.S1.SEC['KSI'] = i
                return i
        return None

    def get_new_ksi(self):
        """return a KSI value (0..6) not yet used in S1.SEC

        when all values are in use, all the corresponding contexts are deleted
        except the current one, and 0 is returned (or 1 if 0 is the current
        KSI)
        """
        for i in range(0, 7):
            if i not in self.S1.SEC:
                return i
        # all native KSI have been used, clear all of them except the current one
        # if defined
        cur = self.S1.SEC['KSI']
        for i in range(0, 7):
            if i != cur:
                del self.S1.SEC[i]
        if cur == 0:
            return 1
        else:
            return 0

    def set_sec_ctx(self, ksi, ctx, vect):
        """build a security context from the authentication vector vect, store
        it in S1.SEC and select it as the current one

        ksi: 2-tuple (TSC, Value)
        ctx: 3, CK and IK are taken from vect[3] and vect[4] and Kasme is
                derived from them ;
             2, CK and IK are converted from Kc (vect[2]) ;
             any other value (expected: 4), Kasme is taken from vect[3]
        """
        ksi = (ksi[0]<<3) + ksi[1]
        if ctx == 3:
            if self.AUTH_PLMN:
                snid = plmn_str_to_buf(self.AUTH_PLMN)
            else:
                snid = plmn_str_to_buf(self.UE.PLMN)
            Kasme = conv_401_A2(vect[3], vect[4], snid, vect[2][:6])
            secctx = {'VEC' : vect,
                      'CTX' : ctx,
                      'CK' : vect[3],
                      'IK' : vect[4],
                      'Kasme': Kasme}
        elif ctx == 2:
            # WNG: this is undefined / illegal and won't work (hopefully)
            CK, IK = conv_102_C4(vect[2]), conv_102_C5(vect[2])
            if self.AUTH_PLMN:
                snid = plmn_str_to_buf(self.AUTH_PLMN)
            else:
                snid = plmn_str_to_buf(self.UE.PLMN)
            Kasme = conv_401_A2(CK, IK, snid, b'\0\0\0\0\0\0')
            secctx = {'VEC' : vect,
                      'CTX' : ctx,
                      'Kc' : vect[2],
                      'CK' : CK,
                      'IK' : IK,
                      'Kasme': Kasme}
        else:
            # ctx == 4
            secctx = {'VEC' : vect,
                      'CTX' : ctx,
                      'Kasme': vect[3]}
        #
        # NAS uplink / downlink counters start from 0
        secctx['UL'], secctx['DL'], secctx['UL_enb'] = 0, 0, 0
        self.S1.SEC[ksi] = secctx
        self.S1.SEC['KSI'] = ksi

    def set_sec_ctx_emerg(self, ksi=0):
        """store a dummy security context (null keys, EEA0 / EIA0) in S1.SEC
        for the given ksi

        S1.SEC['KSI'] is not modified
        """
        secctx = {'CTX' : 0,
                  'Kasme' : 32*b'\0',
                  'Knasenc': 16*b'\0',
                  'Knasint': 16*b'\0',
                  'EEA' : 0,
                  'EIA' : 0,
                  'UL' : 0,
                  'DL' : 0,
                  'UL_enb' : 0}
        self.S1.SEC[ksi] = secctx

    def set_sec_ctx_smc(self, ksi):
        """select the NAS algorithms and derive Knasenc / Knasint for the
        security context stored under ksi in S1.SEC

        if no context is stored for ksi, a warning is logged and nothing else
        happens
        """
        try:
            secctx = self.S1.SEC[ksi]
        except (KeyError, TypeError):
            # best-effort: keep going without a context, but do not hide it
            self._log('WNG', 'set_sec_ctx_smc: no security context for KSI %r' % (ksi, ))
        else:
            secctx['EEA'], secctx['EIA'] = self._get_sec_eea(), self._get_sec_eia()
            secctx['Knasenc'] = conv_401_A7(secctx['Kasme'], 1, secctx['EEA'])[16:32]
            secctx['Knasint'] = conv_401_A7(secctx['Kasme'], 2, secctx['EIA'])[16:32]

    def set_sec_cap(self):
        """build UE.Cap['UESecCap'] as a 2-tuple (bytes buffer, NAS.UESecCap
        element) from UE.Cap['UENetCap'], if available

        the buffer is made of the first 2 or 4 bytes of the UENetCap buffer,
        optionally followed by the GEA capabilities from UE.Cap['MSNetCap']
        """
        # build UESecCap from UENetCap
        if 'UENetCap' in self.UE.Cap:
            ueseccap = self.UE.Cap['UENetCap'][0]
            if len(ueseccap) > 4:
                # we have more than 3G and 4G sec cap
                ueseccap = ueseccap[:4]
            if len(ueseccap) == 4:
                # void UCS2 support
                lastoct = ord(ueseccap[3:4])
                if lastoct & 0x80:
                    ueseccap = ueseccap[:3] + bchr(lastoct^0x80)
                if 'MSNetCap' in self.UE.Cap:
                    ueseccap += self._get_sec_gea_cap()
            else:
                assert( len(ueseccap) == 2 )
                if self.SMC_SECCAP_W2G and 'MSNetCap' in self.UE.Cap:
                    # dummy (empty) 3G sec cap, required before the GEA byte
                    ueseccap += b'\0\0'
                    ueseccap += self._get_sec_gea_cap()
            UESecCap = NAS.UESecCap()
            UESecCap.from_bytes(ueseccap)
            self.UE.Cap['UESecCap'] = (ueseccap, UESecCap)

    def _get_sec_gea_cap(self):
        """return a single byte with the GEA capabilities bitmap built from
        UE.Cap['MSNetCap']: GEA1 at bit 6, extended GEA bits from bit 5
        downward
        """
        msnetcap = self.UE.Cap['MSNetCap'][1]()
        v = msnetcap[0]<< 6 # GEA1
        if isinstance(msnetcap[8], list):
            # Extended_GEA_bits
            for i, b in enumerate(msnetcap[8]):
                v += b << (5-i)
        # TODO: add GIA sec cap
        return bchr(v)

    def get_sec_cap(self):
        """return the UE security capabilities buffer

        it is built from UE.Cap['UENetCap'] if not already available ;
        when no capability is known, SMC_DUMMY_SECCAP is returned
        """
        if 'UESecCap' not in self.UE.Cap:
            # build UESecCap from UENetCap
            if 'UENetCap' in self.UE.Cap:
                self.set_sec_cap()
                return self.UE.Cap['UESecCap'][0]
            else:
                # build UESecCap from SMC_DUMMY_SECCAP
                self._log('WNG', 'no security capabilities available, using dummy ones')
                return self.SMC_DUMMY_SECCAP
        else:
            return self.UE.Cap['UESecCap'][0]

    def _get_sec_eea(self):
        """return the identifier of the NAS ciphering algorithm to be used:
        the first one of SMC_EEA_PRIO set in UE.Cap['UESecCap'], or
        SMC_EEA_DEF when there is no capability or no match
        """
        if 'UESecCap' not in self.UE.Cap:
            self._log('WNG', 'no security capabilities available, using EEA%i' % self.SMC_EEA_DEF)
            return self.SMC_EEA_DEF
        else:
            UESecCap = self.UE.Cap['UESecCap'][1]
            for eea in self.SMC_EEA_PRIO:
                if UESecCap._content[eea].get_val():
                    return eea
            self._log('INF', 'no matching EEA identifier, using EEA%i' % self.SMC_EEA_DEF)
            return self.SMC_EEA_DEF

    def _get_sec_eia(self):
        """return the identifier of the NAS integrity protection algorithm to
        be used: the first one of SMC_EIA_PRIO set in UE.Cap['UESecCap'], or
        SMC_EIA_DEF when there is no capability or no match
        """
        if 'UESecCap' not in self.UE.Cap:
            self._log('WNG', 'no security capabilities available, using EIA%i' % self.SMC_EIA_DEF)
            return self.SMC_EIA_DEF
        else:
            UESecCap = self.UE.Cap['UESecCap'][1]
            for eia in self.SMC_EIA_PRIO:
                # EIA bits are looked up 8 positions after the EEA ones
                if UESecCap._content[8+eia].get_val():
                    return eia
            self._log('INF', 'no matching EIA identifier, using EIA%i' % self.SMC_EIA_DEF)
            # the integrity default applies here, not the ciphering one
            return self.SMC_EIA_DEF

    #--------------------------------------------------------------------------#
    # network-initiated method (fg task, to be used from the interpreter)
    #--------------------------------------------------------------------------#

    def _net_init_con(self):
        """make sure the UE is connected and the EMM stack is ready before
        running a network-initiated procedure

        returns True on success, False if the paging fails, the EMM stack is
        not ready within 10 seconds or the S1 connection is not set
        """
        if not self.S1.page_block():
            return False
        # need to wait for potential EMM serving / common procedures to happen and end
        sleep(self._WAIT_ADD)
        if not self.ready.wait(10):
            # something is blocking in the serving / common procedures
            return False
        elif not self.S1.connected.is_set():
            # something went wrong during the serving / common procedures
            return False
        else:
            return True

    def run_proc(self, ProcClass, sec=True, **IEs):
        """run a network-initiated procedure ProcClass in the context of the EMM
        stack, after setting the given IEs in the NAS message to be sent to the
        UE

        returns a 2-tuple (success, proc)
            success is a bool
            proc is the instance of ProcClass or None
        """
        if ProcClass.Init is None:
            self._log('ERR', 'invalid network-initiated procedure %s' % ProcClass.Name)
            return False, None
        if not self._net_init_con():
            return False, None
        #
        Proc = self.init_proc(ProcClass, encod={ProcClass.Init: IEs}, emm_preempt=True, sec=sec)
        try:
            S1apTxProc = Proc.output()
        except Exception:
            self._log('ERR', 'invalid IEs for network-initiated procedure %s' % Proc.Name)
            Proc.abort()
            return False, Proc
        if not self.S1.transmit_s1ap_proc(S1apTxProc):
            return False, Proc
        #
        # check if a response is expected
        if not hasattr(Proc, 'TimerValue'):
            return True, Proc
        elif not self.ready.wait(Proc.TimerValue + self._WAIT_ADD):
            # procedure is stuck, will be aborted in the server loop
            # WNG: this means the routine for cleaning NAS procedures in timeout
            # should be enabled in CorenetServer
            return False, Proc
        #
        # check if a response was received
        if hasattr(Proc, 'UEInfo'):
            return True, Proc
        else:
            return False, Proc

    def req_ident(self, idtype=NAS.IDTYPE_IMSI):
        """start an EMM Identification procedure toward the UE and wait for the
        response or timeout

        returns the 2-tuple (success, proc) from run_proc()
        """
        return self.run_proc(EMMIdentification, IDType=idtype)

    def detach(self, type=1, cause=None):
        """send an EMM Detach with type and cause (optional) and wait for the
        response or timeout

        returns the 2-tuple (success, proc) from run_proc()
        """
        if cause is not None:
            return self.run_proc(EMMDetachCN, EPSDetachTypeMT={'Type': type}, EMMCause=cause)
        else:
            return self.run_proc(EMMDetachCN, EPSDetachTypeMT={'Type': type})

    def inform(self, **info):
        """send an EMM information with given info

        returns the 2-tuple (success, proc) from run_proc()
        """
        return self.run_proc(EMMInformation, **info)
class UEESMd(SigStack):
"""UE ESM handler within a UES1d instance
responsible for EPS Session Management signalling procedures
"""
TRACK_PROC = True
# reference to the UEd
UE = None
# reference to the UES1d
S1 = None
# to bypass the process() server loop with a custom NAS PDU handler
RX_HOOK = None
# list of ESM message types that do not require NAS security to be
# activated to be processed
SEC_NOTNEED = {'ESMPDNConnectivityRequest'}
# to disable completely the check for secured NAS message
SEC_DISABLED = False
# default Radio Access Bearer settings for PDN config, per APN
# QCI (being LTE + EPS) is copied from the CorenetServer.ConfigPDN at UE init
RABConfig = {
'*' : {'PriorityLevel': 15, # 0..15, 1: highest, 14: lowest, 15: no priority
'PreemptCap' : 'shall-not-trigger-pre-emption', # or 'may-trigger-pre-emption'
'PreemptVuln' : 'not-pre-emptable', # or 'pre-emptable'
'BitrateDL' : 100000000, # aggregate max bitrate downlink (b/s)
'BitrateUL' : 50000000, # aggregate max bitrate uplink (b/s)
},
'corenet': {'PriorityLevel': 14, # 0..15, 1: highest, 14: lowest, 15: no priority
'PreemptCap' : 'shall-not-trigger-pre-emption', # or 'may-trigger-pre-emption'
'PreemptVuln' : 'not-pre-emptable', # 'pre-emptable'
'BitrateDL' : 100000000, # aggregate max bitrate downlink (b/s)
'BitrateUL' : 50000000, # aggregate max bitrate uplink (b/s)
}
}
# when the UE 1st attach it gets a specific PDNConfig dict with a copy of this content
# under the key 'RAB'
# Default APN in case the UE does not indicate any APN for the default connectivity
# If None, an APN must be explicitely requested by the UE
# Otherwise, it must be a standard APN from the configuration
APN_DEFAULT = 'corenet'
# Protocol config option with authentication
# if bypass enabled, the PAP / CHAP authentication will not be checked against
# the CorenetServer.PDNConfig and always return authentication success
AUTH_PAP_BYPASS = True
AUTH_CHAP_BYPASS = True
#--------------------------------------------------------------------------#
# ESMStatus policy
#--------------------------------------------------------------------------#
# behaviour when receiving ESM STATUS
# 0: do nothing,
# 1: abort the top ESM procedure for the indicated EPS bearer ID
# 2: abort the whole ESM procedure stack for the indicated EPS bearer ID
# 3: abort all the ESM procedures stacks
STAT_CLEAR = 3
#--------------------------------------------------------------------------#
# ESMDefaultEPSBearerCtxtAct / ESMDedicatedEPSBearerCtxtAct policy
#--------------------------------------------------------------------------#
T3485 = 2
#--------------------------------------------------------------------------#
# ESMEPSBearerCtxtModif policy
#--------------------------------------------------------------------------#
T3486 = 2
#--------------------------------------------------------------------------#
# ESMEPSBearerCtxtDeact policy
#--------------------------------------------------------------------------#
T3495 = 2
#--------------------------------------------------------------------------#
# ESMInfoRequest policy
#--------------------------------------------------------------------------#
T3489 = 2
def _log(self, logtype, msg):
self.S1._log(logtype, '[ESM] %s' % msg)
def __init__(self, ued, ues1d):
self.UE = ued
self.set_s1(ues1d)
#
# dict of ongoing ESM procedures, indexed by EPS bearer ID
self.Proc = {i: [] for i in range(16)}
# dict of configured PDN, indexed by EPS bearer ID
self.PDN = {}
# dict of ongoing ESM transactions IEs
self.Trans = {}
# list of tracked procedures (requires TRACK_PROC = True)
self._proc = []
def set_s1(self, ues1d):
self.S1 = ues1d
def process_buf(self, buf, sec, EMMProc=None):
"""process a NAS ESM message buffer (buf) sent by the UE,
if the decoding is correct, return the result of process()
"""
ESMRx, err = NAS.parse_NASLTE_MO(buf, inner=False)
if err:
# invalid ESM message
self._log('WNG', 'invalid EPS NAS ESM message: %s' % hexlify(buf).decode('ascii'))
ESMTx = NAS.ESMStatus(val={'ESMCause':err}, sec=sec)
return self.S1.ret_s1ap_dnt(self.output_nas_esm(ESMTx, EMMProc))
#
elif ESMRx[0]['ProtDisc'].get_val() != 2:
# cause 96: Invalid mandatory information
self._log('WNG', 'invalid EPS NAS ESM message: %r' % ESMRx)
ESMTx = NAS.ESMStatus(val={'ESMCause':96}, sec=sec)
return self.S1.ret_s1ap_dnt(self.output_nas_esm(ESMTx, EMMProc))
#
elif self.UE.TRACE_NAS_EPS:
self._log('TRACE_NAS_EPS_UL', '\n' + ESMRx.show())
ESMRx._sec = sec
return self.process(ESMRx, EMMProc=EMMProc)
def process(self, NasRx, EMMProc=None):
"""process a NAS ESM message (NasRx) sent by the UE,
and return a list (possibly empty) of S1AP procedure(s) to be sent back
to the eNB
NasRx has 2 additional attributes (_sec [bool], _ulcnt [uint])
EMMProc [EMMSigProc or None], indicates if the NAS ESM message is handled in
the context of an EMM procedure
"""
if self.RX_HOOK is not None:
return self.RX_HOOK(NasRx)
#
name = NasRx._name
# 1) in case sec check failed, see if request is still to be accepted
if not NasRx._sec and not self.SEC_DISABLED and name not in self.SEC_NOTNEED:
# discard the msg
self._log('INF', 'discarding %s message, failed security check' % name)
return self.S1.ret_s1ap_dnt(self.output_nas_esm(None, EMMProc))
#
# 2) check if there is any ongoing ESM procedure for the given EPS bearer id
ebi = NasRx[0][0].get_val()
if self.Proc[ebi]:
ProcStack = self.Proc[ebi]
# 2.1) in case of STATUS, disable ongoing procedure(s)
if name == 'ESMStatus':
self._log('WNG', 'STATUS received with %r' % NasRx['ESMCause'])
if self.STAT_CLEAR == 1:
#self._log('WNG', 'STATUS, disabling %r' % ProcStack[-1])
ProcStack[-1].abort()
elif self.STAT_CLEAR == 2:
#self._log('WNG', 'STATUS, disabling %r' % ProcStack)
self.clear(ebi)
elif self.STAT_CLEAR == 3:
#self._log('WNG', 'STATUS, disabling %r' % self.Proc)
self.clear()
return self.S1.ret_s1ap_dnt(self.output_nas_esm(None, EMMProc))
#
# 2.2) in case of expected response
elif name in ProcStack[-1].FilterStr:
Proc = ProcStack[-1]
S1apTxProc = Proc.process(NasRx)
while ProcStack and not S1apTxProc:
# while the top-level NAS procedure has nothing to respond and terminates,
# we postprocess() lower-level NAS procedure(s) until we have something
# to send, or the stack is empty
ProcLower = ProcStack[-1]
S1apTxProc = ProcLower.postprocess(Proc)
Proc = ProcLower
return S1apTxProc
#
# 2.3) in case of unexpected NasRx
else:
self._log('WNG', 'unexpected %s message, sending STATUS 98' % name)
# cause 98: Message type not compatible with the protocol state
ESMTx = NAS.ESMStatus(val={'ESMCause':96}, sec=NasRx._sec)
return self.S1.ret_s1ap_dnt(self.output_nas_esm(ESMTx, EMMProc))
#
# 3) start a new UE-initiated procedure
elif name in ESMProcUeDispatcherStr:
Proc = ESMProcUeDispatcherStr[name](self, ebi=ebi, EMMProc=EMMProc)
self.Proc[ebi].append(Proc)
if self.TRACK_PROC:
self._proc.append(Proc)
return Proc.process(NasRx)
#
# 4) unexpected NasRx
else:
self._log('WNG', 'unexpected %s message, sending STATUS 96' % name)
# cause 96: Invalid mandatory information
ESMTx = NAS.ESMStatus(val={'ESMCause':96}, sec=NasRx._sec)
return self.S1.ret_s1ap_dnt(self.output_nas_esm(ESMTx, EMMProc))
def output_nas_esm(self, ESMTx, EMMProc):
if not ESMTx:
if EMMProc:
self._log('WNG', 'output_nas_esm: no ESMTx but an EMMTx to be sent')
return EMMProc._nas_tx
else:
return None
elif EMMProc:
ESMTx._sec = False
EMMTx = EMMProc._nas_tx
ESMCont = EMMTx['ESMContainer']
ESMCont['V'].set_val(self.S1.output_nas_sec(ESMTx))
if ESMCont.get_trans():
ESMCont.set_trans(False)
return EMMTx
else:
return ESMTx
def init_proc(self, ProcClass, **kw):
"""initialize a CN-initiated ESM procedure of class `ProcClass' and
given encoder(s), and return the procedure
"""
if 'ebi' in kw:
ebi = kw['ebi']
assert( 0 <= ebi <= 15 )
del kw['ebi']
else:
ebi = 0
Proc = ProcClass(self, ebi=ebi, **kw)
self.Proc[ebi].append( Proc )
if self.TRACK_PROC:
self._proc.append( Proc )
return Proc
def clear(self, ebi=None):
"""abort all running procedures, eventually for a single EPS Bearer ID
"""
self.Trans.clear()
if ebi is None:
for ebi in range(16):
for Proc in self.Proc[ebi][::-1]:
Proc.abort()
else:
for Proc in self.Proc[ebi][::-1]:
Proc.abort()
def pdn_clear(self, ebi=None):
if ebi is None:
for ebi, pdncfg in list(self.PDN.items()):
self.UE.Server.GTPUd.rem_mobile(pdncfg['RAB']['SGW-GTP-TEID'])
del self.PDN[ebi]
elif ebi in self.PDN:
self.UE.Server.GTPUd.rem_mobile(self.PDN[ebi]['RAB']['SGW-GTP-TEID'])
del self.PDN[ebi]
def pdn_suspend(self, ebi=None):
if ebi is None:
for ebi, pdncfg in self.PDN.items():
if pdncfg['state'] == 1:
self.UE.Server.GTPUd.rem_mobile(pdncfg['RAB']['SGW-GTP-TEID'])
pdncfg['state'] = 0
elif ebi in self.PDN and self.PDN[ebi]['state'] == 1:
self.UE.Server.GTPUd.rem_mobile(self.PDN[ebi]['RAB']['SGW-GTP-TEID'])
self.PDN[ebi]['state'] = 0
#--------------------------------------------------------------------------#
# transaction processing
#--------------------------------------------------------------------------#
def process_trans(self, trans_id):
"""process an ESM transaction initiated by the UE, and return a network-initiated
procedure with IEs configured and None, or None and the ESM error code
"""
try:
trans = self.Trans[trans_id]
except Exception:
# err cause 47: PTI mismatch
return None, 47
#
if trans['Type'] == 'Default':
IEs = {}
#
# 1) need APN
if trans['APN'] is None:
if self.APN_DEFAULT is not None:
apn = self.APN_DEFAULT
b_apn = apn.encode('ascii')
IEs['APN'] = bchr(len(b_apn)) + b_apn
else:
# err cause 27: missing or unknown APN
return None, 27
else:
apn = trans['APN'][0][1].get_val()
IEs['APN'] = trans['APN'].get_val()
#
if apn in self.PDNConfig:
pdncfg = self.PDNConfig[apn]
elif '*' in self.PDNConfig:
pdncfg = self.PDNConfig['*']
else:
# err cause 27: missing or unknown APN
return None, 27
#
# 2) check the ue request against pdncfg
# 2.1) check the PDN type
pdntue = trans['PDNType'].get_val()
ipaddr, err = self._get_pdn_addr(pdncfg, pdntue)
if err is not None:
return None, err
IEs['PDNAddr'] = {'Type': ipaddr[0], 'Addr': inet_aton_cn(*ipaddr, dom='EPS')}
#
# 2.2) check the protocol config options
if trans['ProtConfig']:
IEs['ProtConfig'], pdnaddrreq = self.process_protconfig(pdncfg, trans['ProtConfig'])
if not pdnaddrreq:
IEs['PDNAddr'] = b''
#
if 'NBIFOMContainer' in trans:
self._log('WNG', 'NBIFOMContainer IE unsupported')
if 'HdrCompConfig' in trans:
self._log('WNG', 'HdrCompConfig IE unsupported')
if 'ExtProtConfig' in trans:
self._log('WNG', 'ExtProtConfig IE unsupported')
#
# 3) get the default QCI for the given APN
IEs['EPSQoS'] = {'QCI': pdncfg.get('QCI', 0x80)}
#
# 4) get the 1st available EPS bearer ID
ebi, err = self._get_ebi()
if err is not None:
return None, err
#
# 5) set the default RAB for the given APN / EPS bearer ID
self.rab_set_default(ebi, apn, ipaddr, pdncfg)
#
# initialize an ESMDefaultEPSBearerCtxtAct with the given EPS Bearer ID and IEs
return self.init_proc(ESMDefaultEPSBearerCtxtAct, ebi=ebi, encod={(2, 193): IEs}), None
#
elif trans['Type'] == 'Dedicated':
# TODO
return None, None
elif trans['Type'] == 'Modif':
# TODO
return None, None
elif trans['Type'] == 'Deact':
# TODO
return None, None
else:
assert()
def _get_pdn_addr(self, pdncfg, pdntype_ue):
pdntype_net = pdncfg['Addr'][0]
if pdntype_ue == 1:
if pdntype_net not in (1, 3):
# err cause 51: PDN type IPv6 only allowed
return None, 51
else:
return (1, pdncfg['Addr'][1]), None
elif pdntype_ue == 2:
if pdntype_net not in (2, 3):
# err cause 50: PDN type IPv4 only allowed
return None, 50
else:
return (2, pdncfg['Addr'][2]), None
elif pdntype_ue == 3:
if not 1 <= pdntype_net <= 3:
# err cause 111: Protocol error, unspecified
return None, 111
else:
return pdncfg['Addr'], None
else:
# err cause 28: Unknown PDN type
return None, 28
def _get_ebi(self):
for i in range(5, 16):
if i not in self.PDN:
return i, None
# err cause 65: Maximum number of EPS bearers reached
return None, 65
def rab_set_default(self, ebi, apn, pdnaddr, pdncfg):
    """Create the PDN context for the default bearer `ebi' and store it in
    self.PDN[ebi].

    The context is a cpdict() copy of pdncfg (taken without its 'RAB' entry),
    completed with the PDN address, the APN, the E-RAB parameters built from
    pdncfg['RAB'] and a 'state' set to 0.
    """
    # take the RAB template out while copying the config, then put it back
    rabcfg = pdncfg.pop('RAB')
    pdn = cpdict(pdncfg)
    pdncfg['RAB'] = rabcfg
    #
    pdn['PDNAddr'] = pdnaddr
    pdn['APN'] = apn
    #
    qos = {
        'qCI': pdncfg['QCI'],
        'allocationRetentionPriority': {
            'priorityLevel': rabcfg['PriorityLevel'],
            'pre-emptionCapability': rabcfg['PreemptCap'],
            'pre-emptionVulnerability': rabcfg['PreemptVuln']
            },
        }
    sgw_tla = self.UE.Server.SERVER_ENB['GTPU']
    teid_ul = self.UE.Server.get_gtp_teid()
    pdn['RAB'] = {
        'E-RABlevelQoSParameters': qos,
        'SGW-TLA'      : sgw_tla,
        # enb gtpu ip, will be updated after the eNB setup the ERAB
        'ENB-TLA'      : None,
        'SGW-GTP-TEID' : teid_ul,
        # teid_dl, will be updated after the eNB setup the ERAB
        'ENB-GTP-TEID' : None,
        'BitrateDL'    : rabcfg['BitrateDL'],
        'BitrateUL'    : rabcfg['BitrateUL']
        }
    #
    # 0: suspended (no GTP tunnel exist), 1: active (GTP tunnel exists)
    pdn['state'] = 0
    self.PDN[ebi] = pdn
#--------------------------------------------------------------------------#
# protocol configuration processing
#--------------------------------------------------------------------------#
def process_protconfig(self, config, request):
    """Delegate the protocol configuration options processing to the UE handler.

    Returns:
        2-tuple: dict wrapping the response elements under the 'Config' key,
        and the second value returned by self.UE.process_protconfig()
    """
    result = self.UE.process_protconfig(self, config, request)
    resp_elt, pdnaddrreq = result
    return {'Config': resp_elt}, pdnaddrreq
class UES1d(SigStack):
"""UE S1 handler within a CorenetServer instance
responsible for UE-associated S1AP signalling
"""
# to keep track of all S1AP procedures
TRACK_PROC = True
# domain
DOM = 'EPS'
# reference to the UEd
UE = None
# reference to the ENBd, SCTP stream id
ENB = None
SID = None
# to bypass the process_nas() server loop with a custom NAS PDU handler
RX_HOOK = None
# for pure S1AP procedures (no NAS traffic, nor E-RAB-oriented stuff)
# should we page the UE to run the procedure successfully when UE is idle
S1AP_FORCE_PAGE = False
#--------------------------------------------------------------------------#
# global security policy
#--------------------------------------------------------------------------#
# this will systematically bypass all auth and smc procedures,
# NAS MAC and UL count verification in the uplink
# and setting of the EMM security header (and encryption) in the downlink
SECNAS_DISABLED = False
#
# finer grained NAS security checks:
# True to drop NAS PDU when NAS MAC verification fails
SECNAS_UL_MAC = False
# True to drop NAS PDU when NAS UL count verification fails
SECNAS_UL_CNT = False
# WNG: EMM and ESM stacks have further control over whether to accept certain
# NAS messages even if security checks have failed
#
# this will disable the setting of the EMM security header (and encryption)
# in the downlink for given NAS message (by name)
SECNAS_PDU_NOSEC = set()
#
# format of the security context dict self.SEC:
# self.SEC is a dict of available 3G / 4G security contexts indexed by KSI,
# and current KSI in use
#
# when self.SEC['KSI'] is not None, the context is enabled at the NAS level, e.g.
# self.SEC = {'KSI': 0,
# 0: {'Kasme': b'...', 'Knasenc': b'...', 'Knasint': b'...',
# 'UL': 0, 'DL': 0, 'EEA': 1, 'EIA': 1,
# 'Kenb': b'...', 'CTX': 4},
# ...,
# 'POL': {'TAU': 0, 'SER': 0}}
#
# a single security context contains:
# Kasme, Kenb: 32 bytes buffer, key used at the NAS layer and sent to the eNB
# handling the UE
# Knasenc, Knasint: 16 bytes buffer, key used at the NAS layer together with
# EEA and EIA algorithms
# UL, DL: NAS UL and DL count
# EEA, EIA: NAS security algorithms index selected
# CTX: context of the authentication,
# 3 means 3G auth converted to 4G context, in this case, CK and IK are also
# available in the security context
# 4 means 4G auth and native context
# The POL dict indicates the authentication policy for each procedure
#
# in case an E-RAB gets activated, but no security context exists
# we use this dummy AS security context for the eNB
SECAS_NULL_CTX = (
32*b'\0', # Kenb
get_ueseccap_null_alg_lte() # UESecCap
)
#--------------------------------------------------------------------------#
# S1APPaging policy
#--------------------------------------------------------------------------#
# if we want to page with the IMSI, instead of the M-TMSI
PAG_IMSI = False
# default paging domain, if not provided as page() arg
# 'ps' or 'cs'
PAG_DOM_DEF = 'ps'
# specific default priority defined for paging, if not provided as page() arg
# None or 1..8 or 'priolevel1'..'priolevel8'
PAG_PRIO_DEF = None
#
# page_block() parameters:
# number of retries when not successful
PAG_RETR = 2
# timer in sec between retries
PAG_WAIT = 2
#--------------------------------------------------------------------------#
# S1APInitialContextSetup policy
#--------------------------------------------------------------------------#
# to include UERadioCap in request when available (bool)
ICS_RADCAP_INCL = True
# to include GUMMEI in request when available (bool)
ICS_GUMMEI_INCL = True
# to activate traces (None or dict of values to be passed to the TraceActivation IEs)
ICS_TRACE_ACT = None
#--------------------------------------------------------------------------#
# S1APTraceStart policy
#--------------------------------------------------------------------------#
# TraceActivation content:
# interfaces to trace, uint8, bitmap (S1-MME | X2 | UU | 5-bit reserved)
TRA_IF = 0b11100000
# depth: minimum, medium, maximum, ...
TRA_DEPTH = 'medium'
# traceCollectionEntityIPAddress
TRA_TLA = (1, '127.0.1.100')
# MDT configuration: None or dict
TRA_MDT_CFG = {
'mdt-Activation': 'immediate-MDT-and-Trace',
'areaScopeOfMDT': ('pLMNWide', 0),
'mDTMode': ('immediateMDT', {
'measurementsToActivate': 0b11111110, # uint8 bitmap, M1 to M7
'm1reportingTrigger': 'periodic'
})
}
TRA_MDT_CFG = None # comment this to send the MDT config in trace activation
def _log(self, logtype, msg):
self.UE._log(logtype, '[UES1d: %3i] %s' % (self.CtxId, msg))
def __init__(self, ued, enbd=None, ctx_id=-1, sid=None):
    """Initialize the S1 handler of the UE `ued'.

    Args:
        ued: UE handler; its Server attribute provides the S1 configuration
        enbd: eNB handler the UE is connected through, or None when the UE
            is not connected yet
        ctx_id: S1 context identifier to bind when `enbd' is provided
        sid: SCTP stream identifier to bind when `enbd' is provided
    """
    self.UE = ued
    self.Server = ued.Server
    self.Config = self.Server.ConfigS1
    #
    # dict of ongoing S1AP procedures (indexed by their procedure code)
    self.Proc = {}
    # list of tracked procedures (requires TRACK_PROC = True)
    self._proc = []
    #
    # dict of available LTE security contexts, indexed by KSI
    # and current KSI in use
    self.SEC = {}
    self.reset_sec_ctx()
    #
    self.connected = Event()
    if enbd is not None:
        self.set_ran(enbd)
        # bind the S1 context passed by the caller: otherwise ctx_id and sid
        # are ignored and CtxId / SID are never set on this path
        self.set_ctx(ctx_id, sid)
    else:
        self.CtxId = -1
    #
    # init EMM and ESM sig stacks
    self.EMM = UEEMMd(ued, self)
    self.ESM = UEESMd(ued, self)
    self.SMS = UESMSd(ued, self)
def set_ran(self, enbd):
    """Attach the UE to the eNB handler `enbd' and flag it as connected.

    The NAS security context in use is deselected (SEC['KSI'] set to None).
    """
    self.SEC.update(KSI=None)
    self.ENB = enbd
    self.connected.set()
def unset_ran(self):
    """Detach the UE from its eNB handler and flag it as disconnected.

    The eNB handler is told to drop the S1 context, the NAS security context
    in use is deselected and all running S1AP procedures are cleared.
    """
    enb = self.ENB
    enb.unset_ue_s1(self.CtxId)
    del self.ENB
    self.SEC.update(KSI=None)
    self.clear()
    self.connected.clear()
def set_ran_unconnected(self, enbd):
    """Reference the eNB handler `enbd' without flagging the UE as connected.

    Required for paging.
    """
    self.SEC.update(KSI=None)
    self.ENB = enbd
def unset_ran_unconnected(self):
    """Drop the eNB handler reference set by set_ran_unconnected().

    Required for paging.
    """
    delattr(self, 'ENB')
    self.SEC.update(KSI=None)
def is_connected(self):
    """Return True when the `connected' event is set, False otherwise."""
    connected = self.connected
    return connected.is_set()
def set_ctx(self, ctx_id, sid):
    """Bind the S1 context identifier and the SCTP stream identifier."""
    self.CtxId, self.SID = ctx_id, sid
def unset_ctx(self):
    """Reset the S1 context identifier to -1 and drop the instance-level SID."""
    self.CtxId = -1
    delattr(self, 'SID')
def reset_sec_ctx(self):
    """Forget all NAS security contexts and restore the initial policy.

    The UE security capability stored in self.UE.Cap is dropped too.
    """
    self.SEC.clear()
    self.SEC.update({
        'KSI': None,
        'POL': {'TAU': 0, 'DET': 0, 'SER': 0},
        })
    ue_cap = self.UE.Cap
    if 'UESecCap' in ue_cap:
        del ue_cap['UESecCap']
def get_sec_ctx(self):
    """Return the security context indexed by the KSI in use, or None."""
    ksi = self.SEC['KSI']
    return self.SEC.get(ksi)
#--------------------------------------------------------------------------#
# handling of S1AP procedures
#--------------------------------------------------------------------------#
def process_s1ap_pdu(self, pdu_rx):
"""process an S1AP PDU sent by the eNB for UE-associated signalling
and return a list of S1AP PDU(s) to be sent back to it

pdu_rx[0] is the PDU kind (compared against 'initiatingMessage') and
pdu_rx[1]['procedureCode'] is the key used to look the procedure up.
self.ProcLast is updated with the code of the procedure(s) whose .send()
output is returned.
"""
# errcause is only set when no procedure can be resolved for the PDU
errcause = None
if pdu_rx[0] == 'initiatingMessage':
# eNB-initiated procedure, instantiate it
try:
Proc = S1APProcEnbDispatcher[pdu_rx[1]['procedureCode']](self)
except Exception:
# no procedure for this code: answer with an error indication instead
self._log('ERR', 'invalid S1AP PDU, initiatingMessage, code %i'\
% pdu_rx[1]['procedureCode'])
errcause = ('protocol', 'abstract-syntax-error-reject')
Proc = self.init_s1ap_proc(S1APErrorIndCN, Cause=errcause)
if not Proc:
return []
else:
if self.TRACK_PROC:
self._proc.append( Proc )
# process the PDU within the procedure
Proc.recv( pdu_rx )
if Proc.Class == 2 and Proc.errcause:
# a class 2 procedure that reports an error cause is answered
# with an error indication carrying that cause
Err = self.init_s1ap_proc(S1APErrorIndCN, Cause=Proc.errcause)
if not Err:
return []
self.ProcLast = Err.Code
return Err.send()
elif Proc.Class == 1 or errcause:
# class 1 procedure, or the error indication prepared above:
# send what the procedure itself produces
self.ProcLast = Proc.Code
return Proc.send()
else:
# otherwise, send the PDUs of the procedures returned by .trigger()
pdu_tx = []
for ProcRet in Proc.trigger():
pdu_tx.extend( ProcRet.send() )
self.ProcLast = ProcRet.Code
return pdu_tx
#
else:
# CN-initiated procedure, transfer the PDU to it
try:
Proc = self.Proc[pdu_rx[1]['procedureCode']]
except Exception:
# no ongoing procedure with this code: answer with an error indication
self._log('ERR', 'invalid S1AP PDU, %s, code %i'\
% (pdu_rx[0], pdu_rx[1]['procedureCode']))
errcause = ('protocol', 'message-not-compatible-with-receiver-state')
Proc = self.init_s1ap_proc(S1APErrorIndCN, Cause=errcause)
if not Proc:
return []
# process the PDU within the procedure
Proc.recv( pdu_rx )
if Proc.errcause:
# the procedure reports an error cause: send an error indication
Err = self.init_s1ap_proc(S1APErrorIndCN, Cause=Proc.errcause)
if not Err:
return []
self.ProcLast = Err.Code
return Err.send()
elif errcause:
# send the error indication prepared above
self.ProcLast = Proc.Code
return Proc.send()
else:
# send the PDUs of the procedures returned by .trigger()
pdu_tx = []
for ProcRet in Proc.trigger():
pdu_tx.extend( ProcRet.send() )
self.ProcLast = ProcRet.Code
return pdu_tx
def init_s1ap_proc(self, ProcClass, **IEs):
    """initialize a CN-initiated S1AP procedure of class `ProcClass' for
    UE-associated signalling, encode the initiatingMessage PDU with given
    **IEs and return the procedure

    returns None when the procedure cannot be initialized
    """
    Proc = self._init_s1ap_proc(ProcClass)
    if Proc:
        self._encode_s1ap_proc(Proc, **IEs)
        return Proc
    return None
def _init_s1ap_proc(self, ProcClass):
# Instantiate the S1AP procedure `ProcClass' for this UE.
# Returns the procedure, or None when a procedure with the same code is
# already ongoing or when the instantiation raises.
if not issubclass(ProcClass, S1APSigProc):
# only a warning is logged, the procedure is started nevertheless
self._log('WNG', 'starting an invalid procedure for UE-associated S1 signalling')
if ProcClass.Code in self.Proc:
# self.Proc holds a single procedure per procedure code
self._log('ERR', 'an S1AP procedure %s is already ongoing' % ProcClass.__name__)
return None
try:
Proc = ProcClass(self)
except Exception:
# no active S1 link
self._log('ERR', 'no active S1 link to initialize the S1AP procedure %s'\
% ProcClass.__name__)
return None
if Proc.Code in S1APProcCnDispatcher and Proc.Class == 1:
# store the procedure, which requires a response from the eNB
self.Proc[Proc.Code] = Proc
if self.TRACK_PROC:
# keep a reference in the list of tracked procedures
self._proc.append( Proc )
return Proc
def _encode_s1ap_proc(self, Proc, **IEs):
if Proc.Name != 'S1APUEContextRelease':
IEs['MME_UE_S1AP_ID'], IEs['ENB_UE_S1AP_ID'] = self.CtxId, self.CtxId
else:
IEs['UE_S1AP_IDs'] = ('uE-S1AP-ID-pair', {'mME-UE-S1AP-ID': self.CtxId,
'eNB-UE-S1AP-ID': self.CtxId})
Proc.encode_pdu('ini', **IEs)
def start_s1ap_proc(self, ProcClass, **IEs):
    """initialize a CN-initiated S1AP procedure of class `ProcClass' for
    UE-associated signalling, encode the initiatingMessage PDU with given
    **IEs and send the PDU generated by the procedure to the eNB

    returns the number of PDU successfully sent
    """
    if not self.is_connected():
        self._log('ERR', 'not connected')
        return 0
    Proc = self.init_s1ap_proc(ProcClass, **IEs)
    if Proc is None:
        return 0
    self.ProcLast = Proc.Code
    sent = 0
    for pdu in Proc.send():
        if self.UE.Server.send_s1ap_pdu(self.ENB, pdu, self.SID):
            sent += 1
    return sent
def transmit_s1ap_proc(self, S1apTxProc):
    """send the S1AP PDU as returned by the .send() method of the S1AP procedures
    in the S1apTxProc list to the eNB

    returns the number of PDU successfully sent
    """
    sent = 0
    for Proc in S1apTxProc:
        self.ProcLast = Proc.Code
        sent += sum(
            1 for pdu in Proc.send()
            if self.UE.Server.send_s1ap_pdu(self.ENB, pdu, self.SID)
            )
    return sent
def clear(self):
    """Abort all running S1AP procedures."""
    # iterate over a snapshot of the ongoing procedures
    ongoing = tuple(self.Proc.values())
    for Proc in ongoing:
        Proc.abort()
#--------------------------------------------------------------------------#
# handling of NAS messages dispatching
#--------------------------------------------------------------------------#
def process_nas(self, buf):
    """process a NAS message buffer for the EPS domain sent by the mobile
    and return a list (possibly empty) of S1AP procedure(s) to be sent back
    to the eNB

    When self.RX_HOOK is set, the buffer is handed over to it and its result
    is returned as is. Otherwise, the message is dispatched according to its
    security header type: clear-text (0), service request (12), or
    integrity-protected / ciphered EMM message (1 to 4).
    """
    if self.RX_HOOK:
        return self.RX_HOOK(buf)
    NasRxSec, err = NAS.parse_NASLTE_MO(buf, inner=False)
    if err:
        self._log('WNG', 'invalid EPS NAS message: %s' % hexlify(buf).decode('ascii'))
        return self.ret_s1ap_dnt(NAS.EMMStatus(val={'EMMCause':err}, sec=False))
    #
    # LTE NAS security handling
    sh, pd = NasRxSec[0]['SecHdr'].get_val(), NasRxSec[0]['ProtDisc'].get_val()
    if sh == 0:
        # clear-text NAS message
        NasRxSec._sec = False
        NasRxSec._ulcnt = 0
        if self.UE.TRACE_NAS_EPS:
            self._log('TRACE_NAS_EPS_UL', '\n' + NasRxSec.show())
        if pd == 7:
            return self.EMM.process(NasRxSec)
        assert( pd == 2 ) # this won't happen due to parse_NASLTE_MO()
        return self.ESM.process(NasRxSec)
    #
    if sh == 12:
        # NAS service request
        if self.UE.TRACE_NAS_EPS:
            self._log('TRACE_NAS_EPS_UL', '\n' + NasRxSec.show())
        try:
            NasRx, err = self.process_nas_sec_servreq(NasRxSec)
        except Exception as err:
            self._log('ERR', 'unable to process the NAS EMMServiceRequest security, %s' % err)
            return self._s1ap_nas_sec_err()
        if not NasRx:
            return self._s1ap_nas_sec_err()
        return self.EMM.process(NasRx)
    #
    if sh in (1, 2, 3, 4) and pd == 7:
        if self.UE.TRACE_NAS_EPS_SEC:
            self._log('TRACE_NAS_EPS_UL_SEC', '\n' + NasRxSec.show())
        if sh in (1, 3):
            # integrity-protected NAS message
            NasRx, err = self.process_nas_sec_noenc(NasRxSec, sh)
        else:
            # integrity-protected and ciphered NAS message
            NasRx, err = self.process_nas_sec_enc(NasRxSec, sh)
        if err & 0xff:
            # non-security related error: only the low byte is the EMM cause,
            # the upper bits carry the security error code (0x100..0x300)
            # and must not leak into the 1-byte EMMCause
            return self.ret_s1ap_dnt(
                NAS.EMMStatus(val={'EMMCause':err & 0xff}, sec=True))
        if not NasRx:
            # deciphering failed
            return self._s1ap_nas_sec_err()
        if self.UE.TRACE_NAS_EPS:
            self._log('TRACE_NAS_EPS_UL', '\n' + NasRx.show())
        if NasRx[0]['ProtDisc'].get_val() == 7:
            return self.EMM.process(NasRx)
        return self.ESM.process(NasRx)
    #
    # cause: invalid mandatory information
    self._log('WNG', 'invalid EPS NAS message: %r' % NasRxSec)
    return self.ret_s1ap_dnt(NAS.EMMStatus(val={'EMMCause':96}, sec=False))
def process_nas_sec_servreq(self, ServReq):
    """Check the security on the EMM Service Request.

    Returns the request or None (if security checks are enforced), and the
    security error code.

    Security error codes:
    0: no error
    0x100: NAS KSI unknown
    0x200: MAC verification failed
    0x300: NAS UL count not matching

    The returned request gets 2 attributes (_sec [bool], _ulcnt [uint])
    """
    if self.SECNAS_DISABLED:
        ServReq._sec, ServReq._ulcnt = True, 0
        return ServReq, 0
    #
    ue_ksi = ServReq['KSI'].get_val()
    ue_sqn = ServReq['SeqnShort'].get_val()
    if ue_ksi not in self.SEC:
        self._log('WNG', 'NAS SEC: unknown NAS KSI %i in EMMServiceRequest' % ue_ksi)
        self.reset_sec_ctx()
        ServReq._sec = False
        # we are missing the MSB...
        ServReq._ulcnt = ue_sqn
        return ServReq, 0x100
    #
    self.SEC['KSI'] = ue_ksi
    secctx = self.SEC[ue_ksi]
    # the service request only carries the 5 LSB of the NAS UL count
    sqnmsb = secctx['UL'] & 0xffffffe0
    sqnlsb = secctx['UL'] & 0x1f
    mac_ok = ServReq.mac_verify(secctx['Knasint'], 0, secctx['EIA'], sqnmsb)
    #
    if not mac_ok:
        if self.SECNAS_UL_MAC:
            self._log('ERR', 'NAS SEC UL: MAC short verif failed, dropping EMMServiceRequest')
            return None, 0x200
        self._log('WNG', 'NAS SEC UL: MAC short verif failed in EMMServiceRequest')
        ServReq._sec = False
        ServReq._ulcnt = sqnmsb + ue_sqn
        return ServReq, 0x200
    #
    if ue_sqn != sqnlsb:
        if self.SECNAS_UL_CNT:
            self._log('ERR', 'NAS SEC UL: UL count verif failed, dropping EMMServiceRequest')
            return None, 0x300
        self._log('WNG', 'NAS SEC UL: UL count verif failed in EMMServiceRequest')
        # resynch uplink count
        ServReq._sec = False
        ServReq._ulcnt = sqnmsb + ue_sqn
        secctx['UL'] = sqnmsb + ue_sqn + 1
        return ServReq, 0x300
    #
    ServReq._sec = True
    ServReq._ulcnt = secctx['UL']
    secctx['UL'] += 1
    return ServReq, 0
def process_nas_sec_mac(self, NasRxSec, secctx, inner_name):
    """Verify the MAC and the sequence number of a security-protected UL message.

    Args:
        NasRxSec: security-protected NAS message
        secctx: security context dict, its 'UL' count is updated
        inner_name: name of the inner message for logging, NasRxSec._name is
            used when it is empty

    Returns:
        4-tuple (chk, err, sec, ulcnt): chk is False when the message must be
        dropped, err is the security error code (0, 0x200 or 0x300), sec is
        True only when both verifications passed, ulcnt is the NAS UL count
        associated to the message
    """
    expected = secctx['UL']
    sqnmsb, sqnlsb = expected & 0xffffff00, expected & 0xff
    mac_ok = NasRxSec.mac_verify(secctx['Knasint'], 0, secctx['EIA'], sqnmsb)
    ue_sqn = NasRxSec['Seqn'].get_val()
    name = inner_name or NasRxSec._name
    #
    if not mac_ok:
        if self.SECNAS_UL_MAC:
            self._log('ERR', 'NAS SEC UL: MAC verif failed, dropping %s' % name)
            return False, 0x200, False, 0
        self._log('WNG', 'NAS SEC UL: MAC verif failed in %s' % name)
        return True, 0x200, False, sqnmsb+ue_sqn
    #
    if ue_sqn != sqnlsb:
        if self.SECNAS_UL_CNT:
            self._log('ERR', 'NAS SEC UL: UL count verif failed, dropping %s' % name)
            return False, 0x300, False, 0
        self._log('WNG', 'NAS SEC UL: UL count verif failed in %s' % name)
        # resynch uplink count
        secctx['UL'] = sqnmsb+ue_sqn+1
        return True, 0x300, False, sqnmsb+ue_sqn
    #
    self._log('DBG', 'NAS SEC UL: MAC verified, UL count %i' % secctx['UL'])
    secctx['UL'] += 1
    return True, 0, True, expected
def process_nas_sec_noenc(self, NasRxSec, sh):
    """Check the security on all UL EMM messages which are not encrypted,
    except the Service Request.

    Returns the message or None (if security checks are enforced), and the
    security error code.

    Security error codes:
    0: no error
    0x100: NAS KSI unknown
    0x200: MAC verification failed
    0x300: NAS UL count not matching

    The returned message gets 2 attributes (_sec [bool], _ulcnt [uint])
    """
    # decode the inner NAS message
    buf = NasRxSec['NASMessage'].get_val()
    NasRx, err = NAS.parse_NASLTE_MO(buf, inner=False)
    if err:
        self._log('WNG', 'invalid EPS NAS message: %s' % hexlify(buf).decode('ascii'))
    #
    if self.SECNAS_DISABLED:
        if err:
            return None, err
        NasRx._sec = True
        NasRx._ulcnt = 0
        return NasRx, 0
    #
    # the inner message may be missing when its parsing failed (it is tested
    # further down too): only look for a NAS KSI in an actual message
    if NasRx is not None and 'NAS_KSI' in NasRx._by_name:
        ue_ksi = NasRx['NAS_KSI'][-1].get_val()
        if ue_ksi[0] == 1:
            self._log('INF', 'NAS SEC UL: mapped NAS KSI %i in %s' % (ue_ksi[1], NasRxSec._name))
            ue_ksi = (ue_ksi[0]<<3) + ue_ksi[1]
            # TODO: map the 3G corresponding sec context to a 4G one
        else:
            ue_ksi = ue_ksi[1]
        if ue_ksi not in self.SEC:
            # UE KSI unknown
            self.reset_sec_ctx()
            if not err:
                self._log('INF', 'NAS SEC UL: unknown NAS KSI %i in %s' % (ue_ksi, NasRx._name))
                NasRx._sec = False
                NasRx._ulcnt = NasRxSec['Seqn'].get_val()
                return NasRx, 0x100
            else:
                # there is nothing we can do here
                self._log('INF', 'NAS SEC UL: unknown NAS KSI %i' % ue_ksi)
                return None, err + 0x100
        else:
            self.SEC['KSI'] = ue_ksi
            secctx = self.SEC[ue_ksi]
    else:
        if self.SEC['KSI'] not in self.SEC:
            # no correct active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
            self.reset_sec_ctx()
            if not err:
                self._log('INF', 'NAS SEC UL: no NAS KSI in %s neither valid active KSI' % NasRx._name)
                NasRx._sec = False
                NasRx._ulcnt = NasRxSec['Seqn'].get_val()
                return NasRx, 0x100
            else:
                # there is nothing we can do here
                self._log('INF', 'NAS SEC UL: no NAS KSI')
                return None, err + 0x100
        else:
            secctx = self.SEC[self.SEC['KSI']]
    #
    if NasRx:
        chk, err, NasRx._sec, NasRx._ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, NasRx._name)
    else:
        chk, err, sec, ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, '_unknown_')
    if not chk:
        return None, err
    return NasRx, err
def process_nas_sec_enc(self, NasRxSec, sh):
    """Check the security on all UL EMM messages which are encrypted.

    Returns the message or None (if security checks are enforced), and the
    security error code.

    Security error codes:
    0: no error
    0x100: no active NAS KSI
    0x200: MAC verification failed
    0x300: NAS UL count not matching

    The returned message gets 2 attributes (_sec [bool], _ulcnt [uint])
    """
    if self.SECNAS_DISABLED:
        # TODO: try to decode the inner NAS message, in case EEA0 is in use ?
        self._log('WNG', 'unable to decode the inner NAS message')
        return None, 0
    #
    if self.SEC['KSI'] not in self.SEC:
        # no active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
        self._log('WNG', 'NAS SEC UL: no active NAS KSI')
        return None, 0x100
    secctx = self.SEC[self.SEC['KSI']]
    #
    chk, err, sec, ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, '_unknown_')
    if not chk:
        return None, err
    #
    if secctx['EEA'] == 0:
        buf = NasRxSec['NASMessage'].get_val()
    else:
        NasRxSec.decrypt(secctx['Knasenc'], 0, secctx['EEA'], ulcnt & 0xffffff00)
        buf = NasRxSec._dec_msg
    NasRx, err2 = NAS.parse_NASLTE_MO(buf, inner=False)
    if err2:
        # decrypted decoded part is malformed
        self._log('WNG', 'invalid EPS NAS message: %s' % hexlify(buf).decode('ascii'))
    if NasRx is not None:
        # no message may be available when the parsing failed: only tag an
        # actual message with the result of the security checks
        NasRx._sec = sec
        NasRx._ulcnt = ulcnt
    return NasRx, err + err2
def output_nas_sec(self, NasTx):
    """Apply the security on all DL ESM / EMM messages.

    Returns the encoded bytes buffer or None if error.
    """
    if self.UE.TRACE_NAS_EPS:
        self._log('TRACE_NAS_EPS_DL', '\n' + NasTx.show())
    #
    # NasTxSec stays None as long as the message goes out unprotected
    NasTxSec = None
    bypass = self.SECNAS_DISABLED or NasTx._name in self.SECNAS_PDU_NOSEC or \
             NasTx._sec == False
    # a None KSI means the NAS security is not activated
    ksi = None if bypass else self.SEC['KSI']
    if ksi is not None:
        if ksi not in self.SEC:
            # invalid KSI: this should not happen
            self._log('ERR', 'NAS SEC DL: invalid NAS KSI %i, unable to secure the NAS message %s'\
                      % (ksi, NasTx._name))
            self.reset_sec_ctx()
            return None
        secctx = self.SEC[ksi]
        sqnmsb = secctx['DL'] & 0xffffff00
        sqnlsb = secctx['DL'] & 0xff
        # 3: integrity protection only + new security context
        # 2: integrity protection + ciphering
        sh = 3 if NasTx._name == 'EMMSecurityModeCommand' else 2
        try:
            NasTxSec = NAS.EMMSecProtNASMessage(val={'EMMHeaderSec': {'SecHdr': sh},
                                                     'Seqn': sqnlsb,
                                                     'NASMessage': NasTx.to_bytes()})
            if sh == 2:
                NasTxSec.encrypt(secctx['Knasenc'], 1, secctx['EEA'], sqnmsb)
            NasTxSec.mac_compute(secctx['Knasint'], 1, secctx['EIA'], sqnmsb)
        except Exception:
            self._log('ERR', 'NAS SEC DL: unable to protect the NAS message %s' % NasTx._name)
            return None
        secctx['DL'] += 1
    #
    if NasTxSec is None:
        NasOut = NasTx
    else:
        if self.UE.TRACE_NAS_EPS_SEC:
            self._log('TRACE_NAS_EPS_DL_SEC', '\n' + NasTxSec.show())
        NasOut = NasTxSec
    try:
        return NasOut.to_bytes()
    except Exception as err:
        self._log('ERR', 'unable to encode the NAS message %s, %r' % (NasOut._name, err))
        return None
def ret_s1ap_dnt(self, NasTx, **IEs):
    """returns an S1APDownlinkNASTransport procedure initialized with the
    NAS PDU and optional IEs to be sent

    The procedure is returned within a list; the list is empty when there is
    no NAS message to send or when the procedure cannot be initialized.
    """
    if not NasTx:
        return []
    buf = self.output_nas_sec(NasTx)
    if buf is None:
        return self._s1ap_nas_sec_err()
    IEs['NAS_PDU'] = buf
    # the S1AP (not NGAP) procedure is required here, and the result must be
    # bound to the name tested just below
    S1apProc = self.init_s1ap_proc(S1APDownlinkNASTransport, **IEs)
    if S1apProc:
        return [S1apProc]
    return []
def _s1ap_nas_sec_err(self):
# TODO: maybe release the S1-UE link ?
return []
def clear_nas_proc(self):
    """Clear all NAS EPS procedures, in the EMM stack then in the ESM one."""
    for stack in (self.EMM, self.ESM):
        stack.clear()
#--------------------------------------------------------------------------#
# network-initiated method (fg task, to be used from the interpreter)
#--------------------------------------------------------------------------#
def _get_paging_ies(self, dom, prio):
# prepare the S1APPaging IEs
IEs = {}
#
if not self.UE.IMSI or not self.UE.PLMN or self.UE.TAC is None:
return IEs
#
if self.PAG_IMSI or self.UE.MTMSI is None:
# paging with IMSI
self._log('INF', 'paging with IMSI')
IEs['UEPagingID'] = ('iMSI', NAS.encode_bcd(self.UE.IMSI))
else:
IEs['UEPagingID'] = ('s-TMSI', {'mMEC': uint_to_bytes(self.UE.Server.MME_CODE, 8),
'm-TMSI': uint_to_bytes(self.UE.MTMSI, 32)})
#
if 'DRXParam' in self.UE.Cap:
drx = self.UE.Cap['DRXParam'][1]['DRXCycleLen'].get_val()
if drx in (6, 7, 8, 9):
IEs['PagingDRX'] = S1AP.S1AP_IEs.PagingDRX._cont_rev[drx-6]
#
IEs['TAIList'] = [{
'id': 47,
'criticality': 'ignore',
'value': ('TAIItem', {'tAI': {'pLMNidentity': plmn_str_to_buf(self.UE.PLMN),
'tAC': uint_to_bytes(self.UE.TAC, 16)}})
}]
#
IEs['UEIdentityIndexValue'] = (int(self.UE.IMSI) % 1024, 10)
#
if isinstance(dom, str_types) and dom.lower() == 'cs' \
or self.PAG_DOM_DEF.lower() == 'cs':
IEs['CNDomain'] = 'cs'
else:
IEs['CNDomain'] = 'ps'
#
if prio is not None or self.PAG_PRIO_DEF is not None:
if isinstance(prio, integer_types) and 1 <= prio <= 8:
IEs['PagingPriority'] = 'priolevel%i' % prio
elif isinstance(prio, str_types) and re.match(r'priolevel[1-8]', prio):
IEs['PagingPriority'] = prio
elif isinstance(self.PAG_PRIO_DEF, integer_types) and 1 <= self.PAG_PRIO_DEF <= 8:
IEs['PagingPriority'] = 'priolevel%i' % self.PAG_PRIO_DEF
elif isinstance(self.PAG_PRIO_DEF, str_types) \
and re.match(r'priolevel[1-8]', self.PAG_PRIO_DEF):
IEs['PagingPriority'] = self.PAG_PRIO_DEF
#
if 'UERadioCapPaging' in self.UE.Cap:
IEs['UERadioCapabilityForPaging'] = self.UE.Cap['UERadioCapPaging'][0]
#
return IEs
def page(self, dom=None, prio=None):
    """send S1AP Paging command to eNB responsible for the UE TAI

    nothing is sent when the UE is already connected, when no eNB serves the
    UE TAI or when the paging IEs cannot be prepared
    """
    if self.connected.is_set():
        self._log('DBG', 'paging: UE already connected')
        return
    #
    # collect the eNBs serving the UE TAI
    # eNB id is 2-tuple whereas gNB id is 3-tuple
    tai = (self.UE.PLMN, self.UE.TAC)
    try:
        enbs = [self.Server.RAN[ranid] for ranid in self.Server.TAI[tai]
                if len(ranid) == 2]
    except Exception:
        self._log('ERR', 'paging: no eNB serving the UE TAI %s.%.4x' % tai)
        return
    #
    IEs = self._get_paging_ies(dom, prio)
    if not IEs:
        self._log('ERR', 'paging: missing basic information')
        return
    #
    # start an S1APPaging procedure on all eNBs
    for enb in enbs:
        enb.page(**IEs)
    self._log('INF', 'paging: ongoing')
def page_block(self, dom=None, prio=None):
    """page the UE and wait for it to connect, or the paging procedure to timeout.

    Returns True if UE gets connected, False otherwise.
    """
    if self.connected.is_set():
        self._log('DBG', 'paging: UE already connected')
        return True
    #
    # collect the eNBs serving the UE TAI
    # eNB id is 2-tuple whereas gNB id is 3-tuple
    tai = (self.UE.PLMN, self.UE.TAC)
    try:
        enbs = [self.Server.RAN[ranid] for ranid in self.Server.TAI[tai]
                if len(ranid) == 2]
    except Exception:
        self._log('ERR', 'paging: no eNB serving the UE TAI %s.%.4x' % tai)
        return False
    #
    IEs = self._get_paging_ies(dom, prio)
    if not IEs:
        self._log('ERR', 'paging: missing basic information')
        return False
    #
    # page once, then retry PAG_RETR times as long as the UE does not connect
    attempts = 0
    while attempts <= self.PAG_RETR:
        # start an S1APPaging procedure on all eNBs
        for enb in enbs:
            enb.page(**IEs)
        # wait until the UE gets connected or the timer expires
        if self.connected.wait(self.PAG_WAIT):
            self._log('INF', 'paging: UE connected')
            return True
        attempts += 1
    #
    self._log('WNG', 'paging: timeout, UE not connected')
    return False
def release(self, cause=('nas', 'normal-release')):
    """release the S1 link with the given S1AP cause

    returns True when the UE is not connected or when the release gets sent,
    False otherwise
    """
    if not self.connected.is_set():
        # nothing to release
        self._log('DBG', 'release: UE not connected')
        return True
    # prepare the S1APUEContextRelease procedure
    S1apProc = self.init_s1ap_proc(S1APUEContextRelease, Cause=cause)
    if not S1apProc:
        return False
    return bool(self.transmit_s1ap_proc([S1apProc]))
def send_error_ind(self, cause, **IEs):
    """start a S1APErrorIndCN with the given S1AP cause

    IEs can contain any of the optional or extended IEs

    returns True when the procedure gets sent, False otherwise
    """
    if not self.connected.is_set():
        # S1AP link disconnected: connect the UE first, but only when the
        # policy requires it
        if not self.S1AP_FORCE_PAGE or not self._net_init_con():
            return False
    # prepare the S1AP procedure
    IEs['Cause'] = cause
    S1apProc = self.init_s1ap_proc(S1APErrorIndCN, **IEs)
    if not S1apProc:
        return False
    return bool(self.transmit_s1ap_proc([S1apProc]))
def _get_trace_act_ie(self, traceref, interfaces=None, depth=None, addr=None, mdtcfg=None):
    """Prepare the TraceActivation IE.

    Parameters left to None are taken from the TRA_* class policy; see
    start_trace() for their format.
    """
    if interfaces is None:
        interfaces = self.TRA_IF
    #
    if depth is None:
        depth = self.TRA_DEPTH
    elif isinstance(depth, integer_types):
        depth = S1AP.S1AP_IEs.TraceDepth._cont_rev[depth]
    #
    # without any address given or configured, a single null byte is used
    if addr is None:
        addr = self.TRA_TLA
    tla = b'\0' if addr is None else inet_aton_cn(*addr)
    tla_bitlen = 8*len(tla)
    #
    traceact = {
        'e-UTRAN-Trace-ID': plmn_str_to_buf(self.Server.PLMN) + b'\0\0\0' + traceref,
        'interfacesToTrace': (interfaces, 8),
        'traceDepth': depth,
        'traceCollectionEntityIPAddress': (bytes_to_uint(tla, tla_bitlen), tla_bitlen),
        }
    #
    if mdtcfg is None:
        mdtcfg = self.TRA_MDT_CFG
    if mdtcfg is not None:
        traceact['iE-Extensions'] = [{
            'id': 162,
            'criticality': 'ignore',
            'extensionValue': ('MDT-Configuration', mdtcfg)
            }]
    #
    return traceact
def start_trace(self, traceref, interfaces=None, depth=None, addr=None, mdtcfg=None):
    """start a S1APTraceStart with the given traceref (2 bytes) and other parameters:
    interfaces: None or uint8, bitmap (S1-MME | X2 | UU | 5-bit reserved)
    depth : None or uint 0..5 or str (see TraceDepth)
    addr : None or PDN-like addr (1, IPv4 ascii) or (2, IPv6 ascii)
    mdtcfg: None or dict, MDT-Configuration IE value

    returns True when the procedure gets sent, False otherwise
    """
    if not self.connected.is_set():
        # S1AP link disconnected: connect the UE first, but only when the
        # policy requires it
        if not self.S1AP_FORCE_PAGE or not self._net_init_con():
            return False
    # prepare the S1AP procedure
    if not isinstance(traceref, bytes_types) or len(traceref) != 2:
        return False
    traceact = self._get_trace_act_ie(traceref, interfaces, depth, addr, mdtcfg)
    S1apProc = self.init_s1ap_proc(S1APTraceStart, TraceActivation=traceact)
    if not S1apProc:
        return False
    return bool(self.transmit_s1ap_proc([S1apProc]))
def deactivate_trace(self, traceref):
    """start a S1APDeactivateTrace with the given traceref (2 bytes)

    returns True when the procedure gets sent, False otherwise
    """
    if not self.connected.is_set():
        # S1AP link disconnected
        if self.S1AP_FORCE_PAGE:
            # force to connect
            if not self._net_init_con():
                # unable to connect with the UE
                return False
        else:
            return False
    # prepare the S1AP procedure
    if not isinstance(traceref, bytes_types) or len(traceref) != 2:
        return False
    # the trace identity is a bytes buffer built as for the trace activation:
    # the server PLMN has to be converted from its string form to a buffer
    traceid = plmn_str_to_buf(self.Server.PLMN) + b'\0\0\0' + traceref
    S1apProc = self.init_s1ap_proc(S1APDeactivateTrace, E_UTRAN_Trace_ID=traceid)
    if not S1apProc:
        return False
    if not self.transmit_s1ap_proc([S1apProc]):
        return False
    return True
def report_loc_ctrl(self, reqtype={'eventType': 'direct', 'reportArea': 'ecgi'}):
    """start a S1APLocationReportingControl with a given request type

    RequestType is a sequence of {EventType (enum), ReportArea (enum)}

    returns True when the procedure gets sent, False otherwise
    """
    if not self.connected.is_set():
        # S1AP link disconnected: connect the UE first, but only when the
        # policy requires it
        if not self.S1AP_FORCE_PAGE or not self._net_init_con():
            return False
    # prepare the S1AP procedure
    S1apProc = self.init_s1ap_proc(S1APLocationReportingControl, RequestType=reqtype)
    if not S1apProc:
        return False
    return bool(self.transmit_s1ap_proc([S1apProc]))
# this is used by send_raw() and other network-initiated procedures
def _net_init_con(self):
return self.EMM._net_init_con()
#--------------------------------------------------------------------------#
# to send arbitrary NAS buffers to the UE
#--------------------------------------------------------------------------#
def send_nas_raw(self, naspdu, sec=True, rx_hook=lambda x:[], wait_t=1):
    """Sends whatever bytes, or list of bytes, to the UE as NAS PDU(s)

    returns False when the UE cannot be connected or as soon as a PDU fails
    to be sent, True when the whole tuple / list got sent
    """
    if not self._net_init_con():
        return False
    #
    if isinstance(naspdu, bytes_types):
        return self._send_nas_raw(naspdu, sec, rx_hook, wait_t)
    #
    if isinstance(naspdu, (tuple, list)):
        # stops at the first PDU that fails
        return all(self.send_nas_raw(pdu, sec, rx_hook, wait_t=wait_t)
                   for pdu in naspdu)
def _send_nas_raw(self, naspdu, sec=True, rx_hook=lambda x:[], wait_t=1):
    """Send a single raw NAS PDU to the UE in a S1APDownlinkNASTransport.

    Args:
        naspdu: bytes buffer to send.
        sec: when true, the buffer is wrapped into a dummy NAS envelope and
            passed through output_nas_sec() before being sent.
        rx_hook: callable set as self.RX_HOOK for the duration of the call.
        wait_t: duration passed to sleep() after a successful transmission.

    Returns:
        True if the PDU was transmitted, False if output_nas_sec() returned
        None or if the S1AP procedure could not be initialized or
        transmitted.
    """
    # overwrite the class attribute with an instance attribute
    self.RX_HOOK = rx_hook
    try:
        if sec:
            # need to wrap the naspdu into a pseudo NasTx structure
            NasTx = Envelope('NASDummy', GEN=(Uint8('SecHdr', trans=True),
                                              Buf('NASPDU', val=naspdu)))
            NasTx._sec = True
            naspdu = self.output_nas_sec(NasTx)
            if naspdu is None:
                return False
        #
        S1apProc = self.init_s1ap_proc(S1APDownlinkNASTransport,
                                       NAS_PDU=naspdu)
        if not S1apProc:
            return False
        if not self.transmit_s1ap_proc([S1apProc]):
            return False
        self._log('INF', 'send_nas_raw: 0x%s' % hexlify(naspdu).decode('ascii'))
        # NOTE(review): presumably leaves time for the UE answer to reach
        # rx_hook before it gets removed -- confirm against the RX path
        sleep(wait_t)
        return True
    finally:
        # restore the class attribute on every exit path, including when one
        # of the calls above raises; otherwise the hook would stay installed
        # on the instance after the failed call
        del self.RX_HOOK
#--------------------------------------------------------------------------#
# EPS bearers activation
#--------------------------------------------------------------------------#
def bearer_act(self):
    """Prepare a S1APInitialContextSetup procedure reactivating all PDN
    connections that have a RAB configuration.

    Each entry of self.ESM.PDN holding a 'RAB' key contributes one
    E-RABToBeSetupItemCtxtSUReq item, and its DL / UL bitrates are summed
    into the UE aggregate maximum bitrate. The security key and UE security
    capabilities come from the current security context when there is one
    and self.UE.Cap has a 'UESecCap' entry; otherwise self.SECAS_NULL_CTX
    is used.

    Returns the initialized S1AP procedure, with the list of EPS bearer
    identities attached as _gtp_add_mobile_ebi, or None when there is no
    E-RAB to set up or when the procedure could not be initialized.
    """
    rab_items, ebis = [], []
    ambr_dl = ambr_ul = 0
    for ebi, pdncfg in self.ESM.PDN.items():
        if 'RAB' not in pdncfg:
            continue
        rab = pdncfg['RAB']
        ebis.append(ebi)
        # erab ext IE can be Correlation-ID and/or BearerType
        item = {
            'e-RAB-ID': ebi,
            'e-RABlevelQoSParameters': rab['E-RABlevelQoSParameters'],
            'transportLayerAddress': (bytes_to_uint(inet_aton(rab['SGW-TLA']), 32), 32),
            'gTP-TEID': uint_to_bytes(rab['SGW-GTP-TEID'], 32),
            #'iE-Extensions': [],
        }
        rab_items.append({
            'id': 52,
            'criticality': 'reject',
            'value': ('E-RABToBeSetupItemCtxtSUReq', item),
        })
        ambr_dl += rab['BitrateDL']
        ambr_ul += rab['BitrateUL']
    if not rab_items:
        # no PDN connection to reactivate
        return None
    #
    # get the current sec context to setup the eNB security layer
    secctx = self.get_sec_ctx()
    if secctx and 'UESecCap' in self.UE.Cap:
        # create the KeNB and store the derived material in the context
        self._log('DBG', 'NAS UL count for Kenb derivation, %i' % secctx['UL_enb'])
        kenb = conv_401_A3(secctx['Kasme'], secctx['UL_enb'])
        ue_seccap = self.UE.Cap['UESecCap'][1]
        secctx['Kenb'] = kenb
        secctx['NCC'] = 0
        secctx['NH'] = conv_401_A4(secctx['Kasme'], kenb)
    else:
        self._log('WNG', 'no active NAS security context, using the null AS security context')
        kenb, ue_seccap = self.SECAS_NULL_CTX
    #
    def alg_bitmap(first):
        # place 3 consecutive capability values, starting at index first,
        # at bit positions 15, 14 and 13 of a 16-bit value
        return sum(ue_seccap[first + off].get_val() << (15 - off)
                   for off in range(3))
    #
    ies = {
        'E_RABToBeSetupListCtxtSUReq': rab_items,
        'UEAggregateMaximumBitrate': {
            'uEaggregateMaximumBitRateDL': ambr_dl,
            'uEaggregateMaximumBitRateUL': ambr_ul,
        },
        'SecurityKey': (bytes_to_uint(kenb, 256), 256),
        'UESecurityCapabilities': {
            'encryptionAlgorithms': (alg_bitmap(1), 16),
            'integrityProtectionAlgorithms': (alg_bitmap(9), 16),
        },
    }
    #
    # optional IEs, driven by the ICS_* settings
    if self.ICS_RADCAP_INCL and 'UERadioCap' in self.UE.Cap:
        ies['UERadioCapability'] = self.UE.Cap['UERadioCap'][0]
    if self.ICS_GUMMEI_INCL:
        server = self.UE.Server
        ies['GUMMEI'] = gummei_to_asn(server.PLMN, server.MME_GID, server.MME_CODE)
    if self.ICS_TRACE_ACT:
        ies['TraceActivation'] = self.ICS_TRACE_ACT
    #
    proc = self.init_s1ap_proc(S1APInitialContextSetup, **ies)
    if not proc:
        return None
    # pass the info required for setting the GTPU tunnel
    proc._gtp_add_mobile_ebi = ebis
    return proc