globally remove Python 2 compat
This commit is contained in:
parent
22c6a5daec
commit
87d633859b
|
@ -120,18 +120,12 @@ def get_spec_files(spec_dir):
|
||||||
spec_texts, spec_fn = [], []
|
spec_texts, spec_fn = [], []
|
||||||
for fn in load:
|
for fn in load:
|
||||||
try:
|
try:
|
||||||
if python_version < 3:
|
fd = open('%s%s' % (spec_dir, fn), 'r', encoding='utf-8')
|
||||||
fd = open('%s%s' % (spec_dir, fn), 'r')
|
|
||||||
else:
|
|
||||||
fd = open('%s%s' % (spec_dir, fn), 'r', encoding='utf-8')
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
raise(ASN1Err(
|
raise(ASN1Err(
|
||||||
'[proc] unable to open spec file {0}, {1}'.format(fn, err)))
|
'[proc] unable to open spec file {0}, {1}'.format(fn, err)))
|
||||||
try:
|
try:
|
||||||
if python_version < 3:
|
spec_texts.append( fd.read() )
|
||||||
spec_texts.append( fd.read().decode('utf-8') )
|
|
||||||
else:
|
|
||||||
spec_texts.append( fd.read() )
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
fd.close()
|
fd.close()
|
||||||
raise(ASN1Err(
|
raise(ASN1Err(
|
||||||
|
|
|
@ -27,8 +27,6 @@
|
||||||
# *--------------------------------------------------------
|
# *--------------------------------------------------------
|
||||||
#*/
|
#*/
|
||||||
|
|
||||||
from .utils import python_version
|
|
||||||
|
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
# ordered dictionnary
|
# ordered dictionnary
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
|
@ -108,27 +106,16 @@ class ASN1Dict(object):
|
||||||
for key, val in other.items():
|
for key, val in other.items():
|
||||||
self.__setitem__(key, val)
|
self.__setitem__(key, val)
|
||||||
|
|
||||||
if python_version <= 2:
|
def keys(self):
|
||||||
def keys(self):
|
return self._index.__iter__()
|
||||||
return list(self._index)
|
|
||||||
|
|
||||||
def items(self):
|
|
||||||
return [(k, self._dict[k]) for k in self._index]
|
|
||||||
|
|
||||||
def values(self):
|
|
||||||
return [self._dict[k] for k in self._index]
|
|
||||||
|
|
||||||
else:
|
def items(self):
|
||||||
def keys(self):
|
# TODO: create a true iterator instead of a complete list over which we then iterate
|
||||||
return self._index.__iter__()
|
return [(k, self._dict[k]) for k in self._index].__iter__()
|
||||||
|
|
||||||
def items(self):
|
def values(self):
|
||||||
# TODO: create a true iterator instead of a complete list over which we then iterate
|
# TODO: create a true iterator instead of a complete list over which we then iterate
|
||||||
return [(k, self._dict[k]) for k in self._index].__iter__()
|
return [self._dict[k] for k in self._index].__iter__()
|
||||||
|
|
||||||
def values(self):
|
|
||||||
# TODO: create a true iterator instead of a complete list over which we then iterate
|
|
||||||
return [self._dict[k] for k in self._index].__iter__()
|
|
||||||
|
|
||||||
# custom pycrate_asn1 methods
|
# custom pycrate_asn1 methods
|
||||||
def copy(self):
|
def copy(self):
|
||||||
|
|
|
@ -155,10 +155,7 @@ def value_to_defin(v, Obj=None, Gen=None, ind=None):
|
||||||
return '(%s, %s)' % (vv, vl)
|
return '(%s, %s)' % (vv, vl)
|
||||||
elif Obj.TYPE == TYPE_OCT_STR:
|
elif Obj.TYPE == TYPE_OCT_STR:
|
||||||
# byte-string
|
# byte-string
|
||||||
if python_version > 2:
|
return repr(v)
|
||||||
return repr(v)
|
|
||||||
else:
|
|
||||||
return 'b%s' % repr(v)
|
|
||||||
elif Obj.TYPE == TYPE_OID:
|
elif Obj.TYPE == TYPE_OID:
|
||||||
# list of int -> convert to tuple
|
# list of int -> convert to tuple
|
||||||
return repr(tuple(v))
|
return repr(tuple(v))
|
||||||
|
|
|
@ -952,10 +952,7 @@ def test():
|
||||||
if fn[-4:] == '.asn':
|
if fn[-4:] == '.asn':
|
||||||
fp = '%s%s/%s' % (p, S, fn)
|
fp = '%s%s/%s' % (p, S, fn)
|
||||||
print(fp)
|
print(fp)
|
||||||
if python_version < 3:
|
mods = tokenize_text(open(fp).read())
|
||||||
mods = tokenize_text(open(fp).read().decode('utf-8'))
|
|
||||||
else:
|
|
||||||
mods = tokenize_text(open(fp).read())
|
|
||||||
for modname, moddict in mods.items():
|
for modname, moddict in mods.items():
|
||||||
M[modname] = moddict
|
M[modname] = moddict
|
||||||
return M
|
return M
|
||||||
|
|
|
@ -32,7 +32,7 @@ import pprint
|
||||||
from keyword import iskeyword
|
from keyword import iskeyword
|
||||||
|
|
||||||
# pycrate_core is used only for basic library-wide functions / variables:
|
# pycrate_core is used only for basic library-wide functions / variables:
|
||||||
# log(), python_version, integer_types, str_types
|
# log(), integer_types, str_types
|
||||||
from pycrate_core.utils import *
|
from pycrate_core.utils import *
|
||||||
from .err import ASN1Err
|
from .err import ASN1Err
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ from .err import ASN1Err
|
||||||
|
|
||||||
def asnlog(msg):
|
def asnlog(msg):
|
||||||
"""
|
"""
|
||||||
customizable logging function for the whole asn1 part
|
customizable logging function for the whole asn1c part
|
||||||
"""
|
"""
|
||||||
log(msg)
|
log(msg)
|
||||||
|
|
||||||
|
|
|
@ -786,10 +786,7 @@ Specific attribute:
|
||||||
# base 10: character string encoding
|
# base 10: character string encoding
|
||||||
if B0 == 1:
|
if B0 == 1:
|
||||||
# NR1 encoding, simple whole numbers
|
# NR1 encoding, simple whole numbers
|
||||||
if python_version < 3:
|
m = self._NR1_RE.match( str(bytes[1:], 'ascii') )
|
||||||
m = self._NR1_RE.match( bytes[1:] )
|
|
||||||
else:
|
|
||||||
m = self._NR1_RE.match( str(bytes[1:], 'ascii') )
|
|
||||||
if not m:
|
if not m:
|
||||||
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR1 encoding, {1!r}'\
|
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR1 encoding, {1!r}'\
|
||||||
.format(self.fullname(), bytes[1:])))
|
.format(self.fullname(), bytes[1:])))
|
||||||
|
@ -797,10 +794,7 @@ Specific attribute:
|
||||||
self._val = (mant, 10, 0)
|
self._val = (mant, 10, 0)
|
||||||
elif B0 == 2:
|
elif B0 == 2:
|
||||||
# NR2 encoding, requires a decimal mark
|
# NR2 encoding, requires a decimal mark
|
||||||
if python_version < 3:
|
m = self._NR2_RE.match( str(bytes[1:], 'ascii') )
|
||||||
m = self._NR2_RE.match( bytes[1:] )
|
|
||||||
else:
|
|
||||||
m = self._NR2_RE.match( str(bytes[1:], 'ascii') )
|
|
||||||
if not m:
|
if not m:
|
||||||
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR2 encoding, {1!r}'\
|
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR2 encoding, {1!r}'\
|
||||||
.format(self.fullname(), bytes[1:])))
|
.format(self.fullname(), bytes[1:])))
|
||||||
|
@ -812,10 +806,7 @@ Specific attribute:
|
||||||
self._val = (int(i+d), 10, e)
|
self._val = (int(i+d), 10, e)
|
||||||
elif B0 == 3:
|
elif B0 == 3:
|
||||||
# NR3 encoding, requires the exponent notation
|
# NR3 encoding, requires the exponent notation
|
||||||
if python_version < 3:
|
m = self._NR3_RE.match( str(bytes[1:], 'ascii') )
|
||||||
m = self._NR3_RE.match( bytes[1:] )
|
|
||||||
else:
|
|
||||||
m = self._NR3_RE.match( str(bytes[1:], 'ascii') )
|
|
||||||
if not m:
|
if not m:
|
||||||
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR3 encoding, {1!r}'\
|
raise(ASN1BERDecodeErr('{0}: invalid REAL base 10 NR3 encoding, {1!r}'\
|
||||||
.format(self.fullname(), bytes[1:])))
|
.format(self.fullname(), bytes[1:])))
|
||||||
|
@ -890,21 +881,12 @@ Specific attribute:
|
||||||
raise(ASN1BEREncodeErr('{0}: invalid REAL base 10 encoding NR1 for decimal value'\
|
raise(ASN1BEREncodeErr('{0}: invalid REAL base 10 encoding NR1 for decimal value'\
|
||||||
.format(self.fullname())))
|
.format(self.fullname())))
|
||||||
i = self._val[0] * (10**self._val[2])
|
i = self._val[0] * (10**self._val[2])
|
||||||
if python_version < 3:
|
return b'\x01' + \
|
||||||
return '\x01' + \
|
ASN1CodecBER.ENC_REALNR1_SPA * b' ' + \
|
||||||
ASN1CodecBER.ENC_REALNR1_SPA * ' ' + \
|
ASN1CodecBER.ENC_REALNR1_ZER * b'0' + \
|
||||||
ASN1CodecBER.ENC_REALNR1_ZER * '0' + \
|
bytes(str(i), 'ascii')
|
||||||
str(i)
|
|
||||||
else:
|
|
||||||
return b'\x01' + \
|
|
||||||
ASN1CodecBER.ENC_REALNR1_SPA * b' ' + \
|
|
||||||
ASN1CodecBER.ENC_REALNR1_ZER * b'0' + \
|
|
||||||
bytes(str(i), 'ascii')
|
|
||||||
elif ASN1CodecBER.ENC_REALNR == 2:
|
elif ASN1CodecBER.ENC_REALNR == 2:
|
||||||
if python_version < 3:
|
i = bytes(str(self._val[0]), 'ascii')
|
||||||
i = bytes(self._val[0])
|
|
||||||
else:
|
|
||||||
i = bytes(str(self._val[0]), 'ascii')
|
|
||||||
if self._val[2] >= 0:
|
if self._val[2] >= 0:
|
||||||
# we need to add trailing zero
|
# we need to add trailing zero
|
||||||
i += self._val[2] * b'0'
|
i += self._val[2] * b'0'
|
||||||
|
@ -926,20 +908,14 @@ Specific attribute:
|
||||||
ASN1CodecBER.ENC_REALNR2_ZERTRAIL * b'0'
|
ASN1CodecBER.ENC_REALNR2_ZERTRAIL * b'0'
|
||||||
else:
|
else:
|
||||||
#ASN1CodecBER.ENC_REALNR == 3
|
#ASN1CodecBER.ENC_REALNR == 3
|
||||||
if python_version < 3:
|
i, e = bytes(str(self._val[0]), 'ascii'), self._val[2]
|
||||||
i, e = bytes(self._val[0]), self._val[2]
|
|
||||||
else:
|
|
||||||
i, e = bytes(str(self._val[0]), 'ascii'), self._val[2]
|
|
||||||
# remove trailing zero
|
# remove trailing zero
|
||||||
while i[-1:] == b'0':
|
while i[-1:] == b'0':
|
||||||
i = i[:-1]
|
i = i[:-1]
|
||||||
e += 1
|
e += 1
|
||||||
if e == 0:
|
if e == 0:
|
||||||
e = b'+0'
|
e = b'+0'
|
||||||
elif python_version < 3:
|
e = bytes(str(e), 'ascii')
|
||||||
e = bytes(e)
|
|
||||||
else:
|
|
||||||
e = bytes(str(e), 'ascii')
|
|
||||||
return b'\x03' + i + b'.E' + e
|
return b'\x03' + i + b'.E' + e
|
||||||
else:
|
else:
|
||||||
# base 2, encoding the CER / DER way:
|
# base 2, encoding the CER / DER way:
|
||||||
|
@ -957,16 +933,10 @@ Specific attribute:
|
||||||
LE = int_bytelen(E)
|
LE = int_bytelen(E)
|
||||||
E = int_to_bytes(E, 8*LE)
|
E = int_to_bytes(E, 8*LE)
|
||||||
N = uint_to_bytes(m, 8*uint_bytelen(m))
|
N = uint_to_bytes(m, 8*uint_bytelen(m))
|
||||||
if python_version < 3:
|
if LE > 3:
|
||||||
if LE > 3:
|
return bytes((0x80 + (S<<6) + 3, )) + uint_to_bytes(LE) + E + N
|
||||||
return chr(0x80 + (S<<6) + 3) + uint_to_bytes(LE) + E + N
|
|
||||||
else:
|
|
||||||
return chr(0x80 + (S<<6) + LE-1) + E + N
|
|
||||||
else:
|
else:
|
||||||
if LE > 3:
|
return bytes((0x80 + (S<<6) + LE-1, )) + E + N
|
||||||
return bytes((0x80 + (S<<6) + 3, )) + uint_to_bytes(LE) + E + N
|
|
||||||
else:
|
|
||||||
return bytes((0x80 + (S<<6) + LE-1, )) + E + N
|
|
||||||
|
|
||||||
def _decode_ber_cont_ws(self, char, vbnd):
|
def _decode_ber_cont_ws(self, char, vbnd):
|
||||||
char._cur, char._len_bit = vbnd[0], vbnd[1]
|
char._cur, char._len_bit = vbnd[0], vbnd[1]
|
||||||
|
@ -1805,10 +1775,7 @@ Single value: Python tuple of int
|
||||||
for f in fact[:-1]:
|
for f in fact[:-1]:
|
||||||
by.append( 0x80 + f )
|
by.append( 0x80 + f )
|
||||||
by.append( fact[-1] )
|
by.append( fact[-1] )
|
||||||
if python_version > 2:
|
return bytes(by)
|
||||||
return bytes(by)
|
|
||||||
else:
|
|
||||||
return ''.join(map(chr, by))
|
|
||||||
|
|
||||||
|
|
||||||
class REL_OID(_OID):
|
class REL_OID(_OID):
|
||||||
|
@ -1856,8 +1823,5 @@ Single value: Python tuple of int
|
||||||
for f in fact[:-1]:
|
for f in fact[:-1]:
|
||||||
by.append( 0x80 + f )
|
by.append( 0x80 + f )
|
||||||
by.append( fact[-1] )
|
by.append( fact[-1] )
|
||||||
if python_version > 2:
|
return bytes(by)
|
||||||
return bytes(by)
|
|
||||||
else:
|
|
||||||
return ''.join(map(chr, by))
|
|
||||||
|
|
||||||
|
|
|
@ -232,10 +232,7 @@ Single value: Python 2-tuple
|
||||||
if isinstance(self._val[0], str_types):
|
if isinstance(self._val[0], str_types):
|
||||||
if self._val[0][:5] == '_unk_':
|
if self._val[0][:5] == '_unk_':
|
||||||
# HSTRING
|
# HSTRING
|
||||||
if python_version >= 3:
|
return '\'%s\'H' % hexlify(self._val[1]).decode('ascii').upper()
|
||||||
return '\'%s\'H' % hexlify(self._val[1]).decode('ascii').upper()
|
|
||||||
else:
|
|
||||||
return '\'%s\'H' % hexlify(self._val[1]).upper()
|
|
||||||
else:
|
else:
|
||||||
ident = self._val[0]
|
ident = self._val[0]
|
||||||
Obj = self._get_val_obj(self._val[0])
|
Obj = self._get_val_obj(self._val[0])
|
||||||
|
|
|
@ -281,10 +281,7 @@ Specific constraints attributes:
|
||||||
elif self._ASN_WASC and self._val[1] % 8 == 0:
|
elif self._ASN_WASC and self._val[1] % 8 == 0:
|
||||||
# eventually add printable repr
|
# eventually add printable repr
|
||||||
try:
|
try:
|
||||||
if python_version < 3:
|
s = uint_to_bytes(self._val[0], self._val[1]).decode('ascii')
|
||||||
s = uint_to_bytes(self._val[0], self._val[1])
|
|
||||||
else:
|
|
||||||
s = uint_to_bytes(self._val[0], self._val[1]).decode('ascii')
|
|
||||||
if is_printable(s):
|
if is_printable(s):
|
||||||
return ret + ' -- %s --' % repr(s)[1:-1]
|
return ret + ' -- %s --' % repr(s)[1:-1]
|
||||||
else:
|
else:
|
||||||
|
@ -1313,10 +1310,7 @@ Specific constraints attributes:
|
||||||
def _to_asn1(self):
|
def _to_asn1(self):
|
||||||
if isinstance(self._val, bytes_types):
|
if isinstance(self._val, bytes_types):
|
||||||
# HSTRING
|
# HSTRING
|
||||||
if python_version >= 3:
|
ret = '\'%s\'H' % hexlify(self._val).decode('ascii').upper()
|
||||||
ret = '\'%s\'H' % hexlify(self._val).decode('ascii').upper()
|
|
||||||
else:
|
|
||||||
ret = '\'%s\'H' % hexlify(self._val).upper()
|
|
||||||
if self._ASN_WASC:
|
if self._ASN_WASC:
|
||||||
# eventually add printable repr
|
# eventually add printable repr
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -27,8 +27,6 @@
|
||||||
# *--------------------------------------------------------
|
# *--------------------------------------------------------
|
||||||
#*/
|
#*/
|
||||||
|
|
||||||
from .utils import python_version
|
|
||||||
|
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
# ordered dictionnary
|
# ordered dictionnary
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
|
@ -102,27 +100,16 @@ class ASN1Dict(object):
|
||||||
for key, val in other.items():
|
for key, val in other.items():
|
||||||
self.__setitem__(key, val)
|
self.__setitem__(key, val)
|
||||||
|
|
||||||
if python_version <= 2:
|
def keys(self):
|
||||||
def keys(self):
|
return self._index.__iter__()
|
||||||
return list(self._index)
|
|
||||||
|
|
||||||
def items(self):
|
|
||||||
return [(k, self._dict[k]) for k in self._index]
|
|
||||||
|
|
||||||
def values(self):
|
|
||||||
return [self._dict[k] for k in self._index]
|
|
||||||
|
|
||||||
else:
|
def items(self):
|
||||||
def keys(self):
|
# TODO: create a true iterator instead of a complete list over which we then iterate
|
||||||
return self._index.__iter__()
|
return [(k, self._dict[k]) for k in self._index].__iter__()
|
||||||
|
|
||||||
def items(self):
|
def values(self):
|
||||||
# TODO: create a true iterator instead of a complete list over which we then iterate
|
# TODO: create a true iterator instead of a complete list over which we then iterate
|
||||||
return [(k, self._dict[k]) for k in self._index].__iter__()
|
return [self._dict[k] for k in self._index].__iter__()
|
||||||
|
|
||||||
def values(self):
|
|
||||||
# TODO: create a true iterator instead of a complete list over which we then iterate
|
|
||||||
return [self._dict[k] for k in self._index].__iter__()
|
|
||||||
|
|
||||||
# custom pycrate_asn1 methods
|
# custom pycrate_asn1 methods
|
||||||
def copy(self):
|
def copy(self):
|
||||||
|
|
|
@ -39,7 +39,7 @@ from .err import *
|
||||||
from pycrate_asn1c.utils import clean_text
|
from pycrate_asn1c.utils import clean_text
|
||||||
|
|
||||||
# pycrate_core is used for basic library-wide functions / variables
|
# pycrate_core is used for basic library-wide functions / variables
|
||||||
# (i.e. log(), python_version, integer_types, bytes_types, str_types)
|
# (i.e. log(), integer_types, bytes_types, str_types)
|
||||||
# and for encoding / decoding routines
|
# and for encoding / decoding routines
|
||||||
from pycrate_core.utils import *
|
from pycrate_core.utils import *
|
||||||
from pycrate_core.utils import TYPE_BYTES as T_BYTES
|
from pycrate_core.utils import TYPE_BYTES as T_BYTES
|
||||||
|
|
|
@ -35,9 +35,6 @@ __all__ = ['Buf', 'BufAuto', 'NullTermStr',
|
||||||
'IntLE', 'Int8LE', 'Int16LE', 'Int24LE', 'Int32LE', 'Int48LE', 'Int64LE']
|
'IntLE', 'Int8LE', 'Int16LE', 'Int24LE', 'Int32LE', 'Int48LE', 'Int64LE']
|
||||||
|
|
||||||
|
|
||||||
import sys
|
|
||||||
python_version = sys.version_info[0]
|
|
||||||
|
|
||||||
from .utils import *
|
from .utils import *
|
||||||
from .charpy import Charpy, CharpyErr
|
from .charpy import Charpy, CharpyErr
|
||||||
from .elt import Atom, EltErr, REPR_RAW, REPR_HEX, REPR_BIN, REPR_HD, REPR_HUM
|
from .elt import Atom, EltErr, REPR_RAW, REPR_HEX, REPR_BIN, REPR_HD, REPR_HUM
|
||||||
|
@ -345,10 +342,7 @@ class String(Atom):
|
||||||
# e.g. utf8, utf16, utf32...
|
# e.g. utf8, utf16, utf32...
|
||||||
CODEC = 'utf8'
|
CODEC = 'utf8'
|
||||||
|
|
||||||
if python_version <= 2:
|
TYPES = (str, )
|
||||||
TYPES = (unicode, )
|
|
||||||
else:
|
|
||||||
TYPES = (str, )
|
|
||||||
TYPENAMES = get_typenames(*TYPES)
|
TYPENAMES = get_typenames(*TYPES)
|
||||||
DEFAULT_VAL = u''
|
DEFAULT_VAL = u''
|
||||||
DEFAULT_BL = 0
|
DEFAULT_BL = 0
|
||||||
|
|
|
@ -150,14 +150,8 @@ class Charpy(object):
|
||||||
# bytes buffer, default one
|
# bytes buffer, default one
|
||||||
r = self.to_bytes()
|
r = self.to_bytes()
|
||||||
if len(r) > self._REPR_MAX:
|
if len(r) > self._REPR_MAX:
|
||||||
if python_version < 3:
|
return 'charpy({0}...{1})'.format(
|
||||||
return 'charpy(b{0}...{1})'.format(
|
repr(r[0:self._REPR_MAX])[:-1], repr(r[-2:])[2:])
|
||||||
repr(r[0:self._REPR_MAX])[:-1], repr(r[-2:])[1:])
|
|
||||||
else:
|
|
||||||
return 'charpy({0}...{1})'.format(
|
|
||||||
repr(r[0:self._REPR_MAX])[:-1], repr(r[-2:])[2:])
|
|
||||||
elif python_version < 3:
|
|
||||||
return 'charpy(b{0})'.format(repr(r))
|
|
||||||
else:
|
else:
|
||||||
return 'charpy({0})'.format(repr(r))
|
return 'charpy({0})'.format(repr(r))
|
||||||
elif self._REPR == 'bytelist':
|
elif self._REPR == 'bytelist':
|
||||||
|
@ -1323,13 +1317,10 @@ class Charpy(object):
|
||||||
# Python built-ins override
|
# Python built-ins override
|
||||||
#--------------------------------------------------------------------------#
|
#--------------------------------------------------------------------------#
|
||||||
|
|
||||||
__len__ = len_bit
|
__len__ = len_bit
|
||||||
__repr__ = repr
|
__repr__ = repr
|
||||||
__index__ = to_uint
|
__index__ = to_uint
|
||||||
if python_version < 3:
|
__bytes__ = to_bytes
|
||||||
__str__ = to_bytes
|
|
||||||
else:
|
|
||||||
__bytes__ = to_bytes
|
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
if self.len_bit() == 0:
|
if self.len_bit() == 0:
|
||||||
|
|
|
@ -72,24 +72,14 @@ REPR_HUM = 4
|
||||||
|
|
||||||
|
|
||||||
# for hexdump representation
|
# for hexdump representation
|
||||||
if python_version < 3:
|
def hview(buf, lw=16):
|
||||||
def hview(buf, lw=16):
|
hv = []
|
||||||
hv = []
|
for o in range(0, len(buf), lw):
|
||||||
for o in range(0, len(buf), lw):
|
l = buf[o:o+lw]
|
||||||
l = buf[o:o+lw]
|
# create the hex fmt string for each iteration
|
||||||
# create the hex fmt string for each iteration
|
hs = '%.2x ' * len(l) % tuple(l)
|
||||||
hs = '%.2x ' * len(l) % tuple(map(ord, l))
|
hv.append( ' ' + hs + ' '*(3*lw-len(hs)) + '| %r' % l )
|
||||||
hv.append( ' ' + hs + ' '*(3*lw-len(hs)) + '| %r' % l )
|
return hv
|
||||||
return hv
|
|
||||||
else:
|
|
||||||
def hview(buf, lw=16):
|
|
||||||
hv = []
|
|
||||||
for o in range(0, len(buf), lw):
|
|
||||||
l = buf[o:o+lw]
|
|
||||||
# create the hex fmt string for each iteration
|
|
||||||
hs = '%.2x ' * len(l) % tuple(l)
|
|
||||||
hv.append( ' ' + hs + ' '*(3*lw-len(hs)) + '| %r' % l )
|
|
||||||
return hv
|
|
||||||
|
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
# Element parent class
|
# Element parent class
|
||||||
|
@ -750,10 +740,7 @@ class Element(object):
|
||||||
b = self.to_bytes()
|
b = self.to_bytes()
|
||||||
return bytes_to_int(b, len(b)<<3)
|
return bytes_to_int(b, len(b)<<3)
|
||||||
|
|
||||||
if python_version < 3:
|
__bytes__ = to_bytes
|
||||||
__str__ = to_bytes
|
|
||||||
else:
|
|
||||||
__bytes__ = to_bytes
|
|
||||||
|
|
||||||
#--------------------------------------------------------------------------#
|
#--------------------------------------------------------------------------#
|
||||||
# representation routines
|
# representation routines
|
||||||
|
@ -2123,14 +2110,9 @@ class Envelope(Element):
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
self._content.clear()
|
||||||
del self._content[:]
|
self._by_id.clear()
|
||||||
del self._by_id[:]
|
self._by_name.clear()
|
||||||
del self._by_name[:]
|
|
||||||
else:
|
|
||||||
self._content.clear()
|
|
||||||
self._by_id.clear()
|
|
||||||
self._by_name.clear()
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
self._it_saved.append(self._it)
|
self._it_saved.append(self._it)
|
||||||
|
@ -2156,9 +2138,6 @@ class Envelope(Element):
|
||||||
# transparent element, pass it and try the next one
|
# transparent element, pass it and try the next one
|
||||||
return self.__next__()
|
return self.__next__()
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
next = __next__
|
|
||||||
|
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
if isinstance(key, str_types):
|
if isinstance(key, str_types):
|
||||||
try:
|
try:
|
||||||
|
@ -3041,10 +3020,7 @@ class Array(Element):
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
self._val.clear()
|
||||||
del self._val[:]
|
|
||||||
else:
|
|
||||||
self._val.clear()
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
self._it_saved.append(self._it)
|
self._it_saved.append(self._it)
|
||||||
|
@ -3065,9 +3041,6 @@ class Array(Element):
|
||||||
clone._env = self
|
clone._env = self
|
||||||
return clone
|
return clone
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
next = __next__
|
|
||||||
|
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
if isinstance(key, integer_types):
|
if isinstance(key, integer_types):
|
||||||
try:
|
try:
|
||||||
|
@ -3979,10 +3952,7 @@ class Sequence(Element):
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
self._content.clear()
|
||||||
del self._content[:]
|
|
||||||
else:
|
|
||||||
self._content.clear()
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
self._it_saved.append(self._it)
|
self._it_saved.append(self._it)
|
||||||
|
@ -4008,9 +3978,6 @@ class Sequence(Element):
|
||||||
# transparent element, pass it and try the next one
|
# transparent element, pass it and try the next one
|
||||||
return self.__next__()
|
return self.__next__()
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
next = __next__
|
|
||||||
|
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
if isinstance(key, integer_types):
|
if isinstance(key, integer_types):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -29,10 +29,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
if sys.version_info[0] < 3:
|
import builtins
|
||||||
import __builtin__ as builtins
|
|
||||||
else:
|
|
||||||
import builtins
|
|
||||||
|
|
||||||
from .elt import Element
|
from .elt import Element
|
||||||
from .charpy import Charpy
|
from .charpy import Charpy
|
||||||
|
|
|
@ -28,12 +28,8 @@
|
||||||
#*/
|
#*/
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from .utils_py3 import *
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
from .utils_py2 import *
|
|
||||||
else:
|
|
||||||
from .utils_py3 import *
|
|
||||||
#
|
|
||||||
# configure max recursion
|
# configure max recursion
|
||||||
#sys.setrecursionlimit(200)
|
#sys.setrecursionlimit(200)
|
||||||
|
|
||||||
|
|
|
@ -942,22 +942,9 @@ class _DPI(object):
|
||||||
"""return the port TCP / UDP number
|
"""return the port TCP / UDP number
|
||||||
"""
|
"""
|
||||||
return unpack('!H', pay[2:4])[0]
|
return unpack('!H', pay[2:4])[0]
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_dn_req_py2(req):
|
|
||||||
"""return the DNS name requested
|
|
||||||
"""
|
|
||||||
# remove fixed DNS header and Type / Class
|
|
||||||
s = req[12:-4]
|
|
||||||
n = []
|
|
||||||
while len(s) > 1:
|
|
||||||
l = ord(s[0])
|
|
||||||
n.append( s[1:1+l] )
|
|
||||||
s = s[1+l:]
|
|
||||||
return b'.'.join(n)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __get_dn_req_py3(req):
|
def __get_dn_req(req):
|
||||||
"""return the DNS name requested
|
"""return the DNS name requested
|
||||||
"""
|
"""
|
||||||
# remove fixed DNS header and Type / Class
|
# remove fixed DNS header and Type / Class
|
||||||
|
@ -968,31 +955,12 @@ class _DPI(object):
|
||||||
n.append( s[1:1+l] )
|
n.append( s[1:1+l] )
|
||||||
s = s[1+l:]
|
s = s[1+l:]
|
||||||
return b'.'.join(n)
|
return b'.'.join(n)
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
get_dn_req = __get_dn_req_py2
|
|
||||||
else:
|
|
||||||
get_dn_req = __get_dn_req_py3
|
|
||||||
|
|
||||||
|
|
||||||
class DPIv4(_DPI):
|
class DPIv4(_DPI):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __get_ip_info_py2(ipbuf):
|
def __get_ip_info(ipbuf):
|
||||||
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
|
||||||
"""
|
|
||||||
# returns a 3-tuple: dst IP, protocol, payload buffer
|
|
||||||
# get IP header length
|
|
||||||
l = (ord(ipbuf[0]) & 0x0F) * 4
|
|
||||||
# get dst IP
|
|
||||||
dst = inet_ntoa(ipbuf[16:20])
|
|
||||||
# get protocol
|
|
||||||
prot = ord(ipbuf[9])
|
|
||||||
#
|
|
||||||
return (dst, prot, ipbuf[l:])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_ip_info_py3(ipbuf):
|
|
||||||
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
||||||
"""
|
"""
|
||||||
# returns a 3-tuple: dst IP, protocol, payload buffer
|
# returns a 3-tuple: dst IP, protocol, payload buffer
|
||||||
|
@ -1004,32 +972,12 @@ class DPIv4(_DPI):
|
||||||
prot = ipbuf[9]
|
prot = ipbuf[9]
|
||||||
#
|
#
|
||||||
return (dst, prot, ipbuf[l:])
|
return (dst, prot, ipbuf[l:])
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
get_ip_info = __get_ip_info_py2
|
|
||||||
else:
|
|
||||||
get_ip_info = __get_ip_info_py3
|
|
||||||
|
|
||||||
|
|
||||||
class DPIv6(_DPI):
|
class DPIv6(_DPI):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __get_ip_info_py2(ipbuf):
|
def __get_ip_info(ipbuf):
|
||||||
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
|
||||||
"""
|
|
||||||
# returns a 3-tuple: dst IP, protocol, payload buffer
|
|
||||||
# get payload length
|
|
||||||
pl = unpack('>H', ipbuf[4:6])[0]
|
|
||||||
# get dst IP
|
|
||||||
dst = inet_ntop(AF_INET6, ipbuf[24:40])
|
|
||||||
# get protocol
|
|
||||||
# TODO: unstack IPv6 opts
|
|
||||||
prot = ord(ipbuf[6])
|
|
||||||
#
|
|
||||||
return (dst, prot, ipbuf[-pl:])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_ip_info_py3(ipbuf):
|
|
||||||
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
"""return a 3-tuple: ipdst (asc), protocol (uint), payload (bytes)
|
||||||
"""
|
"""
|
||||||
# returns a 3-tuple: dst IP, protocol, payload buffer
|
# returns a 3-tuple: dst IP, protocol, payload buffer
|
||||||
|
@ -1042,11 +990,6 @@ class DPIv6(_DPI):
|
||||||
prot = ipbuf[6]
|
prot = ipbuf[6]
|
||||||
#
|
#
|
||||||
return (dst, prot, ipbuf[-pl:])
|
return (dst, prot, ipbuf[-pl:])
|
||||||
|
|
||||||
if python_version < 3:
|
|
||||||
get_ip_info = __get_ip_info_py2
|
|
||||||
else:
|
|
||||||
get_ip_info = __get_ip_info_py3
|
|
||||||
|
|
||||||
|
|
||||||
class MOD(object):
|
class MOD(object):
|
||||||
|
|
|
@ -30,11 +30,8 @@
|
||||||
__all__ = ['SMSd']
|
__all__ = ['SMSd']
|
||||||
|
|
||||||
from time import localtime
|
from time import localtime
|
||||||
|
from queue import Queue, Empty, Full
|
||||||
from .utils import *
|
from .utils import *
|
||||||
if python_version < 3:
|
|
||||||
from Queue import Queue, Empty, Full
|
|
||||||
else:
|
|
||||||
from queue import Queue, Empty, Full
|
|
||||||
|
|
||||||
|
|
||||||
class SMSd(object):
|
class SMSd(object):
|
||||||
|
|
|
@ -31,7 +31,7 @@ from socket import inet_aton, inet_ntoa, inet_pton, inet_ntop, AF_INET, AF_INET6
|
||||||
from struct import pack
|
from struct import pack
|
||||||
from array import array
|
from array import array
|
||||||
|
|
||||||
from pycrate_core.utils import reverse_dict, log, python_version, str_types
|
from pycrate_core.utils import reverse_dict, log, str_types
|
||||||
from pycrate_core.elt import Envelope, Sequence, REPR_RAW, REPR_HEX, REPR_BIN, REPR_HUM
|
from pycrate_core.elt import Envelope, Sequence, REPR_RAW, REPR_HEX, REPR_BIN, REPR_HUM
|
||||||
from pycrate_core.base import *
|
from pycrate_core.base import *
|
||||||
from pycrate_core.repr import *
|
from pycrate_core.repr import *
|
||||||
|
@ -105,8 +105,7 @@ class IPAddr(Buf):
|
||||||
return val
|
return val
|
||||||
|
|
||||||
def set_val(self, val):
|
def set_val(self, val):
|
||||||
if python_version == 2 and isinstance(val, unicode) \
|
if isinstance(val, str_types):
|
||||||
or python_version > 2 and isinstance(val, str_types):
|
|
||||||
self.encode(val)
|
self.encode(val)
|
||||||
else:
|
else:
|
||||||
Buf.set_val(self, val)
|
Buf.set_val(self, val)
|
||||||
|
|
|
@ -3987,16 +3987,10 @@ def parse_ISUP(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 3:
|
if len(buf) < 3:
|
||||||
return None, 1
|
return None, 1
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
Msg = ISUPTypeClasses[buf[2]]()
|
||||||
Msg = ISUPTypeClasses[ord(buf[2])]()
|
except:
|
||||||
except:
|
return None, 1
|
||||||
return None, 1
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
Msg = ISUPTypeClasses[buf[2]]()
|
|
||||||
except:
|
|
||||||
return None, 1
|
|
||||||
try:
|
try:
|
||||||
Msg.from_bytes(buf)
|
Msg.from_bytes(buf)
|
||||||
except:
|
except:
|
||||||
|
|
|
@ -28,8 +28,6 @@
|
||||||
#*/
|
#*/
|
||||||
|
|
||||||
from pycrate_core.utils import *
|
from pycrate_core.utils import *
|
||||||
if python_version < 3:
|
|
||||||
from struct import unpack
|
|
||||||
|
|
||||||
from .TS24008_IE import *
|
from .TS24008_IE import *
|
||||||
from .TS24008_MM import *
|
from .TS24008_MM import *
|
||||||
|
@ -103,18 +101,11 @@ def parse_NAS_MO(buf, inner=True):
|
||||||
element: Element instance, if err is null (no error)
|
element: Element instance, if err is null (no error)
|
||||||
element: None, if err is not null (standard NAS error code)
|
element: None, if err is not null (standard NAS error code)
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
pd, type = buf[0], buf[1]
|
||||||
pd, type = unpack('>BB', buf[:2])
|
except Exception:
|
||||||
except Exception:
|
# error 111, unspecified protocol error
|
||||||
# error 111, unspecified protocol error
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
pd, type = buf[0], buf[1]
|
|
||||||
except Exception:
|
|
||||||
# error 111, unspecified protocol error
|
|
||||||
return None, 111
|
|
||||||
if pd & 0xf != 0xe:
|
if pd & 0xf != 0xe:
|
||||||
# 4-bit protocol discriminator
|
# 4-bit protocol discriminator
|
||||||
pd &= 0xf
|
pd &= 0xf
|
||||||
|
@ -155,24 +146,14 @@ def parse_NAS_MT(buf, inner=True, wl2=False):
|
||||||
element: Element instance, if err is null (no error)
|
element: Element instance, if err is null (no error)
|
||||||
element: None, if err is not null (standard NAS error code)
|
element: None, if err is not null (standard NAS error code)
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
if wl2:
|
||||||
if wl2:
|
pd, type = buf[1], buf[2]
|
||||||
pd, type = unpack('>BB', buf[1:3])
|
else:
|
||||||
else:
|
pd, type = buf[0], buf[1]
|
||||||
pd, type = unpack('>BB', buf[:2])
|
except Exception:
|
||||||
except Exception:
|
# error 111, unspecified protocol error
|
||||||
# error 111, unspecified protocol error
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if wl2:
|
|
||||||
pd, type = buf[1], buf[2]
|
|
||||||
else:
|
|
||||||
pd, type = buf[0], buf[1]
|
|
||||||
except Exception:
|
|
||||||
# error 111, unspecified protocol error
|
|
||||||
return None, 111
|
|
||||||
if pd & 0xf != 0xe:
|
if pd & 0xf != 0xe:
|
||||||
# 4-bit protocol discriminator
|
# 4-bit protocol discriminator
|
||||||
pd &= 0xf
|
pd &= 0xf
|
||||||
|
|
|
@ -94,10 +94,7 @@ def parse_NAS5G(buf, inner=True, sec_hdr=True):
|
||||||
elif pd == 46:
|
elif pd == 46:
|
||||||
# 5GSM
|
# 5GSM
|
||||||
try:
|
try:
|
||||||
if python_version < 3:
|
typ = buf[3]
|
||||||
typ = ord(buf[3:4])
|
|
||||||
else:
|
|
||||||
typ = buf[3]
|
|
||||||
except:
|
except:
|
||||||
# error 111, unspecified protocol error
|
# error 111, unspecified protocol error
|
||||||
return None, 111
|
return None, 111
|
||||||
|
|
|
@ -51,17 +51,10 @@ def parse_NASLTE_MO(buf, inner=True, sec_hdr=True):
|
||||||
element: Element instance, if err is null (no error)
|
element: Element instance, if err is null (no error)
|
||||||
element: None, if err is not null (standard LTE NAS error code)
|
element: None, if err is not null (standard LTE NAS error code)
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
pd = buf[0]
|
||||||
pd = ord(buf[:1])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
# error 111, unspecified protocol error
|
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
pd = buf[0]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
shdr = pd>>4
|
shdr = pd>>4
|
||||||
pd &= 0xf
|
pd &= 0xf
|
||||||
|
|
||||||
|
@ -98,16 +91,10 @@ def parse_NASLTE_MO(buf, inner=True, sec_hdr=True):
|
||||||
#
|
#
|
||||||
if pd == 7:
|
if pd == 7:
|
||||||
# EMM
|
# EMM
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
typ = buf[1]
|
||||||
typ = ord(buf[1:2])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
typ = buf[1]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
try:
|
try:
|
||||||
Msg = EMMTypeMOClasses[typ]()
|
Msg = EMMTypeMOClasses[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -115,16 +102,10 @@ def parse_NASLTE_MO(buf, inner=True, sec_hdr=True):
|
||||||
return None, 97
|
return None, 97
|
||||||
elif pd == 2:
|
elif pd == 2:
|
||||||
# ESM
|
# ESM
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
typ = buf[2]
|
||||||
typ = ord(buf[2:3])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
typ = buf[2]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
try:
|
try:
|
||||||
Msg = ESMTypeClasses[typ]()
|
Msg = ESMTypeClasses[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -184,17 +165,10 @@ def parse_NASLTE_MT(buf, inner=True, sec_hdr=True):
|
||||||
element: Element instance, if err is null (no error)
|
element: Element instance, if err is null (no error)
|
||||||
element: None, if err is not null (standard LTE NAS error code)
|
element: None, if err is not null (standard LTE NAS error code)
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
pd = buf[0]
|
||||||
pd = ord(buf[0])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
# error 111, unspecified protocol error
|
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
pd = buf[0]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
shdr = pd>>4
|
shdr = pd>>4
|
||||||
pd &= 0xf
|
pd &= 0xf
|
||||||
|
|
||||||
|
@ -231,16 +205,10 @@ def parse_NASLTE_MT(buf, inner=True, sec_hdr=True):
|
||||||
#
|
#
|
||||||
if pd == 7:
|
if pd == 7:
|
||||||
# EMM
|
# EMM
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
typ = buf[1]
|
||||||
typ = ord(buf[1])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
typ = buf[1]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
try:
|
try:
|
||||||
Msg = EMMTypeMTClasses[typ]()
|
Msg = EMMTypeMTClasses[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -248,16 +216,10 @@ def parse_NASLTE_MT(buf, inner=True, sec_hdr=True):
|
||||||
return None, 97
|
return None, 97
|
||||||
elif pd == 2:
|
elif pd == 2:
|
||||||
# ESM
|
# ESM
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
typ = buf[2]
|
||||||
typ = ord(buf[2])
|
except Exception:
|
||||||
except Exception:
|
return None, 111
|
||||||
return None, 111
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
typ = buf[2]
|
|
||||||
except Exception:
|
|
||||||
return None, 111
|
|
||||||
try:
|
try:
|
||||||
Msg = ESMTypeClasses[typ]()
|
Msg = ESMTypeClasses[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -1568,16 +1568,10 @@ def parse_SCCP(buf, w_scmg=True):
|
||||||
"""
|
"""
|
||||||
if not buf:
|
if not buf:
|
||||||
return None, 1
|
return None, 1
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
Msg = SCCPTypeClasses[buf[0]]()
|
||||||
Msg = SCCPTypeClasses[ord(buf[0])]()
|
except:
|
||||||
except:
|
return None, 1
|
||||||
return None, 1
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
Msg = SCCPTypeClasses[buf[0]]()
|
|
||||||
except:
|
|
||||||
return None, 1
|
|
||||||
try:
|
try:
|
||||||
Msg.from_bytes(buf)
|
Msg.from_bytes(buf)
|
||||||
except:
|
except:
|
||||||
|
@ -1614,16 +1608,10 @@ def parse_SCMG(buf):
|
||||||
"""
|
"""
|
||||||
if not buf:
|
if not buf:
|
||||||
return None, 1
|
return None, 1
|
||||||
if python_version < 3:
|
try:
|
||||||
try:
|
Msg = SCMGTypeClasses[buf[0]]()
|
||||||
Msg = SCMGTypeClasses[ord(buf[0])]()
|
except:
|
||||||
except:
|
return None, 1
|
||||||
return None, 1
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
Msg = SCMGTypeClasses[buf[0]]()
|
|
||||||
except:
|
|
||||||
return None, 1
|
|
||||||
try:
|
try:
|
||||||
Msg.from_bytes(buf)
|
Msg.from_bytes(buf)
|
||||||
except:
|
except:
|
||||||
|
|
|
@ -1880,10 +1880,7 @@ def parse_GTPv0(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 20:
|
if len(buf) < 20:
|
||||||
return None, ERR_GTP_BUF_TOO_SHORT
|
return None, ERR_GTP_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
typ = ord(buf[1])
|
|
||||||
else:
|
|
||||||
typ = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = GTPv0Dispatcher[typ]()
|
Msg = GTPv0Dispatcher[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -44,7 +44,7 @@ __all__ = [
|
||||||
# release 13 (d00)
|
# release 13 (d00)
|
||||||
#------------------------------------------------------------------------------#
|
#------------------------------------------------------------------------------#
|
||||||
|
|
||||||
from pycrate_core.utils import python_version, pack_val, TYPE_UINT, PycrateErr
|
from pycrate_core.utils import pack_val, TYPE_UINT, PycrateErr
|
||||||
from pycrate_core.charpy import Charpy
|
from pycrate_core.charpy import Charpy
|
||||||
from pycrate_core.elt import Envelope
|
from pycrate_core.elt import Envelope
|
||||||
from pycrate_core.base import Uint
|
from pycrate_core.base import Uint
|
||||||
|
@ -574,10 +574,7 @@ def encode_7b(txt, off=0):
|
||||||
# check the length in bits and add padding bits
|
# check the length in bits and add padding bits
|
||||||
pad = ((8-(7*len(arr)+off)%8)%8)
|
pad = ((8-(7*len(arr)+off)%8)%8)
|
||||||
arr.insert(0, (TYPE_UINT, 0, pad))
|
arr.insert(0, (TYPE_UINT, 0, pad))
|
||||||
if python_version < 3:
|
return bytes(reversed(pack_val(*arr)[0])), cnt
|
||||||
return ''.join(reversed(pack_val(*arr)[0])), cnt
|
|
||||||
else:
|
|
||||||
return bytes(reversed(pack_val(*arr)[0])), cnt
|
|
||||||
|
|
||||||
|
|
||||||
def decode_7b(buf, off=0):
|
def decode_7b(buf, off=0):
|
||||||
|
@ -590,10 +587,7 @@ def decode_7b(buf, off=0):
|
||||||
Returns:
|
Returns:
|
||||||
decoded text string (utf8 str)
|
decoded text string (utf8 str)
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
char = Charpy(bytes(reversed(buf)))
|
||||||
char = Charpy(''.join(reversed(buf)))
|
|
||||||
else:
|
|
||||||
char = Charpy(bytes(reversed(buf)))
|
|
||||||
# jump over the padding bits
|
# jump over the padding bits
|
||||||
# WNG: in case of 7 bits padding, we will have an @ at the end
|
# WNG: in case of 7 bits padding, we will have an @ at the end
|
||||||
chars_num = (8*len(buf)-off) // 7
|
chars_num = (8*len(buf)-off) // 7
|
||||||
|
|
|
@ -80,8 +80,6 @@ def encode_bcd(dig):
|
||||||
|
|
||||||
|
|
||||||
def decode_bcd(buf):
|
def decode_bcd(buf):
|
||||||
if python_version < 3:
|
|
||||||
buf = [ord(c) for c in buf]
|
|
||||||
ret = []
|
ret = []
|
||||||
for o in buf:
|
for o in buf:
|
||||||
msb, lsb = o>>4, o&0xf
|
msb, lsb = o>>4, o&0xf
|
||||||
|
@ -162,10 +160,7 @@ class BufBCD(Buf):
|
||||||
def decode(self):
|
def decode(self):
|
||||||
"""returns the encoded string of digits
|
"""returns the encoded string of digits
|
||||||
"""
|
"""
|
||||||
if python_version < 3:
|
num = self.get_val()
|
||||||
num = [ord(c) for c in self.get_val()]
|
|
||||||
else:
|
|
||||||
num = self.get_val()
|
|
||||||
ret = []
|
ret = []
|
||||||
for o in num:
|
for o in num:
|
||||||
msb, lsb = o>>4, o&0xf
|
msb, lsb = o>>4, o&0xf
|
||||||
|
@ -192,10 +187,7 @@ class BufBCD(Buf):
|
||||||
if len(ret) % 2:
|
if len(ret) % 2:
|
||||||
ret.append( self._filler )
|
ret.append( self._filler )
|
||||||
#
|
#
|
||||||
if python_version < 3:
|
self._val = bytes(map(lambda x,y:x+(y<<4), ret[::2], ret[1::2]))
|
||||||
self._val = ''.join([chr(c) for c in map(lambda x,y:x+(y<<4), ret[::2], ret[1::2])])
|
|
||||||
else:
|
|
||||||
self._val = bytes(map(lambda x,y:x+(y<<4), ret[::2], ret[1::2]))
|
|
||||||
|
|
||||||
def repr(self):
|
def repr(self):
|
||||||
# special hexdump representation
|
# special hexdump representation
|
||||||
|
@ -288,33 +280,18 @@ class PLMN(Buf):
|
||||||
"""returns the encoded string of digits
|
"""returns the encoded string of digits
|
||||||
"""
|
"""
|
||||||
plmn = []
|
plmn = []
|
||||||
if python_version < 3:
|
for b in self.get_val():
|
||||||
for c in self.get_val():
|
plmn.append(b>>4)
|
||||||
b = ord(c)
|
plmn.append(b&0xf)
|
||||||
plmn.append(b>>4)
|
try:
|
||||||
plmn.append(b&0xf)
|
if plmn[2] == 0xf:
|
||||||
try:
|
return '%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4])
|
||||||
if plmn[2] == 0xf:
|
else:
|
||||||
return u'%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4])
|
return '%i%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4], plmn[2])
|
||||||
else:
|
except IndexError:
|
||||||
return u'%i%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4], plmn[2])
|
# an invalid value has been set, _SAFE_STAT / DYN is probably disabled
|
||||||
except IndexError:
|
# for e.g. fuzzing purpose, but there is still need to not break here
|
||||||
# an invalid value has been set, _SAFE_STAT / DYN is probably disabled
|
return '0x%s' % hexlify(self.to_bytes()).decode('ascii')
|
||||||
# for e.g. fuzzing purpose, but there is still need to not break here
|
|
||||||
return '0x%s' % hexlify(self.to_bytes()).decode('ascii')
|
|
||||||
else:
|
|
||||||
for b in self.get_val():
|
|
||||||
plmn.append(b>>4)
|
|
||||||
plmn.append(b&0xf)
|
|
||||||
try:
|
|
||||||
if plmn[2] == 0xf:
|
|
||||||
return '%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4])
|
|
||||||
else:
|
|
||||||
return '%i%i%i%i%i%i' % (plmn[1], plmn[0], plmn[3], plmn[5], plmn[4], plmn[2])
|
|
||||||
except IndexError:
|
|
||||||
# an invalid value has been set, _SAFE_STAT / DYN is probably disabled
|
|
||||||
# for e.g. fuzzing purpose, but there is still need to not break here
|
|
||||||
return '0x%s' % hexlify(self.to_bytes()).decode('ascii')
|
|
||||||
|
|
||||||
def encode(self, plmn=u'00101'):
|
def encode(self, plmn=u'00101'):
|
||||||
"""encode the given PLMN string and store the resulting buffer in
|
"""encode the given PLMN string and store the resulting buffer in
|
||||||
|
|
|
@ -5449,10 +5449,7 @@ def parse_GTP_SGSN(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 8:
|
if len(buf) < 8:
|
||||||
return None, ERR_GTP_BUF_TOO_SHORT
|
return None, ERR_GTP_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
typ = ord(buf[1])
|
|
||||||
else:
|
|
||||||
typ = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = GTPDispatcherSGSN[typ]()
|
Msg = GTPDispatcherSGSN[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -5484,10 +5481,7 @@ def parse_GTP_GGSN(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 8:
|
if len(buf) < 8:
|
||||||
return None, ERR_GTP_BUF_TOO_SHORT
|
return None, ERR_GTP_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
typ = ord(buf[1])
|
|
||||||
else:
|
|
||||||
typ = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = GTPDispatcherGGSN[typ]()
|
Msg = GTPDispatcherGGSN[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -6073,10 +6073,7 @@ def parse_PFCP(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 8:
|
if len(buf) < 8:
|
||||||
return None, ERR_PFCP_BUF_TOO_SHORT
|
return None, ERR_PFCP_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
typ = ord(buf[1])
|
|
||||||
else:
|
|
||||||
typ = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = PFCPDispatcher[typ]()
|
Msg = PFCPDispatcher[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -7587,12 +7587,9 @@ def parse_GTPC(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 8:
|
if len(buf) < 8:
|
||||||
return None, ERR_GTPC_BUF_TOO_SHORT
|
return None, ERR_GTPC_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
type = ord(buf[1])
|
|
||||||
else:
|
|
||||||
type = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = GTPCDispatcher[type]()
|
Msg = GTPCDispatcher[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return None, ERR_GTPC_TYPE_NONEXIST
|
return None, ERR_GTPC_TYPE_NONEXIST
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -416,12 +416,9 @@ def parse_GTPU(buf):
|
||||||
"""
|
"""
|
||||||
if len(buf) < 8:
|
if len(buf) < 8:
|
||||||
return None, ERR_GTPU_BUF_TOO_SHORT
|
return None, ERR_GTPU_BUF_TOO_SHORT
|
||||||
if python_version < 3:
|
typ = buf[1]
|
||||||
type = ord(buf[1])
|
|
||||||
else:
|
|
||||||
type = buf[1]
|
|
||||||
try:
|
try:
|
||||||
Msg = GTPUDispatcher[type]()
|
Msg = GTPUDispatcher[typ]()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return None, ERR_GTPU_TYPE_NONEXIST
|
return None, ERR_GTPU_TYPE_NONEXIST
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -737,10 +737,7 @@ class RRHandoverFailure(Layer3):
|
||||||
def get_tbf(l3msg):
|
def get_tbf(l3msg):
|
||||||
v = l3msg[2][0].get_val()
|
v = l3msg[2][0].get_val()
|
||||||
if isinstance(v, str_types):
|
if isinstance(v, str_types):
|
||||||
if python_version:
|
return v[0] & 1
|
||||||
return v[0] & 1
|
|
||||||
else:
|
|
||||||
return ord(v[0]) & 1
|
|
||||||
elif isinstance(v, list):
|
elif isinstance(v, list):
|
||||||
return v[-1]
|
return v[-1]
|
||||||
else:
|
else:
|
||||||
|
|
10
setup.py
10
setup.py
|
@ -8,14 +8,6 @@ from setuptools import setup, find_packages
|
||||||
VERSION = "0.5.5"
|
VERSION = "0.5.5"
|
||||||
|
|
||||||
|
|
||||||
# get dependencies according to the Python version
|
|
||||||
if sys.version_info[0] == 2:
|
|
||||||
# Python2 requires enum34
|
|
||||||
install_reqs = ['enum34']
|
|
||||||
else:
|
|
||||||
install_reqs = []
|
|
||||||
|
|
||||||
|
|
||||||
# get long description from the README.md
|
# get long description from the README.md
|
||||||
with open(os.path.join(os.path.dirname(__file__), "README.md")) as fd:
|
with open(os.path.join(os.path.dirname(__file__), "README.md")) as fd:
|
||||||
long_description = fd.read()
|
long_description = fd.read()
|
||||||
|
@ -56,7 +48,7 @@ setup(
|
||||||
],
|
],
|
||||||
|
|
||||||
# potential dependencies
|
# potential dependencies
|
||||||
install_requires=install_reqs,
|
install_requires=[],
|
||||||
|
|
||||||
# optional dependencies
|
# optional dependencies
|
||||||
extras_require={
|
extras_require={
|
||||||
|
|
|
@ -387,10 +387,7 @@ def test_elt_2():
|
||||||
t['res'].set_val(0)
|
t['res'].set_val(0)
|
||||||
t['L'].set_val(t['V'].get_bl())
|
t['L'].set_val(t['V'].get_bl())
|
||||||
|
|
||||||
if python_version < 3:
|
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : b'default value'>>" )
|
||||||
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : 'default value'>>" )
|
|
||||||
else:
|
|
||||||
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : b'default value'>>" )
|
|
||||||
|
|
||||||
if _with_json:
|
if _with_json:
|
||||||
jv = t._to_jval()
|
jv = t._to_jval()
|
||||||
|
@ -400,10 +397,7 @@ def test_elt_2():
|
||||||
t.from_json(jso)
|
t.from_json(jso)
|
||||||
assert( t._to_jval() == jv )
|
assert( t._to_jval() == jv )
|
||||||
|
|
||||||
if python_version < 3:
|
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : b'default value'>>" )
|
||||||
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : 'default value'>>" )
|
|
||||||
else:
|
|
||||||
assert( repr(t) == "<TestTLV : <T [Tag] : 0b00000010 (Tag2)><F1 [Flag1] : 1><F2 [Flag2] : 1><res [Reserved] : 0><L [Length] : 104><V [Value] : b'default value'>>" )
|
|
||||||
|
|
||||||
|
|
||||||
class TestTLV2(Envelope):
|
class TestTLV2(Envelope):
|
||||||
|
|
Loading…
Reference in New Issue