mobile: fix last GTPv1-C issues and introduce some basic tests, to be enhanced
This commit is contained in:
parent
b0cd1c45b1
commit
ca7e9b4895
|
@ -845,7 +845,7 @@ class UpdatePDPCtxtRespGGSN(GTPMsg):
|
|||
GTPIETV('ChargingID', val={'Type': GTPIEType.ChargingID.value}, bl={'Data': 32}, trans=True),
|
||||
GTPIETLV('PCO', val={'Type': GTPIEType.PCO.value}, trans=True),
|
||||
GTPIETLV('GGSNAddrForControlPlane', val={'Type': GTPIEType.GSNAddr.value}, trans=True),
|
||||
GTPIETLV('GGSNAddressForUserTraffic', val={'Type': GTPIEType.GSNAddr.value}, trans=True),
|
||||
GTPIETLV('GGSNAddrForUserTraffic', val={'Type': GTPIEType.GSNAddr.value}, trans=True),
|
||||
GTPIETLV('AltGGSNAddrForControlPlane', val={'Type': GTPIEType.GSNAddr.value}, trans=True),
|
||||
GTPIETLV('AltGGSNAddrForUserTraffic', val={'Type': GTPIEType.GSNAddr.value}, trans=True),
|
||||
GTPIETLV('QoSProfile', val={'Type': GTPIEType.QoSProfile.value}, trans=True),
|
||||
|
@ -1865,7 +1865,7 @@ GTPDispatcherSGSN = {
|
|||
}
|
||||
|
||||
|
||||
GTPCDispatcherGGSN = {
|
||||
GTPDispatcherGGSN = {
|
||||
1 : EchoReq,
|
||||
2 : EchoResp,
|
||||
3 : VersionNotSupported,
|
||||
|
@ -1936,7 +1936,7 @@ ERR_GTP_TYPE_NONEXIST = 3
|
|||
ERR_GTP_MAND_IE_MISS = 4
|
||||
|
||||
|
||||
def parse_GTPC_SGSN(buf):
|
||||
def parse_GTP_SGSN(buf):
|
||||
"""parses the buffer `buf' for GTPv1-C message as received by a SGSN
|
||||
and returns a 2-tuple:
|
||||
- GTPv1-C message structure, or None if parsing failed
|
||||
|
@ -1955,13 +1955,13 @@ def parse_GTPC_SGSN(buf):
|
|||
try:
|
||||
Msg.from_bytes(buf)
|
||||
except GTPDecErr:
|
||||
GTPCIEs.VERIF_MAND = False
|
||||
GTPIEs.VERIF_MAND = False
|
||||
Msg = Msg.__class__()
|
||||
try:
|
||||
Msg.from_bytes(buf)
|
||||
GTPCIEs.VERIF_MAND = True
|
||||
GTPIEs.VERIF_MAND = True
|
||||
except Exception:
|
||||
GTPCIEs.VERIF_MAND = True
|
||||
GTPIEs.VERIF_MAND = True
|
||||
return None, ERR_GTP_BUF_INVALID
|
||||
else:
|
||||
return Msg, ERR_GTP_MAND_IE_MISS
|
||||
|
@ -1971,7 +1971,7 @@ def parse_GTPC_SGSN(buf):
|
|||
return Msg, 0
|
||||
|
||||
|
||||
def parse_GTPC_GGSN(buf):
|
||||
def parse_GTP_GGSN(buf):
|
||||
"""parses the buffer `buf' for GTPv1-C message as received by a GGSN
|
||||
and returns a 2-tuple:
|
||||
- GTPv1-C message structure, or None if parsing failed
|
||||
|
@ -1990,13 +1990,13 @@ def parse_GTPC_GGSN(buf):
|
|||
try:
|
||||
Msg.from_bytes(buf)
|
||||
except GTPDecErr:
|
||||
GTPCIEs.VERIF_MAND = False
|
||||
GTPIEs.VERIF_MAND = False
|
||||
Msg = Msg.__class__()
|
||||
try:
|
||||
Msg.from_bytes(buf)
|
||||
GTPCIEs.VERIF_MAND = True
|
||||
GTPIEs.VERIF_MAND = True
|
||||
except Exception:
|
||||
GTPCIEs.VERIF_MAND = True
|
||||
GTPIEs.VERIF_MAND = True
|
||||
return None, ERR_GTP_BUF_INVALID
|
||||
else:
|
||||
return Msg, ERR_GTP_MAND_IE_MISS
|
||||
|
|
|
@ -38,6 +38,7 @@ from pycrate_mobile.NAS import *
|
|||
from pycrate_mobile.SIGTRAN import *
|
||||
from pycrate_mobile.SCCP import *
|
||||
from pycrate_mobile.ISUP import *
|
||||
from pycrate_mobile.TS29060_GTP import *
|
||||
from pycrate_mobile.TS29281_GTPU import *
|
||||
from pycrate_mobile.TS29274_GTPC import *
|
||||
from pycrate_mobile.TS29244_PFCP import *
|
||||
|
@ -181,6 +182,17 @@ isup_pdu = tuple(map(unhexlify, (
|
|||
'bf081000', # ISUP Release Complete
|
||||
)))
|
||||
|
||||
# GTPv1-C messages
|
||||
gtp_pdu = tuple(map(unhexlify, (
|
||||
'3213003527c9b42e6a2400000180100102030411010203047f11223344850004750102038500047501020487000f020a921f7396ccfe9601ffff003600', # UpdatePDPCtxtRespSGSN
|
||||
#'32120032be29401157c400000e05100908070611191817161405850004900102038500049001021387000f020a921f7396ccfe9601ffff003600', # UpdatePDPCtxtReqSGSN
|
||||
'3202000600000000f36e00000e20', # EchoResp
|
||||
'320100040000000000020000', # EchoReq
|
||||
'321500063aca3f774ee000000180', # DeletePDPCtxtResp
|
||||
'321400089fcf40346d80000013ff1405', # DeletePDPCtxtReq
|
||||
'3211005f1cc18cd10ffa0000018008fe109fd7a035119fd7a0357f616263648000046566676884001a80802110030000108106404142430202830640414244000501018500042a2b2c2d8500042a2b2c3d87000f020a921f7396ccfe9601ffff003600b8000100', # CreatePDPCtxtResp
|
||||
)))
|
||||
|
||||
# GTPv1-U messages
|
||||
gtpu_pdu = tuple(map(unhexlify, (
|
||||
'30ff003c04cec0bb4500003c22cb000080019bad0aa002ff481e268c0800995a0300b1016162636465666768696a6b6c6d6e6f7071727374757677616263646566676869', # GPDU (wireshark bug tracker)
|
||||
|
@ -346,6 +358,22 @@ def test_isup(isup_pdu=isup_pdu):
|
|||
assert( m.get_val() == v )
|
||||
|
||||
|
||||
def test_gtp(gtp_pdu=gtp_pdu):
|
||||
for pdu in gtp_pdu:
|
||||
m, e = parse_GTP_SGSN(pdu)
|
||||
assert( e == 0 )
|
||||
v = m.get_val()
|
||||
m.reautomate()
|
||||
assert( m.get_val() == v )
|
||||
m.set_val(v)
|
||||
assert( m.to_bytes() == pdu )
|
||||
#
|
||||
if _with_json:
|
||||
t = m.to_json()
|
||||
m.from_json(t)
|
||||
assert( m.get_val() == v )
|
||||
|
||||
|
||||
def test_gtpu(gtpu_pdu=gtpu_pdu):
|
||||
for pdu in gtpu_pdu:
|
||||
m, e = parse_GTPU(pdu)
|
||||
|
@ -417,11 +445,11 @@ def test_pfcp(pfcp_pdu=pfcp_pdu):
|
|||
def test_perf_mobile():
|
||||
|
||||
print('[+] NAS MO decoding and re-encoding')
|
||||
Ta = timeit(test_nas_mo, number=14)
|
||||
Ta = timeit(test_nas_mo, number=15)
|
||||
print('test_nas_mo: {0:.4f}'.format(Ta))
|
||||
|
||||
print('[+] NAS MT decoding and re-encoding')
|
||||
Tb = timeit(test_nas_mt, number=24)
|
||||
Tb = timeit(test_nas_mt, number=25)
|
||||
print('test_nas_mt: {0:.4f}'.format(Tb))
|
||||
|
||||
print('[+] NAS 5G decoding and re-encoding')
|
||||
|
@ -429,19 +457,23 @@ def test_perf_mobile():
|
|||
print('test_nas_5g: {0:.4f}'.format(Tc))
|
||||
|
||||
print('[+] SIGTRAN decoding and re-encoding')
|
||||
Td = timeit(test_sigtran, number=350)
|
||||
Td = timeit(test_sigtran, number=500)
|
||||
print('test_sigtran: {0:.4f}'.format(Td))
|
||||
|
||||
print('[+] SCCP decoding and re-encoding')
|
||||
Te = timeit(test_sccp, number=130)
|
||||
Te = timeit(test_sccp, number=150)
|
||||
print('test_sccp: {0:.4f}'.format(Te))
|
||||
|
||||
print('[+] ISUP decoding and re-encoding')
|
||||
Tj = timeit(test_isup, number=60)
|
||||
print('test_isup: {0:.4f}'.format(Tj))
|
||||
|
||||
print('[+] GTPv1-U decoding and re-encoding')
|
||||
Tf = timeit(test_gtpu, number=600)
|
||||
print('[+] GTPv1-C decoding and re-encoding')
|
||||
Tk = timeit(test_gtp, number=200)
|
||||
print('test_gtp: {0:.4f}'.format(Tk))
|
||||
|
||||
print('[+] GTP-U decoding and re-encoding')
|
||||
Tf = timeit(test_gtpu, number=300)
|
||||
print('test_gtpu: {0:.4f}'.format(Tf))
|
||||
|
||||
print('[+] GTPv2-C decoding and re-encoding')
|
||||
|
@ -449,14 +481,14 @@ def test_perf_mobile():
|
|||
print('test_gtpc: {0:.4f}'.format(Tg))
|
||||
|
||||
print('[+] Diameter decoding and re-encoding')
|
||||
Th = timeit(test_diameter, number=5)
|
||||
Th = timeit(test_diameter, number=8)
|
||||
print('test_diameter: {0:.4f}'.format(Th))
|
||||
|
||||
print('[+] PFCP decoding and re-encoding')
|
||||
Ti = timeit(test_pfcp, number=60)
|
||||
Ti = timeit(test_pfcp, number=50)
|
||||
print('test_pfcp: {0:.4f}'.format(Ti))
|
||||
|
||||
print('[+] test_mobile total time: {0:.4f}'.format(Ta+Tb+Tc+Td+Te+Tf+Tg+Th+Ti+Tj))
|
||||
print('[+] test_mobile total time: {0:.4f}'.format(Ta+Tb+Tc+Td+Te+Tf+Tg+Th+Ti+Tj+Tk))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue