mobile: fix NAS 5G payload container handling

This commit is contained in:
mich 2019-12-18 16:35:12 +01:00
parent 7e76e8f7c6
commit 893ebd2df0
3 changed files with 145 additions and 109 deletions

View File

@ -121,48 +121,69 @@ def parse_NAS5G(buf, inner=True, sec_hdr=True):
nasc.replace(nasc[-1], cont)
#
if typ in (65, 79, 103, 104):
payc = Msg['PayloadContainer']
if not payc.get_trans() and payc._IE is not None:
payct, payc = Msg['PayloadContainerType'], Msg['PayloadContainer']
if not payct.get_trans() and not payc.get_trans():
# Payload container present in Msg
for entry in payc._IE['Entries']:
etype = entry['Type'].get_val()
econt = entry['Cont'].get_val()
if etype == 1 and econt:
# 5GSM
cont, err = parse_NAS5G(nasc[-1].get_val(), inner=inner)
if err == 0:
entry.replace(entry['Cont'], cont)
elif etype == 2 and len(econt) >= 2:
# SMS PP
epd, etyp = unpack('>BB', econt[:2])
epd &= 0xF
if epd == 9 and etyp in (1, 4, 16):
cont = PPSMSCPTypeClasses[etyp]()
try:
cont.from_bytes(econt)
except Exception:
pass
else:
entry.replace(entry['Cont'], cont)
elif etype == 5 and len(econt) >= 2:
# UE Policy
_, etyp = unpack('>BB', econt[:2])
if 1 <= etyp <= 4:
cont = FGUEPOLTypeClasses[etyp]
try:
cont.from_bytes(econt)
except Exception:
pass
else:
entry.replace(entry['Cont'], cont)
#
# TODO: decode mode embedded protocols
# 5G NAS payload container entries:
# 3 : 'LTE Positioning Protocol message container',
# 4 : 'SOR transparent container',
# 6 : 'UE parameters update transparent container',
# 7 : 'Location services message container',
# 8 : 'CIoT user data container',
conttype, contbuf = payct['V'].get_val(), payc['V'].get_val()
cont = parse_NAS5GPayCont(conttype, contbuf)
if cont:
payc.replace(payc['V'], cont)
#
return Msg, 0
def parse_NAS5GPayCont(conttype, buf):
if conttype == 1 and len(buf) >= 2:
# 5GSM
cont, err = parse_NAS5G(buf, inner=True)
if err == 0:
return cont
elif conttype == 2 and len(buf) >= 2:
# SMS PP
pd, typ = unpack('>BB', buf)
pd &= 0xF
if pd == 9 and typ in (1, 4, 16):
cont = PPSMSCPTypeClasses[typ]()
try:
cont.from_bytes(buf)
except Exception:
pass
else:
return cont
elif conttype == 3:
# LPP, TODO
pass
elif conttype == 4 and len(buf) >= 17:
# SOR
cont = SORTransparentContainer()
try:
cont.from_bytes(buf)
except Exception:
pass
else:
return cont
elif conttype == 6:
# UE params update, TODO
pass
elif conttype == 7:
# Loc services, TODO
pass
elif conttype == 8:
# CIoT, TODO
pass
elif conttype == 15 and len(buf) >= 1:
# multi
cont = PayloadContainerMult()
try:
cont.from_bytes(buf)
except Exception:
pass
else:
# parse each entry
for entry in cont['Entries']:
econttype, ebuf = cont['Type'].get_val(), cont['Cont'].get_val()
econt = parse_NAS5GPayCont(econttype, ebuf)
if econt:
entry.replace(entr['Cont'], econt)
return None

View File

@ -247,7 +247,7 @@ class FGMMRegistrationRequest(Layer3):
Type6TLVE('EPSNASContainer', val={'T':0x70, 'V':b'\x07\0'}),
Type6TLVE('LADNInd', val={'T':0x74, 'V':b''}, IE=LADNInd()),
Type1TV('PayloadContainerType', val={'T':0x8, 'V':1}, dic=PayloadContainerType_dict),
Type6TLVE('PayloadContainer', val={'T':0x7B, 'V':b'\0'}, IE=PayloadContainer()),
Type6TLVE('PayloadContainer', val={'T':0x7B, 'V':b'\0'}),
Type1TV('NetSlicingInd', val={'T':0x9, 'V':0}, IE=NetSlicingInd()),
Type4TLV('5GSUpdateType', val={'T':0x43, 'V':b'\0'}, IE=FGSUpdateType()),
Type4TLV('MSCm2', val={'T':0x41, 'V':b'@\0\0'}, IE=MSCm2()),
@ -346,7 +346,7 @@ class FGMMULNASTransport(Layer3):
FGMMHeader(val={'Type':103}),
Uint('spare', bl=4, rep=REPR_HEX),
Type1V('PayloadContainerType', val={'V':1}, dic=PayloadContainerType_dict),
Type6LVE('PayloadContainer', val={'V':b'\0'}, IE=PayloadContainer()),
Type6LVE('PayloadContainer', val={'V':b'\0'}),
Type3TV('PDUSessID', val={'T':0x12, 'V':b'\0'}, bl={'V':8}, IE=PDUSessID()),
Type3TV('OldPDUSessID', val={'T':0x59, 'V':b'\0'}, bl={'V':8}, IE=PDUSessID()),
Type1TV('RequestType', val={'T':0x8, 'V':0}, IE=RequestType()),
@ -368,7 +368,7 @@ class FGMMDLNASTransport(Layer3):
FGMMHeader(val={'Type':104}),
Uint('spare', bl=4, rep=REPR_HEX),
Type1V('PayloadContainerType', val={'V':1}, dic=PayloadContainerType_dict),
Type6LVE('PayloadContainer', val={'V':b'\0'}, IE=PayloadContainer()),
Type6LVE('PayloadContainer', val={'V':b'\0'}),
Type3TV('PDUSessID', val={'T':0x12, 'V':b'\0'}, bl={'V':8}, IE=PDUSessID()),
Type4TLV('AddInfo', val={'T':0x24, 'V':b'\0'}),
Type3TV('5GMMCause', val={'T':0x58, 'V':b'\x16'}, bl={'V':8}, IE=FGMMCause()),
@ -810,7 +810,7 @@ class FGMMControlPlaneServiceRequest(Layer3):
Type1V('CtrlPlaneServiceType', val={'V':0}, IE=CtrlPlaneServiceType()),
#Type4TLV('CIoTSmallDataContainer', val={'T':0x0, 'V':b'\x20\0'}, IE=CIoTSmallDataContainer()), # WNG: tag is undefined in current TS
Type1TV('PayloadContainerType', val={'T':0x8, 'V':1}, dic=PayloadContainerType_dict),
Type6TLVE('PayloadContainer', val={'T':0x7B, 'V':b'\0'}, IE=PayloadContainer()),
Type6TLVE('PayloadContainer', val={'T':0x7B, 'V':b'\0'}),
Type3TV('PDUSessID', val={'T':0x12, 'V':b'\0'}, bl={'V':8}, IE=PDUSessID()),
Type4TLV('PDUSessStat', val={'T':0x50, 'V':b'\0\0'}, IE=PDUSessStat()),
Type1TV('ReleaseAssistInd', val={'T':0xF, 'V':0}, IE=ReleaseAssistInd()),

View File

