corenet: few fixes and some more S1AP methods

This commit is contained in:
mitshell 2018-02-23 22:44:37 +01:00
parent 2788783d2c
commit ae0c39c645
4 changed files with 488 additions and 111 deletions

View File

@ -85,7 +85,7 @@ class UEEMMd(SigStack):
#--------------------------------------------------------------------------#
# T3412, periodic TAU timer: dict {'Unit': uint3, 'Value': uint5}
# Unit: 0: 2s, 1: 1mn, 2: 6mn, 7: deactivated
_T3412 = {'Unit': 1, 'Value': 2} # 30mn
_T3412 = {'Unit': 2, 'Value': 2} # 12mn
#_T3412 = {'Unit': 7, 'Value': 0} # deactivated
#
# Reattach attempt after a failure timer: dict {'Unit': uint3, 'Value': uint5}
@ -966,21 +966,21 @@ class UEESMd(SigStack):
# err cause 51: PDN type IPv6 only allowed
return None, 51
else:
IEs['PDNAddr'] = {'Type': 1, 'Addr': inet_aton_cn(1, pdncfg['Addr'][1])}
IEs['PDNAddr'] = {'Type': 1, 'Addr': inet_aton_cn(1, pdncfg['Addr'][1], dom='EPS')}
ipaddr = (1, pdncfg['Addr'][1])
elif pdntue == 2:
if pdntnet not in (2, 3):
# err cause 50: PDN type IPv4 only allowed
return None, 50
else:
IEs['PDNAddr'] = {'Type': 2, 'Addr': inet_aton_cn(2, pdncfg['Addr'][2])}
IEs['PDNAddr'] = {'Type': 2, 'Addr': inet_aton_cn(2, pdncfg['Addr'][2], dom='EPS')}
ipaddr = (2, pdncfg['Addr'][2])
elif pdntue == 3:
if not 1 <= pdntnet <= 3:
# err cause 111: Protocol error, unspecified
return None, 111
else:
IEs['PDNAddr'] = {'Type': pdntnet, 'Addr': inet_aton_cn(*pdncfg['Addr'])}
IEs['PDNAddr'] = {'Type': pdntnet, 'Addr': inet_aton_cn(*pdncfg['Addr'], dom='EPS')}
ipaddr = pdncfg['Addr']
else:
# err cause 28: Unknown PDN type
@ -1172,6 +1172,27 @@ class UES1d(SigStack):
# to activate traces (None or dict of values to be passed to the TraceActivation IEs)
ICS_TRACE_ACT = None
#--------------------------------------------------------------------------#
# S1APTraceStart policy
#--------------------------------------------------------------------------#
# TraceActivation content:
# interfaces to trace, uint8, bitmap (S1-MME | X2 | UU | 5-bit reserved)
TRA_IF = 0b11100000
# depth: minimum, medium, maximum, ...
TRA_DEPTH = 'medium'
# traceCollectionEntityIPAddress
TRA_TLA = (1, '127.0.1.100')
# MDT configuration: None or dict
TRA_MDT_CFG = {
'mdt-Activation': 'immediate-MDT-and-Trace',
'areaScopeOfMDT': ('pLMNWide', 0),
'mDTMode': ('immediateMDT', {
'measurementsToActivate': 0b11111110, # uint8 bitmap, M1 to M7
'm1reportingTrigger': 'periodic'
})
}
TRA_MDT_CFG = None # comment this to send the MDT config in trace activation
def _log(self, logtype, msg):
self.UE._log(logtype, '[UES1d: %3i] %s' % (self.CtxId, msg))
@ -1203,12 +1224,12 @@ class UES1d(SigStack):
self.SMS = UESMSd(ued, self)
def set_ran(self, enbd):
# TODO: handle mobility from 1 eNB to another, and inter-RAT
self.SEC['KSI'] = None
self.ENB = enbd
self.connected.set()
def unset_ran(self):
self.ENB.unset_ue_s1(self.CtxId)
del self.ENB
self.SEC['KSI'] = None
self.clear()
@ -1433,18 +1454,16 @@ class UES1d(SigStack):
else:
S1apTxProc = self.EMM.process(NasRx)
elif sh in (1, 2, 3, 4) and pd == 7:
# security-protected NAS message
if self.UE.TRACE_NAS_EPS_SEC:
self._log('TRACE_NAS_EPS_UL_SEC', '\n' + NasRxSec.show())
NasRx, err = self.process_nas_sec(NasRxSec, sh)
#try:
# NasRx, err = self.process_nas_sec(NasRxSec, sh)
#except Exception as err:
# self._log('ERR', 'NAS SEC UL: unable to deprotect the NAS message %s' % err)
# return self._s1ap_nas_sec_err()
if sh in (1, 3):
# integrity-protected NAS message
NasRx, err = self.process_nas_sec_noenc(NasRxSec, sh)
else:
# integrity-protected and ciphered NAS message
NasRx, err = self.process_nas_sec_enc(NasRxSec, sh)
if err & 0xff:
# non-security related error
self._log('WNG', 'invalid EPS NAS inner message')
S1apTxProc = self.ret_s1ap_dnt(NAS.EMMStatus(val={'EMMCause':err}, sec=True))
elif not NasRx:
# deciphering failed
@ -1522,8 +1541,40 @@ class UES1d(SigStack):
secctx['UL'] += 1
return ServReq, 0
def process_nas_sec(self, NasRxSec, sh):
"""Check the security on all UL EMM messages, except the Service Request.
def process_nas_sec_mac(self, NasRxSec, secctx, inner_name):
#
sqnmsb, sqnlsb = secctx['UL'] & 0xffffff00, secctx['UL'] & 0xff
verif_mac = NasRxSec.mac_verify(secctx['Knasint'], 0, secctx['EIA'], sqnmsb)
ue_sqn = NasRxSec['Seqn'].get_val()
verif_sqn = True if ue_sqn == sqnlsb else False
if not inner_name:
inner_name = NasRxSec._name
#
if not verif_mac:
if self.SECNAS_MAC:
self._log('ERR', 'NAS SEC UL: MAC verif failed, dropping %s' % inner_name)
return False, 0x200, False, 0
else:
self._log('WNG', 'NAS SEC UL: MAC verif failed in %s' % inner_name)
return True, 0x200, False, sqnmsb+ue_sqn
elif not verif_sqn:
if self.SECNAS_UL:
self._log('ERR', 'NAS SEC UL: UL count verif failed, dropping %s' % inner_name)
return False, 0x300, False, 0
else:
self._log('WNG', 'NAS SEC UL: UL count verif failed in %s' % inner_name)
# resynch uplink count
secctx['UL'] = sqnmsb+ue_sqn+1
return True, 0x300, False, sqnmsb+ue_sqn
else:
self._log('DBG', 'NAS SEC UL: MAC verified, UL count %i' % secctx['UL'])
ulcnt = secctx['UL']
secctx['UL'] += 1
return True, 0, True, ulcnt
def process_nas_sec_noenc(self, NasRxSec, sh):
"""Check the security on all UL EMM messages which are not encrypted,
except the Service Request.
Returns the message or None (if security checks are enforced), and the
security error code.
@ -1535,99 +1586,109 @@ class UES1d(SigStack):
The returned message gets 2 attributes (_sec [bool], _ulcnt [uint])
"""
if self.SECNAS_DISABLED:
# TODO: eventually remove the sec header
NasRxSec._sec = True
NasRxSec._ulcnt = 0
return NasRxSec, 0
# decode the inner NAS message
buf = NasRxSec['NASMessage'].get_val()
NasRx, err = NAS.parse_NASLTE_MO(buf, inner=False)
if err:
self._log('WNG', 'invalid EPS NAS message: %s' % hexlify(buf).decode('ascii'))
#
if 'KSI' in NasRxSec._by_name:
ue_ksi = NasRxSec['KSI'].get_val()
if self.SECNAS_DISABLED:
if err:
return None, err
else:
NasRx._sec = True
NasRx._ulcnt = 0
return NasRx, 0
#
if 'NAS_KSI' in NasRx._by_name:
ue_ksi = NasRx['NAS_KSI'][-1].get_val()
if ue_ksi[0] == 1:
self._log('INF', 'NAS SEC UL: mapped NAS KSI %i in %s' % (ue_ksi[1], NasRxSec._name))
ue_ksi = (ue_ksi[0]<<3) + ue_ksi[1]
# TODO: map the 3G corresponding sec context to a 4G one
else:
ue_ksi = ue_ksi[1]
if ue_ksi not in self.SEC:
# UE KSI unknown
self.reset_sec_ctx()
if sh in (1, 3):
# still, decode the inner NASMessage
NasRx, err = NAS.parse_NASLTE_MO(NasRxSec['NASMessage'].get_val(), inner=False)
if not err:
self._log('WNG', 'NAS SEC UL: unknown NAS KSI %i in %s' % (ue_ksi, NasRx._name))
NasRx._sec = False
NasRx._ulcnt = 0
return NasRx, 0x100
# there is nothing we can do here
self._log('WNG', 'NAS SEC UL: unknown NAS KSI %i, dropping %s' % (ue_ksi, NasRxSec._name))
return None, 0x100
if not err:
self._log('INF', 'NAS SEC UL: unknown NAS KSI %i in %s' % (ue_ksi, NasRx._name))
NasRx._sec = False
NasRx._ulcnt = NasRxSec['Seqn'].get_val()
return NasRx, 0x100
else:
# there is nothing we can do here
self._log('INF', 'NAS SEC UL: unknown NAS KSI %i' % ue_ksi)
return None, err + 0x100
else:
self.SEC['KSI'] = ue_ksi
secctx = self.SEC[ue_ksi]
else:
if self.SEC['KSI'] not in self.SEC:
# no active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
if sh in (1, 3):
# still, decode the inner NASMessage
NasRx, err = NAS.parse_NASLTE_MO(NasRxSec['NASMessage'].get_val(), inner=False)
if not err:
if self.SEC['KSI'] is not None:
self._log('WNG', 'NAS SEC UL: unset NAS KSI for processing %s' % NasRx._name)
self.reset_sec_ctx()
NasRx._sec = False
NasRx._ulcnt = 0
return NasRx, 0x100
self._log('WNG', 'NAS SEC UL: unset NAS KSI, dropping %s' % NasRxSec._name)
return None, 0x100
# no correct active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
self.reset_sec_ctx()
if not err:
self._log('INF', 'NAS SEC UL: no NAS KSI in %s neither valid active KSI' % NasRx._name)
NasRx._sec = False
NasRx._ulcnt = NasRxSec['Seqn'].get_val()
return NasRx, 0x100
else:
# there is nothing we can do here
self._log('INF', 'NAS SEC UL: no NAS KSI')
return None, err + 0x100
else:
secctx = self.SEC[self.SEC['KSI']]
#
sqnmsb, sqnlsb = secctx['UL'] & 0xffffff00, secctx['UL'] & 0xff
verif_mac = NasRxSec.mac_verify(secctx['Knasint'], 0, secctx['EIA'], sqnmsb)
ue_sqn = NasRxSec['Seqn'].get_val()
verif_sqn = True if ue_sqn == sqnlsb else False
#
err = 0
if not verif_mac:
if self.SECNAS_MAC:
self._log('ERR', 'NAS SEC UL: MAC verif failed, dropping %s' % NasRxSec._name)
return None, 0x200
else:
self._log('WNG', 'NAS SEC UL: MAC verif failed in %s' % NasRxSec._name)
err = 0x200
sec = False
ulcnt = sqnmsb + ue_sqn
elif not verif_sqn:
if self.SECNAS_UL:
self._log('ERR', 'NAS SEC UL: UL count verif failed, dropping %s' % NasRxSec._name)
return None, 0x300
else:
self._log('WNG', 'NAS SEC UL: UL count verif failed in %s' % NasRxSec._name)
# resynch uplink count
secctx['UL'] = sqnmsb + ue_sqn + 1
err = 0x300
sec = False
ulcnt = sqnmsb + ue_sqn
if NasRx:
chk, err, NasRx._sec, NasRx._ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, NasRx._name)
else:
self._log('DBG', 'NAS SEC UL: MAC verified, UL count %i' % secctx['UL'])
sec = True
ulcnt = secctx['UL']
secctx['UL'] += 1
#
if sh in (2, 4):
if secctx['EEA'] == 0:
NasRx, err2 = NAS.parse_NASLTE_MO(NasRxSec['NASMessage'].get_val(), inner=False)
else:
NasRxSec.decrypt(secctx['Knasenc'], 0, secctx['EEA'], sqnmsb)
NasRx, err2 = NAS.parse_NASLTE_MO(NasRxSec._dec_msg, inner=False)
if err2:
# decrypted decoded part is malformed
err += err2
chk, err, sec, ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, None)
if not chk:
return None, err
else:
NasRx, err2 = NAS.parse_NASLTE_MO(NasRxSec['NASMessage'].to_bytes(), inner=False)
if err2:
# decoded part is malformed
err += err2
return NasRx, err
def process_nas_sec_enc(self, NasRxSec, sh):
"""Check the security on all UL EMM messages which are encrypted.
Returns the message or None (if security checks are enforced), and the
security error code.
Security error codes:
0: no error
0x100: no active NAS KSI
0x200: MAC verification failed
0x300: NAS UL count not matching
The returned message gets 2 attributes (_sec [bool], _ulcnt [uint])
"""
if self.SECNAS_DISABLED:
# TODO: try to decode the inner NAS message, in case EEA0 is in use ?
self._log('WNG', 'unable to decode the inner NAS message')
return None, 0
#
if self.SEC['KSI'] not in self.SEC:
# no active KSI: happens when restarting corenet, and UE using a forgotten sec ctx
self._log('WNG', 'NAS SEC UL: no active NAS KSI')
return None, 0x100
else:
secctx = self.SEC[self.SEC['KSI']]
#
chk, err, sec, ulcnt = self.process_nas_sec_mac(NasRxSec, secctx, None)
if not chk:
return None, err
#
if secctx['EEA'] == 0:
buf = NasRxSec['NASMessage'].get_val()
else:
NasRxSec.decrypt(secctx['Knasenc'], 0, secctx['EEA'], sqnmsb)
buf = NasRxSec._dec_msg
NasRx, err2 = NAS.parse_NASLTE_MO(buf, inner=False)
if err2:
# decrypted decoded part is malformed
self._log('WNG', 'invalid EPS NAS message: %s' % hexlify(buf).decode('ascii'))
NasRx._sec = sec
NasRx._ulcnt = ulcnt
return NasRx, err
return NasRx, err + err2
def output_nas_sec(self, NasTx):
"""Apply the security on all DL ESM / EMM messages.
@ -1715,7 +1776,7 @@ class UES1d(SigStack):
self.ESM.clear()
#--------------------------------------------------------------------------#
# paging and network-initiated procedures' routines
# network-initiated method (fg task, to be used from the interpreter)
#--------------------------------------------------------------------------#
def _get_paging_ies(self, dom, prio):
@ -1770,7 +1831,7 @@ class UES1d(SigStack):
return IEs
def page(self, dom=None, prio=None):
"""sends S1AP Paging command to eNB responsible for the UE TAI
"""send S1AP Paging command to eNB responsible for the UE TAI
"""
# send a S1APPaging for the EPS domain
if self.connected.is_set():
@ -1795,7 +1856,7 @@ class UES1d(SigStack):
self._log('INF', 'paging: ongoing')
def page_block(self, dom=None, prio=None):
"""Pages the UE and wait for it to connect, or the paging procedure to timeout.
"""page the UE and wait for it to connect, or the paging procedure to timeout.
Returns True if UE gets connected, False otherwise.
"""
# send a S1APPaging for the EPS domain
@ -1831,6 +1892,158 @@ class UES1d(SigStack):
self._log('WNG', 'paging: timeout, UE not connected')
return False
def release(self, cause=('nas', 'normal-release')):
"""release the S1 link with the given S1AP cause
"""
if not self.connected.is_set():
# nothing to release
self._log('DBG', 'release: UE not connected')
return True
# prepare the S1APUEContextRelease procedure
S1apProc = self.init_s1ap_proc(S1APUEContextRelease, Cause=cause)
if not S1apProc:
return False
if not self.transmit_s1ap_proc([S1apProc]):
return False
else:
return True
def send_error_ind(self, cause, **IEs):
"""start a S1APErrorIndCN with the given S1AP cause
IEs can contain any of the optional or extended IEs
"""
if not self.connected.is_set():
# S1AP link disconnected
if self.S1AP_FORCE_PAGE:
# force to connect
if not self._net_init_con():
# unable to connect with the UE
return False
else:
return False
# prepare the S1AP procedure
IEs['Cause'] = cause
S1apProc = self.init_s1ap_proc(S1APErrorIndCN, **IEs)
if not S1apProc:
return False
if not self.transmit_s1ap_proc([S1apProc]):
return False
else:
return True
def _get_trace_act_ie(self, traceref, interfaces=None, depth=None, addr=None, mdtcfg=None):
# prepare the TraceActivation IE
#
if interfaces is None:
interfaces = self.TRA_IF
#
if depth is None:
depth = self.TRA_DEPTH
elif isinstance(depth, integer_types):
depth = S1AP.S1AP_IEs.TraceDepth._cont_rev[depth]
#
if addr is None:
if self.TRA_TLA is None:
addr = b'\0'
else:
addr = inet_aton_cn(*self.TRA_TLA)
else:
addr = inet_aton_cn(*addr)
#
traceact = {
'e-UTRAN-Trace-ID': plmn_str_to_buf(self.Server.PLMN) + b'\0\0\0' + traceref,
'interfacesToTrace': (interfaces, 8),
'traceDepth': depth,
'traceCollectionEntityIPAddress': (bytes_to_uint(addr, 8*len(addr)), 8*len(addr)),
}
#
if mdtcfg is None and self.TRA_MDT_CFG is not None:
mdtcfg = self.TRA_MDT_CFG
if mdtcfg is not None:
traceact['iE-Extensions'] = [{
'id': 162,
'criticality': 'ignore',
'extensionValue': ('MDT-Configuration', mdtcfg)
}]
#
return traceact
def start_trace(self, traceref, interfaces=None, depth=None, addr=None, mdtcfg=None):
"""start a S1APTraceStart with the given traceref (2 bytes) and other parameters:
interfaces: None or uint8, bitmap (S1-MME | X2 | UU | 5-bit reserved)
depth : None or uint 0..5 or str (see TraceDepth)
addr : None or PDN-like addr (1, IPv4 ascii) or (2, IPv6 ascii)
mdtcfg: None or dict, MDT-Configuration IE value
"""
if not self.connected.is_set():
# S1AP link disconnected
if self.S1AP_FORCE_PAGE:
# force to connect
if not self._net_init_con():
# unable to connect with the UE
return False
else:
return False
# prepare the S1AP procedure
if not isinstance(traceref, bytes_types) or len(traceref) != 2:
return False
traceact = self._get_trace_act_ie(traceref, interfaces, depth, addr, mdtcfg)
S1apProc = self.init_s1ap_proc(S1APTraceStart, TraceActivation=traceact)
if not S1apProc:
return False
if not self.transmit_s1ap_proc([S1apProc]):
return False
else:
return True
def deactivate_trace(self, traceref):
"""start a S1APDeactivateTrace with the given traceref (2 bytes)
"""
if not self.connected.is_set():
# S1AP link disconnected
if self.S1AP_FORCE_PAGE:
# force to connect
if not self._net_init_con():
# unable to connect with the UE
return False
else:
return False
# prepare the S1AP procedure
if not isinstance(traceref, bytes_types) or len(traceref) != 2:
return False
traceid = plmn_buf_to_str(self.Server.PLMN) + b'\0\0\0' + traceref
S1apProc = self.init_s1ap_proc(S1APDeactivateTrace, E_UTRAN_Trace_ID=traceid)
if not S1apProc:
return False
if not self.transmit_s1ap_proc([S1apProc]):
return False
else:
return True
def report_loc_ctrl(self, reqtype={'eventType': 'direct', 'reportArea': 'ecgi'}):
"""start a S1APLocationReportingControl with a given request type
RequestType is a sequence of {EventType (enum), ReportArea (enum)}
"""
if not self.connected.is_set():
# S1AP link disconnected
if self.S1AP_FORCE_PAGE:
# force to connect
if not self._net_init_con():
# unable to connect with the UE
return False
else:
return False
# prepare the S1AP procedure
S1apProc = self.init_s1ap_proc(S1APLocationReportingControl, RequestType=reqtype)
if not S1apProc:
return False
if not self.transmit_s1ap_proc([S1apProc]):
return False
else:
return True
# this is used by send_raw() and other network-initiated procedures
def _net_init_con(self):
return self.EMM._net_init_con()
@ -1953,4 +2166,3 @@ class UES1d(SigStack):
return S1apProc
else:
return None

View File

@ -1602,6 +1602,17 @@ class S1APResetCN(S1APNonUESigProc):
'suc': ({}, {}),
'uns': None
}
send = S1APNonUESigProc._send
def recv(self, pdu):
self._recv(pdu)
try:
del self.ENB.Proc[self.Code]
except:
pass
if not self.errcause:
self._log('INF', 'success')
class S1APResetENB(S1APNonUESigProc):
@ -1637,6 +1648,43 @@ class S1APResetENB(S1APNonUESigProc):
'suc': ({}, {}),
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
if self.ENBInfo['ResetType'][0] == 's1-Interface':
# reset all resources
self._log('INF', 'complete s1 interface, cause %r' % (self.ENBInfo['Cause'], ))
for ue in self.ENB.UE.values():
ue.S1.unset_ran()
ue.S1.unset_ctx()
# prepare the reset response
self.encode_pdu('suc')
else:
# reset only listed resources
self._log('INF', 'part of s1 interface, cause %r' % (self.ENBInfo['Cause'], ))
# get the list of enb-ue-id to reset
ue_res_list, ue_ack_list = self.ENBInfo['ResetType'][1], []
for res in ue_res_list:
if res['id'] == 91 and res['Value'][0] == 'UE-associatedLogicalS1-ConnectionItem':
conitem = res['Value'][1]
if 'eNB-UE-S1AP-ID' in conitem:
uectx = conitem['eNB-UE-S1AP-ID']
elif 'mME-UE-S1AP-ID' in conitem:
uectx = conitem['mME-UE-S1AP-ID']
else:
uectx = None
if uectx is not None:
if uectx in self.ENB.UE:
ue = self.ENB.UE[enbid]
ue.S1.unset_ran()
ue.S1.unset_ctx()
ue_ack_list.append(res)
# prepare the reset response
self.encode_pdu('suc', E_associatedLogicalS1_ConnectionListResAck=ue_ack_list)
send = S1APNonUESigProc._send
class S1APErrorIndNonUECN(S1APNonUESigProc):
@ -2158,7 +2206,7 @@ class S1APUECapabilityInfoInd(S1APSigProc):
if 'UERadioCapabilityForPaging' in self.UEInfo:
self.UE.Cap['UERadioCapPaging'] = (self.UEInfo['UERadioCapabilityForPaging'],
None, None)
#------------------------------------------------------------------------------#
# Trace Procedures
@ -2195,6 +2243,8 @@ class S1APTraceStart(S1APSigProc):
'suc': None,
'uns': None
}
send = S1APSigProc._send
class S1APTraceFailureInd(S1APSigProc):
@ -2217,7 +2267,11 @@ class S1APTraceFailureInd(S1APSigProc):
# Custom decoders
Decod = {
'ini': ({}, {}),
'ini': ({
'E_UTRAN_Trace_ID': lambda x: (plmn_buf_to_str(x[:3]),
bytes_to_uint(x[3:6], 24),
bytes_to_uint(x[6:8], 16)),
}, {}),
'suc': None,
'uns': None
}
@ -2228,6 +2282,16 @@ class S1APTraceFailureInd(S1APSigProc):
'suc': None,
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
# just log the failure
self._log('INF', 'trace id %s.%.6x.%.4x, cause %r'\
% (self.UEInfo['E_UTRAN_Trace_ID'][0],
self.UEInfo['E_UTRAN_Trace_ID'][1],
self.UEInfo['E_UTRAN_Trace_ID'][2],
self.UEInfo['Cause']))
class S1APDeactivateTrace(S1APSigProc):
@ -2260,6 +2324,8 @@ class S1APDeactivateTrace(S1APSigProc):
'suc': None,
'uns': None
}
send = S1APSigProc._send
class S1APCellTrafficTrace(S1APSigProc):
@ -2284,7 +2350,13 @@ class S1APCellTrafficTrace(S1APSigProc):
# Custom decoders
Decod = {
'ini': ({}, {}),
'ini': ({
'E_UTRAN_Trace_ID': lambda x: (plmn_buf_to_str(x[:3]),
bytes_to_uint(x[3:6], 24),
bytes_to_uint(x[6:8], 16)),
'EUTRAN_CGI': lambda x: (plmn_buf_to_str(x['pLMNidentity']),
cellid_bstr_to_str(x['cell-ID'])),
}, {}),
'suc': None,
'uns': None
}
@ -2295,6 +2367,27 @@ class S1APCellTrafficTrace(S1APSigProc):
'suc': None,
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
# convert the TLA
if self.UEInfo['TransportLayerAddress'][1] == 32:
# IPv4
addr = inet_ntoa_cn(1, uint_to_bytes(self.UEInfo['TransportLayerAddress'][0], 32))
elif self.UEInfo['TransportLayerAddress'][1] == 128:
# IPv6
addr = inet_ntoa_cn(2, uint_to_bytes(self.UEInfo['TransportLayerAddress'][0], 128))
else:
addr = uint_to_bytes(*self.UEInfo['TransportLayerAddress'])
# just log the indications
self._log('INF', 'trace id %s.%.6x.%.4x, EUTRAN CGI %s.%s, address %s'\
% (self.UEInfo['E_UTRAN_Trace_ID'][0],
self.UEInfo['E_UTRAN_Trace_ID'][1],
self.UEInfo['E_UTRAN_Trace_ID'][2],
self.UEInfo['EUTRAN_CGI'][0],
self.UEInfo['EUTRAN_CGI'][1],
addr))
#------------------------------------------------------------------------------#
@ -2332,6 +2425,8 @@ class S1APLocationReportingControl(S1APSigProc):
'suc': None,
'uns': None
}
send = S1APSigProc._send
class S1APLocationReportFailure(S1APSigProc):
@ -2364,6 +2459,12 @@ class S1APLocationReportFailure(S1APSigProc):
'suc': None,
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
# just log the failure
self._log('INF', 'cause %r' % (self.UEInfo['Cause'], ))
class S1APLocationReport(S1APSigProc):
@ -2387,7 +2488,12 @@ class S1APLocationReport(S1APSigProc):
# Custom decoders
Decod = {
'ini': ({}, {}),
'ini': ({
'TAI' : lambda x: (plmn_buf_to_str(x['pLMNidentity']),
bytes_to_uint(x['tAC'], 16)),
'EUTRAN_CGI': lambda x: (plmn_buf_to_str(x['pLMNidentity']),
cellid_bstr_to_str(x['cell-ID']))
}, {}),
'suc': None,
'uns': None
}
@ -2398,7 +2504,18 @@ class S1APLocationReport(S1APSigProc):
'suc': None,
'uns': None
}
def recv(self, pdu):
self._recv(pdu)
if not self.errcause:
# just log the failure
self._log('INF', 'reqtype %s, TAI %s.%.4x, EUTRAN CGI %s.%s'\
% (self.UEInfo['RequestType'],
self.UEInfo['TAI'][0],
self.UEInfo['TAI'][1],
self.UEInfo['EUTRAN_CGI'][0],
self.UEInfo['EUTRAN_CGI'][1]
))
#------------------------------------------------------------------------------#
# Warning Message Transmission Procedures
@ -2449,6 +2566,26 @@ class S1APWriteReplaceWarning(S1APNonUESigProc):
'suc': ({}, {}),
'uns': None
}
send = S1APNonUESigProc._send
def recv(self, pdu):
self._recv(pdu)
try:
del self.ENB.Proc[self.Code]
except:
pass
if not self.errcause:
msgid, sernum = self.ENBInfo['MessageIdentifier'][0], self.ENBInfo['SerialNumber'][0]
if msgid != self._NetInfo['MessageIdentifier'][0]:
self._log('MessageIdentifier mismatch: 0x%.4x instead of 0x%.4x'\
% (msgid, self._NetInfo['MessageIdentifier'][0]))
elif 'BroadcastCompletedAreaList' not in self.ENBInfo \
or not self.ENBInfo['BroadcastCompletedAreaList']:
self._log('broadcasting failed')
else:
self.ENB.WARN[(msgid, sernum)] = self._NetInfo
self._log('INF', 'broadcasting warning message')
class S1APKill(S1APNonUESigProc):
@ -2488,6 +2625,17 @@ class S1APKill(S1APNonUESigProc):
'suc': ({}, {}),
'uns': None
}
send = S1APNonUESigProc._send
def recv(self, pdu):
self._recv(pdu)
try:
del self.ENB.Proc[self.Code]
except:
pass
if not self.errcause:
self._log('INF', 'stopped broadcasting warning message')
class S1APPWSRestartInd(S1APNonUESigProc):

View File

@ -28,10 +28,12 @@
__all__ = ['SMSd']
from queue import Queue, Empty, Full
from time import localtime
from .utils import *
# -> TS23038 and TS23040_SMS are available under the NAS module
if python_version < 3:
from Queue import Queue, Empty, Full
else:
from queue import Queue, Empty, Full
class SMSd(object):
@ -436,12 +438,18 @@ class SMSd(object):
#--------------------------------------------------------------------------#
def send_text(self, text, num):
"""sends a given text (ascii string, that will be converted to SMS 7bit)
to a given phone number
"""
tp_dcs = self.TP_DCS
self.TP_DCS = {'Group': 0, 'Charset': 0, 'Class': 0} # GSM 7bit
self.send_tpud(text, num=num)
self.TP_DCS = tp_dcs
def send_tpud(self, *ud, num):
def send_tpud(self, ud, num):
"""sends a given user-data (a buffer, or a tuple of buffers for inserting options)
to a given phone number
"""
# TODO: implement SMS UD fragmentation into several tp_msg
try:
tp_msg = NAS.SMS_DELIVER(val={'TP_MMS': 1, # no more messages
@ -449,13 +457,16 @@ class SMSd(object):
'TP_PID': self.TP_PID,
'TP_DCS': self.TP_DCS})
self._set_tp_scts(tp_msg['TP_SCTS'])
if len(ud) > 1:
# UD header IEs
tp_msg['TP_UDHI'].set_val(1)
tp_msg['TP_UD']['UDH']['UDH'].set_val(ud[:-1])
tp_msg['TP_UD']['UD'].set_val(ud[-1])
if isinstance(ud, (list, tuple)):
if len(ud) > 1:
# UD header IEs
tp_msg['TP_UDHI'].set_val(1)
tp_msg['TP_UD']['UDH']['UDH'].set_val(ud[:-1])
data = ud[-1]
else:
data = ud
tp_msg['TP_UD']['UD'].set_val(data)
except:
self._log('WNG', 'invalid TP UD')
else:
self._inject_tp(tp_msg, num)

View File

@ -418,7 +418,11 @@ def served_gummei_to_asn(val):
def mac_aton(mac='00:00:00:00:00:00'):
return unhexlify(mac.replace(':', ''))
def inet_aton_cn(*pdnaddr, dom='EPS'):
def inet_aton_cn(*pdnaddr, **kw):
"""convert a PDN / PDP address tuple to a buffer
kw can be:
- dom: 'PS' or 'EPS'
"""
if pdnaddr[0] == 0:
# PPP address
return pdnaddr[1]
@ -443,7 +447,7 @@ def inet_aton_cn(*pdnaddr, dom='EPS'):
return ipaddr
elif pdnaddr[0] == 3:
# IPv4v6 addresses
if dom == 'EPS':
if 'dom' in kw and kw['dom'] == 'EPS':
# PDN address
try:
return inet_aton_cn(2, pdnaddr[2]) + inet_aton_cn(1, pdnaddr[1])
@ -462,6 +466,8 @@ def inet_aton_cn(*pdnaddr, dom='EPS'):
return pdnaddr[1]
def inet_ntoa_cn(pdntype, buf, dom='EPS'):
"""convert a buffer for a given pdntype and domain to a humane-readable address
"""
if pdntype == 0:
# PPP address
return (pdntype, buf)