diameter: add IETF and 3GPP specific variants

This commit is contained in:
mich 2019-08-02 00:20:40 +02:00
parent 8127674872
commit b27005dfbc
4 changed files with 1537 additions and 46 deletions

View File

@ -27,12 +27,43 @@
# *--------------------------------------------------------
#*/
#__all__ = [
# ]
__all__ = [
# AVP Data Format
'OctetString',
'Integer32',
'Integer64',
'Unsigned32',
'Unsigned64',
'Float32',
'Float64',
'DiameterIdentity',
'DiameterURI',
'Enumerated',
'IPFilterRule',
'Address',
'Time',
'UTF8String',
# AVP
'AVPHdr',
'AVPGeneric',
# Diameter
'DiameterHdr',
'DiameterGeneric',
# custom AVP generator
'GenerateAVP',
# dictionnaries
'AppID_dict',
'Cmd_dict',
'AVPCodes_dict',
'AVPSpecVal_dict',
'AddrFamNums_dict'
]
#------------------------------------------------------------------------------#
# IETF RFC 6733
# https://tools.ietf.org/html/rfc6733
# generic implementation (no AVP specific format)
#------------------------------------------------------------------------------#
import datetime
@ -116,16 +147,16 @@ class Float64(_IEEE_754_1985):
# IPFilterRule: Buf(), ascii-encoded filter rule
class DiameterIdentity(Buf):
pass
_rep = REPR_HUM
class DiameterURI(UTF8String):
pass
_rep = REPR_HUM
class Enumerated(Int32):
pass
class IPFiterRule(Buf):
pass
class IPFilterRule(Buf):
_rep = REPR_HUM
class Address(Envelope):
@ -166,7 +197,12 @@ class AVPHdr(Envelope):
self[6].set_transauto(lambda: False if self[1].get_val() else True)
class AVP(Envelope):
class AVPGeneric(Envelope):
# FMT_LUT is a lookup table to get the proper AVP Data Format from the
# AVP Code in the AVPHdr
FMT_LUT = {}
_GEN = (
AVPHdr(),
Buf('AVPData', rep=REPR_HEX, hier=1),
@ -180,44 +216,72 @@ class AVP(Envelope):
self[1].set_blauto(lambda: (self[0][5].get_val() - 12) << 3 if self[0][1].get_val() else \
(self[0][5].get_val() - 8) << 3)
self[2].set_blauto(lambda: (-self[1].get_len()%4) << 3)
def GenerateAVP(Code, DataType, M=0, P=0, VendorID=None):
"""generate a specific Diameter AVP with the appropriate Code and Data type
"""
val_hdr = {'Code': Code, 'M': M, 'P': P}
if VendorID is not None:
val_hdr['V'] = 1
val_hdr['VendorID'] = VendorID
#
if isinstance(DataType._bl, integer_types) and DataType._bl % 32 == 0:
# fixed length AVP, no padding required
class AVP(Envelope):
_GEN = (
AVPHdr(val=val_hdr),
DataType('AVPData', hier=1)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0][5].set_valauto(lambda: 12 + (self[1]._bl >> 3) if self[0][1].get_val() else \
8 + (self[1]._bl >> 3))
else:
# variable length AVP, padding may be required
class AVP(Envelope):
_GEN = (
AVPHdr(val=val_hdr),
DataType('AVPData', hier=1),
Buf('AVPPad', rep=REPR_HEX)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0][5].set_valauto(lambda: 12 + self[1].get_len() if self[0][1].get_val() else \
8 + self[1].get_len())
self[1].set_blauto(lambda: (self[0][5].get_val() - 12) << 3 if self[0][1].get_val() else \
def set_val(self, val):
if isinstance(val, (tuple, list)) and val:
self.set_val_hdr(val[0])
if len(val) > 1:
self[1].set_val(val[1])
if len(val) > 2:
self[2].set_val(val[2])
elif isinstance(val, dict) and 'AVPHdr' in val:
self.set_val_hdr(val['AVPHdr'])
if 'AVPData' in val:
self[1].set_val(val['AVPData'])
if 'AVPPad' in val:
self[1].set_val(val['AVPPad'])
def set_val_hdr(self, val_hdr):
self[0].set_val(val_hdr)
avp_code = self[0][0].get_val()
if avp_code in self.FMT_LUT:
AVPData = self.FMT_LUT[avp_code]()
self.replace(self[1], AVPData)
def _from_char(self, char):
self[0]._from_char(char)
avp_code = self[0][0].get_val()
restore_char_len = False
if avp_code in self.FMT_LUT:
AVPData = self.FMT_LUT[avp_code]()
if hasattr(AVPData, '_bl') and AVPData._bl is None:
# atomic variable length AVP, need to automate AVPData length
AVPData.set_blauto(lambda: (self[0][5].get_val() - 12) << 3 if \
self[0][1].get_val() else \
(self[0][5].get_val() - 8) << 3)
self[2].set_blauto(lambda: (-self[1].get_len()%4) << 3)
#
return AVP
else:
# Grouped or Float AVP
restore_char_len = True
char_bl = char._len_bit
dat_len = self[0][5].get_val() - 8
if self[0][1].get_val():
dat_len -= 4
if dat_len < 0:
raise(EltErr('{0} [_from_char] invalid AVP length, {1}'\
.format(self._name, self[0][5].get_val())))
char._len_bit = char._cur + 8 * dat_len
if avp_code in AVPSpecVal_dict:
# add dict for value interpretation
AVPData._dic = AVPSpecVal_dict[avp_code]
# replace the generic Data format with the custom one
self.replace(self[1], AVPData)
self[1]._from_char(char)
if restore_char_len:
# Grouped or Float AVP
char._len_bit = char_bl
else:
# atomic variable length AVP
self[2]._from_char(char)
#------------------------------------------------------------------------------#
# 4.4. Grouped AVP Values
#------------------------------------------------------------------------------#
# This definition is not required here, as there is no specific AVP Data format
# defined in this module
#class Grouped(Sequence):
# _GEN = AVPGeneric()
#------------------------------------------------------------------------------#
@ -243,7 +307,7 @@ class DiameterHdr(Envelope):
class DiameterGeneric(Envelope):
_GEN = (
DiameterHdr(),
Sequence('AVPs', GEN=AVP(), hier=1)
Sequence('AVPs', GEN=AVPGeneric(), hier=1)
)
def __init__(self, *args, **kwargs):
@ -269,3 +333,60 @@ class DiameterGeneric(Envelope):
if restore_char_len:
char._len_bit = char_bl
#------------------------------------------------------------------------------#
# custom AVP generator
#------------------------------------------------------------------------------#
def GenerateAVP(Code, DataType, M=0, P=0, VendorID=None):
"""generate a specific Diameter AVP with the appropriate Code and Data type
"""
val_hdr = {'Code': Code, 'M': M, 'P': P}
if VendorID is not None:
val_hdr['V'] = 1
val_hdr['VendorID'] = VendorID
#
if hasattr(DataType, '_bl') and \
isinstance(DataType._bl, integer_types) and \
DataType._bl % 32 == 0:
# fixed length AVP, no padding required
class AVP(Envelope):
_GEN = (
AVPHdr(val=val_hdr),
DataType('AVPData', hier=1)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0][5].set_valauto(lambda: 12 + (self[1]._bl >> 3) if self[0][1].get_val() else \
8 + (self[1]._bl >> 3))
#
elif isinstance(DataType, Grouped):
# variable length nested AVP, no padding required
class AVP(Envelope):
_GEN = (
AVPHdr(val=val_hdr),
DataType('AVPData', hier=1)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0][5].set_valauto(lambda: 12 + self[1].get_len() if self[0][1].get_val() else \
8 + self[1].get_len())
#
else:
# variable length AVP, padding may be required
class AVP(Envelope):
_GEN = (
AVPHdr(val=val_hdr),
DataType('AVPData', hier=1),
Buf('AVPPad', rep=REPR_HEX)
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0][5].set_valauto(lambda: 12 + self[1].get_len() if self[0][1].get_val() else \
8 + self[1].get_len())
self[1].set_blauto(lambda: (self[0][5].get_val() - 12) << 3 if self[0][1].get_val() else \
(self[0][5].get_val() - 8) << 3)
self[2].set_blauto(lambda: (-self[1].get_len()%4) << 3)
#
return AVP

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,133 @@
# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2019. Benoit Michau. P1Sec.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_diameter/DiameterIETF.py
# * Created : 2019-08-01
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
__all__ = [
'FMT_LUT_RFC6733',
'Grouped',
'AVPIETF',
'DiameterIETF'
]
#------------------------------------------------------------------------------#
# IETF RFC 6733
# https://tools.ietf.org/html/rfc6733
# IETF specific implementation (with AVP specific format)
#------------------------------------------------------------------------------#
from pycrate_core.elt import Sequence
from pycrate_diameter.Diameter import *
#------------------------------------------------------------------------------#
# 4.1. AVP Header
#------------------------------------------------------------------------------#
class AVPIETF(AVPGeneric):
pass
#------------------------------------------------------------------------------#
# 4.4. Grouped AVP Values
#------------------------------------------------------------------------------#
class Grouped(Sequence):
_GEN = AVPIETF()
#------------------------------------------------------------------------------#
# 4.5. Diameter Base Protocol AVPs
#------------------------------------------------------------------------------#
FMT_LUT_RFC6733 = {
1 : UTF8String,
25 : OctetString,
27 : Unsigned32,
33 : OctetString,
44 : OctetString,
50 : UTF8String,
55 : Time,
85 : Unsigned32,
257 : Address,
258 : Unsigned32,
259 : Unsigned32,
260 : Grouped,
261 : Enumerated,
262 : Unsigned32,
263 : UTF8String,
264 : DiameterIdentity,
265 : Unsigned32,
266 : Unsigned32,
267 : Unsigned32,
268 : Unsigned32,
269 : UTF8String,
270 : Unsigned32,
271 : Enumerated,
272 : Unsigned32,
273 : Enumerated,
274 : Enumerated,
276 : Unsigned32,
277 : Enumerated,
278 : Unsigned32,
279 : Grouped,
280 : DiameterIdentity,
281 : UTF8String,
282 : DiameterIdentity,
283 : DiameterIdentity,
284 : Grouped,
285 : Enumerated,
287 : Unsigned64,
291 : Unsigned32,
292 : DiameterURI,
293 : DiameterIdentity,
294 : DiameterIdentity,
295 : Enumerated,
296 : DiameterIdentity,
297 : Grouped,
298 : Unsigned32,
299 : Unsigned32,
480 : Enumerated,
483 : Enumerated,
485 : Unsigned32,
}
#------------------------------------------------------------------------------#
# 3. Diameter Header
#------------------------------------------------------------------------------#
AVPIETF.FMT_LUT = FMT_LUT_RFC6733
class DiameterIETF(DiameterGeneric):
_GEN = (
DiameterHdr(),
Sequence('AVPs', GEN=AVPIETF(), hier=1)
)

View File

@ -27,5 +27,5 @@
# *--------------------------------------------------------
#*/
#
__all__ = ['Diameter', ]
__all__ = ['Diameter', 'DiameterIETF', 'Diameter3GPP']
__version__ = '0.4.0'