@ -90,66 +90,78 @@ class DNN(Envelope):
# TS 24.501, 9.11.2.8
#------------------------------------------------------------------------------#
class _SNSSAI_SST_MappedHSST(Envelope):
_GEN = (
Uint8('SST'),
Uint8('MappedHPLMNSST')
)
class _SNSSAI_SST_SD(Envelope):
_GEN = (
Uint8('SST'),
Uint24('SD')
)
class _SNSSAI_SST_SD_MappedHSST(Envelope):
_GEN = (
Uint8('SST'),
Uint24('SD'),
Uint8('MappedHPLMNSST')
)
class _SNSSAI_SST_SD_MappedHSSTSD(Envelope):
_GEN = (
Uint8('SST'),
Uint24('SD'),
Uint8('MappedHPLMNSST'),
Uint24('MappedHPLMNSD')
)
class _SNSSAI_SST_SD_MappedHSSTSD_spare(Envelope):
_GEN = (
Uint8('SST'),
Uint24('SD'),
Uint8('MappedHPLMNSST'),
Uint24('MappedHPLMNSD'),
Buf('spare', rep=REPR_HEX)
)
class SNSSAI(Envelope):
ENV_SEL_TRANS = False
_GEN = (
Uint8('SST'),
Uint24('SD', trans=True),
Uint8('MappedHPLMNSST', trans=True),
Uint24('MappedHPLMNSD', trans=True)
)
def set_val(self, val):
if isinstance(val, (tuple, list)):
if len(val) == 1:
self[1].set_trans(True)
self[2].set_trans(True)
self[3].set_trans(True)
elif len(val) == 2:
self[1].set_trans(False)
self[2].set_trans(True)
self[3].set_trans(True)
elif len(val) == 3:
self[1].set_trans(False)
self[2].set_trans(False)
self[3].set_trans(True)
elif len(val) > 3:
self[1].set_trans(False)
self[2].set_trans(False)
self[3].set_trans(False)
elif isinstance(val, dict):
if 'SD' in val:
self[1].set_trans(False)
if 'MappedHPLMNSST' in val:
self[2].set_trans(False)
if 'MappedHPLMNSD' in val:
self[3].set_trans(False)
Envelope.set_val(self, val)
def _from_char(self, char):
l = char.len_bit()
if l == 8:
self[1].set_trans(True)
self[2].set_trans(True)
self[3].set_trans(True)
elif l == 32:
self[1].set_trans(False)
self[2].set_trans(True)
self[3].set_trans(True)
elif l == 40:
self[1].set_trans(False)
self[2].set_trans(False)
self[3].set_trans(True)
elif l >= 64:
self[1].set_trans(False)
self[2].set_trans(False)
self[3].set_trans(False)
Envelope._from_char(self, char)
class L_SNSSAI(Envelope):
_name = 'SNSSAI'
_GEN = (
Uint8('Len'),
Alt('Value', GEN={
1: Uint8('SST'),
2: _SNSSAI_SST_MappedHSST('SST_MappedHPLMNSST'),
4: _SNSSAI_SST_SD('SST_SD'),
5: _SNSSAI_SST_SD_MappedHSST('SST_SD_MappedHPLMNSST'),
8: _SNSSAI_SST_SD_MappedHSSTSD('SST_SD_MappedHPLMNSSTSD')},
DEFAULT=_SNSSAI_SST_SD_MappedHSSTSD_spare('SST_SD_MappedHPLMNSSTSD_spare'),
sel=lambda self: self.get_env()['Len'].get_val()
)
SNSSAI()
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
# do not automate length, otherwise, it breaks reautomate() due to Value
# using the provided Len value to select the appropriate content
#self[0].set_valauto(lambda: self[1].get_len())
self[0].set_valauto(lambda: self[1].get_len())
self[1].set_blauto(lambda: self[0].get_val()<<3)
#------------------------------------------------------------------------------#
@ -955,7 +967,7 @@ class NetSlicingInd(Envelope):
#------------------------------------------------------------------------------#
class NSSAI(Sequence):
_GEN = SNSSAI()
_GEN = L_SNSSAI()
#------------------------------------------------------------------------------#
@ -1022,7 +1034,7 @@ class _CritOSAppIds(Envelope):
class _CritSNSSAI(Envelope):
_GEN = (
Uint8('Cnt'),
Sequence('SNSSAIs', GEN=SNSSAI())
Sequence('SNSSAIs', GEN=L_SNSSAI())
)
def __init__(self, *args, **kwargs):
@ -1148,7 +1160,7 @@ class _PayContOpt(Envelope):
37 : GPRSTimer3('BackOffTimer'),
59 : PDUSessID('OldPDUSessID'),
80 : Uint8('RequestType', dic=_RequestType_dict),
22 : SNSSAI()['Value'],
22 : SNSSAI(),
25 : Buf('DNN')
},
DEFAULT=Buf('Val'),
@ -1179,7 +1191,7 @@ class _PayContEntry(Envelope):
self[4].set_blauto(lambda: (self[0].get_val()-1-self[3].get_len())<<3)
class PayloadContainer(Envelope):
class PayloadContainerMult(Envelope):
_GEN = (
Uint8('Num'),
Sequence('Entries', GEN=_PayContEntry('Entry'))
@ -1263,20 +1275,23 @@ class PDUSessStat(Envelope):
# TS 24.501, 9.11.3.46
#------------------------------------------------------------------------------#
_RejectedSNSSAICause_dict = {
0 : 'S-NSSAI not available in the current PLMN or SNPN',
1 : 'S-NSSAI not available in the current registration area',
2 : 'Network slice specific authentication and authorization pending for the S-NSSAI'
}
class RejectedSNSSAI(Envelope):
_GEN = (
Uint8('Len'),
Alt('Value', GEN={
1: Uint8('SST'),
4: Envelope('SST_SD', GEN=(Uint8('SST'), Uint24('SD')))},
sel=lambda self: self.get_env()['Len'].get_val()
)
Uint('Len', bl=4),
Uint('Cause', bl=4, dic=_RejectedSNSSAICause_dict),
SNSSAI()
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0].set_valauto(lambda: self[1].get_len())
self[1].set_blauto(lambda: self[0].get_val()<<3)
self[0].set_valauto(lambda: self[2].get_len())
self[2].set_blauto(lambda: self[0].get_val()<<3)
class RejectedNSSAI(Sequence):