GTP-U: complete rework of the Extension Headers

This commit is contained in:
p1-bmu 2021-10-20 12:29:04 +02:00
parent 5d42082fba
commit 11c791572c
1 changed files with 148 additions and 214 deletions

View File

@ -1,7 +1,7 @@
# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# * Version : 0.5
# *
# * Copyright 2019. Benoit Michau. P1Sec.
# *
@ -27,25 +27,15 @@
# *--------------------------------------------------------
#*/
'''
__all__ = [
# GTPU extension headers
'GTPUHdrExtAuto',
'GTPUHdrExt3',
'GTPUHdrExt32',
'GTPUHdrExt64',
'GTPUHdrExt129',
'GTPUHdrExt131',
'GTPUHdrExt132',
'GTPUHdrExt133',
'GTPUHdrExt192',
# GTPU Messages
'GTPUMsg',
'GTPUEchoRequest',
'GTPUEchoResponse',
'GTPUErrorInd',
'GTPUSuppExtHdrNotif',
'GTPUEndMarker',
'GTPUTunnelStatus',
'GPDU',
# GTPU Message parser and associated errors
'parse_GTPU',
@ -53,7 +43,7 @@ __all__ = [
'ERR_GTPU_BUF_INVALID',
'ERR_GTPU_TYPE_NONEXIST'
]
'''
#------------------------------------------------------------------------------#
# 3GPP TS 29.281: General Packet Radio System (GPRS) Tunnelling Protocol
@ -74,12 +64,12 @@ from pycrate_core.base import *
#------------------------------------------------------------------------------#
GTPUNextExtHeader_dict = {
0 : 'No more extension headers',
1 : 'Reserved - Control Plane only',
2 : 'Reserved - Control Plane only',
3 : 'Long PDCP PDU Number',
32 : 'Service Class Indicator',
64 : 'UDP source port of the triggering message',
0 : 'No more extension headers',
1 : 'Reserved - Control Plane only',
2 : 'Reserved - Control Plane only',
3 : 'Long PDCP PDU Number',
32 : 'Service Class Indicator',
64 : 'UDP source port of the triggering message',
129 : 'RAN Container',
130 : 'Long PDCP PDU Number',
131 : 'Xw RAN Container',
@ -90,28 +80,10 @@ GTPUNextExtHeader_dict = {
194 : 'Reserved - Control Plane only'
}
# non-automated base class
class GTPUHdrExt(Envelope):
_ID = 0xff
_GEN = (
Uint8('Len'),
Buf('Content'),
Uint8('NextExt', dic=GTPUNextExtHeader_dict)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[-1].set_valauto(lambda: self._get_next_hdr())
def _get_next_hdr(self):
n = self.get_next()
if n and hasattr(n, '_ID'):
return n._ID
else:
return 0
# buffer that makes the Ext Header 32-bit-aligned
class BufAligned(Buf):
PAD = b'\0'
def set_val(self, val):
@ -122,135 +94,107 @@ class BufAligned(Buf):
Buf.set_val(self, val)
# automated base class
class GTPUHdrExtAuto(GTPUHdrExt):
_ID = 0xff
# prototype for the content of a generic Ext Header
class _GTPUHdrExtCont(Envelope):
_GEN = (
BufAligned('Value', val=b'\0\0', rep=REPR_HEX),
)
_ID = 1
def __init__(self, *args, **kwargs):
if 'ID' in kwargs:
self._ID = kwargs['ID']
del kwargs['ID']
Envelope.__init__(self, *args, **kwargs)
def clone(self):
c = Envelope.clone(self)
c._ID = self._ID
return c
class _LongPDCPPDUNumber(_GTPUHdrExtCont):
_GEN = (
Uint('spare', bl=6, rep=REPR_HEX),
Uint('Value', bl=18),
Uint24('spare', rep=REPR_HEX),
)
# All defined Ext Header
GTPUHdrExtCont_dict = {
3 : _LongPDCPPDUNumber('LongPDCPPDUNumber',
ID=3),
32 : _GTPUHdrExtCont('ServiceClassInd', GEN=(
Uint8('Value'),
Uint8('spare', rep=REPR_HEX),
), ID=32),
64 : _GTPUHdrExtCont('UDPPort', GEN=(
Uint16('Value'),
), ID=64),
129 : _GTPUHdrExtCont('RANContainer',
ID=129),
130 : _LongPDCPPDUNumber('LongPDCPPDUNumber',
ID=130),
131 : _GTPUHdrExtCont('XwRANContainer',
ID=131),
132 : _GTPUHdrExtCont('NRRANContainer',
D=132),
133 : _GTPUHdrExtCont('PDUSessionContainer',
ID=133),
192 : _GTPUHdrExtCont('PDCPPDUNumber', GEN=(
Uint('Value', bl=15),
Uint('spare', bl=1, rep=REPR_HEX),
), ID=192)
}
class GTPUHdrExt(Envelope):
_GEN = (
Uint8('Len'),
BufAligned('Content', rep=REPR_HEX),
_GTPUHdrExtCont('Content', rep=REPR_HEX),
Uint8('NextExt', dic=GTPUNextExtHeader_dict)
)
def __init__(self, *args, **kwargs):
GTPUHdrExt.__init__(self, *args, **kwargs)
Envelope.__init__(self, *args, **kwargs)
self[0].set_valauto(lambda: (2 + self[1].get_len()) >> 2)
self[1].set_blauto(lambda: max(0, (self[0].get_val()*32) - 16))
#------------------------------------------------------------------------------#
# UDP Port
# TS 29.281, section 5.2.2.1
#------------------------------------------------------------------------------#
class GTPUHdrExt64(GTPUHdrExt):
_ID = 64
_GEN = (
Uint8('Len', val=1),
Uint16('UDPPort'),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# PDCP PDU Number
# TS 29.281, section 5.2.2.2
#------------------------------------------------------------------------------#
class GTPUHdrExt192(GTPUHdrExt):
_ID = 192
_GEN = (
Uint8('Len', val=1),
Uint('PDCPPDUNumber', bl=15),
Uint('spare', bl=1, rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# Long PDCP PDU Number
# TS 29.281, section 5.2.2.2A
#------------------------------------------------------------------------------#
class GTPUHdrExt3(GTPUHdrExt):
_ID = 3
_GEN = (
Uint8('Len', val=2),
Uint('spare', bl=6, rep=REPR_HEX),
Uint('LongPDCPPDUNumber', bl=18),
Uint24('spare', rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# Service Class Indicator
# TS 29.281, section 5.2.2.3
#------------------------------------------------------------------------------#
class GTPUHdrExt32(GTPUHdrExt):
_ID = 32
_GEN = (
Uint8('Len', val=1),
Uint8('ServiceClassInd'),
Uint8('spare', rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# RAN Container
# TS 29.281, section 5.2.2.4
#------------------------------------------------------------------------------#
class GTPUHdrExt129(GTPUHdrExtAuto):
_ID = 129
_GEN = (
Uint8('Len'),
BufAligned('RANContainer', rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# Xw RAN Container
# TS 29.281, section 5.2.2.5
#------------------------------------------------------------------------------#
class GTPUHdrExt131(GTPUHdrExtAuto):
_ID = 131
_GEN = (
Uint8('Len'),
BufAligned('XwRANContainer', rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# NR RAN Container
# TS 29.281, section 5.2.2.6
#------------------------------------------------------------------------------#
class GTPUHdrExt132(GTPUHdrExtAuto):
_ID = 132
_GEN = (
Uint8('Len'),
BufAligned('NRRANContainer', rep=REPR_HEX),
Uint8('NextExt')
)
#------------------------------------------------------------------------------#
# PDU Session Container
# TS 29.281, section 5.2.2.7
#------------------------------------------------------------------------------#
class GTPUHdrExt133(GTPUHdrExtAuto):
_ID = 133
_GEN = (
Uint8('Len'),
BufAligned('PDUSessionContainer', rep=REPR_HEX),
Uint8('NextExt')
)
self[1].set_blauto(lambda: self._get_cont_len())
self[2].set_valauto(lambda: self._get_ne())
def _get_cont_len(self):
return max(0, (self[0].get_val()*32) - 16)
def _get_ne(self):
n = self.get_next()
if n:
return n[1]._ID
else:
return 0
def set_val(self, val):
self._set_cont_cls()
Envelope.set_val(self, val)
def _from_char(self, char):
self._set_cont_cls()
Envelope._from_char(self, char)
def _set_cont_cls(self):
ne = 1
if self._env:
p = self.get_prev()
if p:
# get NextExt from previous GTPUHdrExt
ne = p['NextExt'].get_val()
elif self._env._env:
# get NextExt from GTPUHdrOpt
ne = self._env._env['GTPUHdrOpt']['NextExt'].get_val()
if ne in GTPUHdrExtCont_dict:
Cont = GTPUHdrExtCont_dict[ne].clone()
Cont.set_blauto(lambda: self._get_cont_len())
self.replace(self[1], Cont)
#------------------------------------------------------------------------------#
@ -258,35 +202,24 @@ class GTPUHdrExt133(GTPUHdrExtAuto):
# TS 29.281, section 5.1
#------------------------------------------------------------------------------#
# Warning: this GTPUHdrExtList is not friendly to build, as the user requires to
# explicitely add GTPUHdrExt.. into it when building a message
class GTPUHdrExtList(Envelope):
_GEN = ()
_HdrExt = {
3 : GTPUHdrExt3,
32 : GTPUHdrExt32,
64 : GTPUHdrExt64,
129 : GTPUHdrExt129,
130 : GTPUHdrExt3,
131 : GTPUHdrExt131,
132 : GTPUHdrExt132,
133 : GTPUHdrExt133,
192 : GTPUHdrExt192
}
class GTPUHdrExtList(Sequence):
_GEN = GTPUHdrExt()
def _from_char(self, char):
if self.get_trans():
return
self.__init__()
self.set_val(None)
l = 0
p = self.get_prev()
if p and not p.get_trans():
neid = p['NextExt'].get_val()
while neid > 0 and char.len_bit() >= 32:
ne = self._HdrExt.get(neid, GTPUHdrExtAuto)()
ne._from_char(char)
self.append(ne)
neid = ne['NextExt'].get_val()
if not p:
return
l += 1
self.set_num(l)
self[-1]._from_char(char)
while self[-1]['NextExt'].get_val() != 0:
l += 1
self.set_num(l)
self[-1]._from_char(char)
class GTPUHdrOpt(Envelope):
@ -298,19 +231,19 @@ class GTPUHdrOpt(Envelope):
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[-1].set_valauto(lambda: self._get_next_hdr())
self[-1].set_valauto(lambda: self._get_ne())
def _get_next_hdr(self):
def _get_ne(self):
n = self.get_next()
if isinstance(n, GTPUHdrExtList) and len(n._content) and hasattr(n[0], '_ID'):
return n[0]._ID
if isinstance(n, GTPUHdrExtList) and n.get_num():
return n[0][1]._ID
else:
return 0
ProtType_dict = {
0 : 'GTP prime',
1 : 'GTP',
1 : 'GTP', # the one for GTP-U
}
GTPUType_dict = {
@ -324,13 +257,13 @@ GTPUType_dict = {
}
class GTPUType(IntEnum):
EchoRequest = 1
EchoResponse = 2
ErrorIndication = 26
SupportedExtensionHeadersNotification = 31
TunnelStatus = 253
EndMarker = 254
GPDU = 255
EchoRequest = 1
EchoResponse = 2
ErrorIndication = 26
SupportedExtensionHeadersNotification = 31
TunnelStatus = 253
EndMarker = 254
GPDU = 255
class GTPUHdr(Envelope):
@ -464,7 +397,7 @@ class GTPUIEPrivateExt(GTPUIE):
# TS 29.281, section 7
#------------------------------------------------------------------------------#
class GTPUMsg(Envelope):
class _GTPUMsg(Envelope):
ENV_SEL_TRANS = False
@ -499,9 +432,9 @@ class GTPUMsg(Envelope):
# TS 29.281, section 7.2.1
#------------------------------------------------------------------------------#
class GTPUEchoRequest(GTPUMsg):
class GTPUEchoRequest(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 1}),
GTPUHdr(val={'Type': GTPUType.EchoRequest.value}),
GTPUIEPrivateExt(hier=1, trans=True) # optional
)
@ -511,9 +444,9 @@ class GTPUEchoRequest(GTPUMsg):
# TS 29.281, section 7.2.2
#------------------------------------------------------------------------------#
class GTPUEchoResponse(GTPUMsg):
class GTPUEchoResponse(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 2}),
GTPUHdr(val={'Type': GTPUType.EchoResponse.value}),
GTPUIERecovery(hier=1),
GTPUIEPrivateExt(hier=1, trans=True) # optional
)
@ -524,9 +457,9 @@ class GTPUEchoResponse(GTPUMsg):
# TS 29.281, section 7.2.3
#------------------------------------------------------------------------------#
class GTPUSuppExtHdrNotif(GTPUMsg):
class GTPUSuppExtHdrNotif(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 31}),
GTPUHdr(val={'Type': GTPUType.SupportedExtensionHeadersNotification.value}),
GTPUIEExtHdrList(hier=1)
)
@ -536,9 +469,9 @@ class GTPUSuppExtHdrNotif(GTPUMsg):
# TS 29.281, section 7.3.1
#------------------------------------------------------------------------------#
class GTPUErrorInd(GTPUMsg):
class GTPUErrorInd(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 26}),
GTPUHdr(val={'Type': GTPUType.ErrorIndication.value}),
GTPUIETEID(hier=1),
GTPUIEPeerAddr(hier=1),
GTPUIEPrivateExt(hier=1, trans=True) # optional
@ -550,9 +483,9 @@ class GTPUErrorInd(GTPUMsg):
# TS 29.281, section 7.3.2
#------------------------------------------------------------------------------#
class GTPUEndMarker(GTPUMsg):
class GTPUEndMarker(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 254}),
GTPUHdr(val={'Type': GTPUType.EndMarker.value}),
GTPUIEPrivateExt(hier=1, trans=True) # optional
)
@ -562,9 +495,9 @@ class GTPUEndMarker(GTPUMsg):
# TS 29.281, section 7.3.3
#------------------------------------------------------------------------------#
class GTPUTunnelStatus(GTPUMsg):
class GTPUTunnelStatus(_GTPUMsg):
_GEN = (
GTPUHdr(val={'Type': 253}),
GTPUHdr(val={'Type': GTPUType.TunnelStatus.value}),
GTPUIEPrivateExt(hier=1, trans=True) # optional
)
@ -576,7 +509,7 @@ class GTPUTunnelStatus(GTPUMsg):
class GPDU(Envelope):
_GEN = (
GTPUHdr(val={'Type': 255}),
GTPUHdr(val={'Type': GTPUType.GPDU.value}),
Buf('TPDU', hier=1, rep=REPR_HEX)
)
@ -586,9 +519,10 @@ GTPUDispatcher = {
2 : GTPUEchoResponse,
26 : GTPUErrorInd,
31 : GTPUSuppExtHdrNotif,
253 : GTPUTunnelStatus,
254 : GTPUEndMarker,
255 : GPDU
}
255 : GPDU,
}
ERR_GTPU_BUF_TOO_SHORT = 1