asn1c: finally brings the ASN1 tokenizer to life

This is, however, not a great success, as tokenization with the Python regexp scanner alone is already slower than the current complete ASN.1 compiler...
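For context, re.Scanner is the (undocumented but long-standing) lexer helper in Python's re module: it tries each rule in order at the current position and hands every match to a callback. A minimal sketch of the technique, with invented token names rather than the actual rule set of this commit:

import re

# each rule pairs a regexp with an action building a token;
# a None action silently drops the match (whitespace here)
scanner = re.Scanner([
    (r'[A-Z][a-zA-Z0-9\-]*', lambda s, t: ('ID', t)),   # type reference
    (r'[a-z][a-zA-Z0-9\-]*', lambda s, t: ('LID', t)),  # value reference
    (r'::=',                 lambda s, t: 'ASSI'),
    (r'\{',                  lambda s, t: 'CBRAO'),
    (r'\}',                  lambda s, t: 'CBRAC'),
    (r'\s+',                 None),
])

toks, rest = scanner.scan('MyType ::= { abc }')
print(toks)  # [('ID', 'MyType'), 'ASSI', 'CBRAO', ('LID', 'abc'), 'CBRAC']
print(rest)  # '' once the whole input has been consumed

Returning a tuple keeps the matched text with the token kind, while returning a bare string keeps only the kind; the diff below uses the same convention in REScannerASN1.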
mitshell 2018-05-15 13:36:33 +02:00 committed by GitHub
parent 46c81abdd5
commit 0f46a8f3bf
1 changed file with 371 additions and 79 deletions


@@ -22,14 +22,16 @@
# *
# *--------------------------------------------------------
# * File Name : pycrate_asn1c/tokenizer.py
# * Created : 2018-03-12
# * Created : 2018-03-13
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
import re
from .err import *
from pycrate_asn1c.err import *
from pycrate_asn1c.utils import *
from pycrate_asn1c.dictobj import *
# white space and new line
_NL = '\x0a\x0b\x0c\x0d'
@@ -87,8 +89,8 @@ REScannerNTypes = '|'.join((
REScannerInt = '([+\-](?:[%s]{0,})){0,1}[0-9]{1,}' % _SNL
# real int dec exp
REScannerReal = '(%s){1}(?:\.([0-9]{0,})){0,1}(?:(?:[%s]{0,})[eE](%s)){0,1}'\
% (REScannerInt, _SNL, REScannerInt)
REScannerReal = '(%s){1}(?:\.([0-9]{1,})){0,1}(?:[eE](%s)){0,1}'\
% (REScannerInt, REScannerInt)
# bstring
REScannerBStr = '\'[%s01]{0,}\'B' % _SNL
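The reworked REAL pattern now requires at least one digit after the decimal point and no longer tolerates whitespace between the mantissa and the exponent marker. A standalone approximation of the new rule (REScannerInt inlined and simplified, the whitespace classes of the original dropped):

import re

INT = r'[+\-]?[0-9]+'
REAL = re.compile(r'(%s)(?:\.([0-9]+))?(?:[eE](%s))?$' % (INT, INT))

for s in ('10', '-2.5', '3.14e-2', '2.e5'):
    print(s, '->', bool(REAL.match(s)))
# '2.e5' no longer matches: the dot must be followed by at least one digit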
@@ -186,6 +188,12 @@ TOK_ARRO = 'ARRO' # @
TOK_EXCL = 'EXCL' # !
TOKS_OBJS = {TOK_NULL, TOK_NTYPE, TOK_HID, TOK_ID, TOK_LID}
TOKS_TYPES = {TOK_NULL, TOK_NTYPE, TOK_HID, TOK_ID}
TOKS_OBJS_EXT = {TOK_NULL, TOK_NTYPE, TOK_HID, TOK_ID, TOK_LID, TOK_CLAID}
TOKS_TYPES_EXT = {TOK_HID, TOK_ID, TOK_CLAID}
REScannerASN1 = re.Scanner([
#
(r'(--).*?([%s]|(--)|$)' % _NL, lambda s, t: (TOK_CMT, t)),
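This first rule keeps ASN.1 comments as tokens: a comment starts with -- and runs to the end of the line, to a closing --, or to the end of input, which the non-greedy .*? mirrors. A standalone check (the newline class of the original reduced to \n):

import re

RE_CMT = re.compile(r'(--).*?(\n|(--)|$)')

for s in ('-- a comment\nX', '-- inline -- Y', '-- at end'):
    print(repr(RE_CMT.match(s).group(0)))
# '-- a comment\n'  stops at the newline
# '-- inline --'    stops at the closing --
# '-- at end'       stops at the end of input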
@@ -242,7 +250,7 @@ REScannerASN1 = re.Scanner([
(r'MIN%s' % _EXC, lambda s, t: TOK_MIN),
(r'MINUS-INFINITY%s' % _EXC, lambda s, t: TOK_MINF),
(r'NOT-A-NUMBER%s' % _EXC, lambda s, t: TOK_NAN),
(r'NULL%s' % _EXC, lambda s, t: TOK_NULL),
(r'NULL%s' % _EXC, lambda s, t: (TOK_NULL, t)),
(r'OF%s' % _EXC, lambda s, t: TOK_OF),
(r'OPTIONAL%s' % _EXC, lambda s, t: TOK_OPT),
(r'PATTERN%s' % _EXC, lambda s, t: TOK_PAT),
@@ -263,7 +271,7 @@ REScannerASN1 = re.Scanner([
#
(r'(%s)%s' % (REScannerNTypes, _EXC), lambda s, t: (TOK_NTYPE, t)),
(r'&[a-zA-Z](?:\-{0,1}[a-zA-Z0-9]{1,}){0,}%s' % _EXC, lambda s, t: (TOK_CLAID, t)),
(r'[A-Z](?:\-{0,1}[a-zA-Z0-9]{1,}){0,}%s' % _EXC, lambda s, t: (TOK_HID, t)),
(r'[A-Z](?:\-{0,1}[A-Z0-9]{1,}){0,}%s' % _EXC, lambda s, t: (TOK_HID, t)),
(r'[A-Z](?:\-{0,1}[a-zA-Z0-9]{1,}){0,}%s' % _EXC, lambda s, t: (TOK_ID, t)),
(r'[a-z](?:\-{0,1}[a-zA-Z0-9]{1,}){0,}%s' % _EXC, lambda s, t: (TOK_LID, t)),
#
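The corrected rule above restricts TOK_HID to all-uppercase references, so mixed-case type references now fall through to the TOK_ID rule that follows it. The distinction in isolation ($ used here in place of the file's _EXC look-ahead):

import re

RE_HID = re.compile(r'[A-Z](?:\-{0,1}[A-Z0-9]{1,}){0,}$')     # upper-case only
RE_ID  = re.compile(r'[A-Z](?:\-{0,1}[a-zA-Z0-9]{1,}){0,}$')  # mixed case

for word in ('MY-CLASS', 'MyType', 'T61String'):
    kind = 'HID' if RE_HID.match(word) else 'ID' if RE_ID.match(word) else '?'
    print(word, '->', kind)
# MY-CLASS -> HID, MyType -> ID, T61String -> ID

Rule order matters in a re.Scanner: keywords and native types are tried first, then TOK_HID, and only then the more permissive TOK_ID.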
@@ -278,12 +286,14 @@ class Tokenizer(object):
ASN.1 comments
"""
REPR_OFF = 10
GROUP = {
TOK_PARO : TOK_PARC,
TOK_DBRAO : TOK_DBRAC,
TOK_BRAO : TOK_BRAC,
TOK_CBRAO : TOK_CBRAC,
TOK_BEG : TOK_END
TOK_PARO : TOK_PARC, # ( )
TOK_DBRAO : TOK_DBRAC, # [[ ]]
TOK_BRAO : TOK_BRAC, # [ ]
TOK_CBRAO : TOK_CBRAC, # { }
TOK_BEG : TOK_END # BEGIN END
}
def __init__(self, tokens=[]):
@@ -293,6 +303,10 @@ class Tokenizer(object):
# stack of previous cursor value
self.curp = []
def __repr__(self):
cur = self.get_cur()
return repr(self.toks[cur-self.REPR_OFF:cur+self.REPR_OFF])
def get_cur(self):
return self.cur
@@ -360,7 +374,7 @@ class Tokenizer(object):
self.cur += 1
return self.__class__(self.toks[max(0, curp):self.cur-1])
def get_group(self):
def get_group(self, wbnd=True):
tok, curp = self.toks[self.cur], self.cur
if tok in self.GROUP:
op, clo = tok, self.GROUP[tok]
@@ -378,7 +392,10 @@ class Tokenizer(object):
if depth == 0:
break
self.curp.append(curp)
return self.__class__(self.toks[1+curp:self.cur])
if wbnd:
return self.__class__(self.toks[curp:1+self.cur])
else:
return self.__class__(self.toks[1+curp:self.cur])
def get_comps(self, sep=TOK_COM):
comps, curp, curlast = [], self.cur, self.cur
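get_group() collects a balanced group of tokens by tracking nesting depth, and with the new wbnd flag the opening and closing delimiters are now included by default (wbnd=False restores the old exclusive bounds). The underlying technique as a minimal standalone sketch, over a plain list of one-character tokens rather than the Tokenizer API:

def get_group(toks, start, pairs={'(': ')', '[': ']', '{': '}'}):
    # return the balanced group opening at toks[start], bounds included
    op, clo, depth = toks[start], pairs[toks[start]], 0
    for cur in range(start, len(toks)):
        if toks[cur] == op:
            depth += 1
        elif toks[cur] == clo:
            depth -= 1
            if depth == 0:
                return toks[start:cur+1]
    raise ValueError('unbalanced group')

print(get_group(['{', 'a', '{', 'b', '}', '}', 'x'], 0))
# ['{', 'a', '{', 'b', '}', '}']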
@@ -414,16 +431,12 @@
# ObjName ObjParam ObjType ::= ObjVal
# ObjName ObjParam ObjType ::= ObjSet
# ObjName ObjParam ::= ObjType
# ObjName MACRO ::= BEGIN .. END
# ObjName MACRO ::= BEGIN ... END
#
# ASN.1 object type structure:
# ObjTags ObjType ObjParamAct ObjConsts ObjCont
# CLASS ObjParamAct ObjCont WITH SYNTAX ObjSynt
from .utils import *
from .dictobj import *
def tokenize_text(text=u'', **kwargs):
"""tokenize the provided textual ASN.1 specification
"""
@@ -438,6 +451,7 @@ def tokenize_text(text=u'', **kwargs):
asnlog('%i remaining chars at the end of spec' % len(rest))
# build the handler for the tokens
Tok = Tokenizer(toks)
modules = ASN1Dict()
#
# scan the tokens for all ASN.1 modules defined
while True:
@@ -454,21 +468,8 @@
module['_name_'] = name
if oid:
module['_oidtok_'] = oid
'''
# using a dummy OID object to parse the value
OidDummy = OID()
_path_stack(['val'])
try:
rest = OidDummy.parse_value(oid)
except Exception as err:
_path_pop()
asnlog('[proc] module {0}: invalid OID value {1!r}, ignoring it'\
.format(name, oid))
module['_oid_'] = []
else:
_path_pop()
module['_oid_'] = OidDummy._val
'''
# TODO: parse the OID value
module['_oid_'] = []
else:
module['_oidtok_'] = []
@@ -495,8 +496,8 @@
# 3) scan tokens for BEGIN - END block
if Tok.get_tok() != TOK_BEG:
raise(ASN1ProcTextErr('missing BEGIN statement'))
TokDef = Tok.get_group()
module['_tok_'] = Tok
TokDef = Tok.get_group(wbnd=False)
module['_tok_'] = TokDef
#asnlog('[proc] module %s: %i tokens' % (name, TokDef.count()))
if Tok.has_next():
Tok.get_next()
@@ -524,19 +525,33 @@
TokDef.undo()
#
# 6) scan the module definition block for objects
module['_obj_'] = scan_module_obj(TokDef)
objs = scan_objs(TokDef)
#
# 7) init objects types for the module
module['_obj_'] = []
module['_obj_'] = objs
module['_type_'] = []
module['_set_'] = []
module['_val_'] = []
module['_class_'] = []
module['_param_'] = []
#
# 7) scan the asnblock for assignments and initialize all ASN.1 objects
# TODO
#return module
for obj in objs.values():
if obj['mode'] == MODE_TYPE:
module['_type_'].append(obj['name'])
elif obj['mode'] == MODE_SET:
module['_set_'].append(obj['name'])
elif obj['mode'] == MODE_VALUE:
module['_val_'].append(obj['name'])
else:
assert False
if obj['typedef']['type'] == ['CLASS']:
module['_class_'].append(obj['name'])
if obj['param']:
module['_param_'].append(obj['name'])
#
modules[name] = module
#
return modules
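With this change tokenize_text() returns a dict of all modules found in the input text, keyed by module name. Expected usage along these lines (module content invented for illustration, and assuming the scanner rules not shown in this diff discard plain whitespace):

from pycrate_asn1c.tokenizer import tokenize_text

SPEC = '''MyModule DEFINITIONS AUTOMATIC TAGS ::= BEGIN
MyInt ::= INTEGER (0..255)
myVal MyInt ::= 17
END'''

modules = tokenize_text(SPEC)
print(list(modules))                       # expected: ['MyModule']
print(list(modules['MyModule']['_obj_']))  # expected: ['MyInt', 'myVal']
print(modules['MyModule']['_val_'])        # expected: ['myVal']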
def scan_module_decl(Tok):
@@ -545,14 +560,15 @@
# scan ModuleIdentifier
tok = Tok.get_next()
if tok[0] not in (TOK_HID, TOK_ID):
raise(ASN1ProcTextErr('invalid module name, %r' % tok))
raise(ASN1ProcTextErr('invalid module declaration, invalid name %r' % tok))
name = tok[1]
if Tok.has_next() and Tok.get_next() == TOK_CBRAO:
oid = Tok.get_group()
if Tok.has_next():
if Tok.get_next() == TOK_CBRAO:
oid = Tok.get_group()
else:
raise(ASN1ProcTextErr('invalid module declaration'))
else:
oid = None
if Tok.has_next():
raise(ASN1ProcTextErr('invalid module declaration'))
return name, oid
@@ -561,65 +577,68 @@ def scan_module_opt(Tok):
"""
# scan TagDefault and ExtensionDefault
# TODO: scan EncodingReferenceDefault first
if Tok.has_next() and Tok.get_next() in (TOK_AUTO, TOK_TEXP, TOK_TIMP):
tag, ext = None, False
if not Tok.has_next():
return tag, ext
tok = Tok.get_next()
if tok in (TOK_AUTO, TOK_TEXP, TOK_TIMP):
tag = Tok.get_tok()
if Tok.get_next() != TOK_TAGS:
raise(ASN1ProcTextErr('missing TAGS keyword'))
else:
tag = None
if Tok.has_next() and Tok.get_next() == TOK_EXTI:
raise(ASN1ProcTextErr('invalid module options, missing TAGS keyword'))
if not Tok.has_next():
return tag, ext
tok = Tok.get_next()
if tok == TOK_EXTI:
ext = True
else:
ext = False
raise(ASN1ProcTextErr('invalid module options'))
if Tok.has_next():
raise(ASN1ProcTextErr('invalid module declaration'))
raise(ASN1ProcTextErr('invalid module options'))
return tag, ext
def scan_module_exp(Tok):
"""consume the tokens searching for exports declaration
"""consume the tokens searching for module exports declaration
"""
tok = Tok.get_next()
if tok == TOK_ALL:
if Tok.get_next() != TOK_SCOL:
raise(ASN1ProcTextErr('invalid EXPORT statement'))
raise(ASN1ProcTextErr('invalid module export'))
else:
return None
elif tok[0] in (TOK_NTYPE, TOK_HID, TOK_ID, TOK_LID):
elif tok[0] in TOKS_OBJS:
exp = []
while tok != TOK_SCOL:
if tok[0] in (TOK_NTYPE, TOK_HID, TOK_ID, TOK_LID):
if tok[0] in TOKS_OBJS:
exp.append(tok[1])
elif tok == TOK_CBRAO:
tok = Tok.get_next()
if tok != TOK_CBRAC:
raise(ASN1ProcTextErr('invalid EXPORT statement'))
raise(ASN1ProcTextErr('invalid module export, parameterized reference'))
elif tok != TOK_COM:
raise(ASN1ProcTextErr('invalid EXPORT statement'))
raise(ASN1ProcTextErr('invalid module export'))
tok = Tok.get_next()
return exp
else:
raise(ASN1ProcTextErr('invalid EXPORT statement'))
raise(ASN1ProcTextErr('invalid module export'))
def scan_module_imp(Tok):
"""consume the tokens searching for imports declaration
"""consume the tokens searching for module imports declaration
"""
# TODO: check how to deal with OID value ref after "FROM $modulename",
# to not mix it with a symbol from the next imported module
sym, imp = [], []
tok = Tok.get_next()
while tok != TOK_SCOL:
if tok[0] in (TOK_NTYPE, TOK_HID, TOK_ID, TOK_LID):
if tok[0] in TOKS_OBJS:
sym.append(tok[1])
elif tok == TOK_CBRAO:
# parameterized ref: ignoring it
if Tok.get_next() != TOK_CBRAC:
raise(ASN1ProcTextErr('invalid IMPORT statement'))
raise(ASN1ProcTextErr('invalid module import, parameterized reference'))
elif tok == TOK_FROM:
tok = Tok.get_next()
if tok[0] not in (TOK_HID, TOK_ID) or not sym:
raise(ASN1ProcTextErr('invalid IMPORT statement'))
raise(ASN1ProcTextErr('invalid module import'))
imp.append({'name': tok[1], 'sym': sym})
sym, rev, tok = [], True, Tok.get_next()
if tok == TOK_CBRAO:
@@ -629,14 +648,16 @@ def scan_module_imp(Tok):
rev = False
elif tok[0] == TOK_LID:
asnlog('imported module OID reference is ambiguous: %s' % tok[1])
asnlog('imported module OID reference is ambiguous, %s' % tok[1])
# will be dealt with at the end
if rev:
Tok.undo()
elif tok != TOK_COM:
raise(ASN1ProcTextErr('invalid IMPORT statement'))
raise(ASN1ProcTextErr('invalid module import'))
tok = Tok.get_next()
if sym:
if len(sym) == 1 and sym[0][0].islower():
asnlog('imported module ambiguous OID references were actually OID references')
# this means all those ambiguous OID ref were actually OID ref for
# the previous module instead of imported symbols
for i in range(len(imp)-1):
@@ -646,30 +667,301 @@ def scan_module_imp(Tok):
del imp[i+1]['sym'][0]
imp[-1]['oidtok'] = sym[0]
else:
raise(ASN1ProcTextErr('invalid IMPORT statement'))
raise(ASN1ProcTextErr('invalid module import'))
return imp
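scan_module_imp() should thus turn the body of an IMPORTS clause into one dict per imported module; a hedged sketch of the expected behaviour (again assuming blanks are dropped by the scanner, and with the input holding everything after the IMPORTS keyword):

from pycrate_asn1c.tokenizer import Tokenizer, REScannerASN1, scan_module_imp

toks, rest = REScannerASN1.scan('A, b FROM Mod-1 C FROM Mod-2 ;')
print(scan_module_imp(Tokenizer(toks)))
# expected: [{'name': 'Mod-1', 'sym': ['A', 'b']},
#            {'name': 'Mod-2', 'sym': ['C']}]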
def scan_module_obj(Tok):
def scan_objs(Tok):
"""consume the tokens searching for objects declaration
"""
obj = ASN1Dict()
return obj
objs = ASN1Dict()
while Tok.has_next():
objdict = scan_obj(Tok)
if objdict['name'] in objs:
asnlog('multiple definitions of %s' % objdict['name'])
objs[objdict['name']] = objdict
return objs
def scan_obj(Tok):
"""consume the tokens searching for the complete declaration of a single object
"""
# ASN.1 object structure:
# ObjName ObjParam ObjType ::= ObjVal
# ObjName ObjParam ObjType ::= ObjSet
# ObjName ObjParam ::= ObjType
# ObjName MACRO ::= BEGIN ... END
#
param, typedef, mode, val = None, {}, None, None
tok = Tok.get_next()
if tok[0] == TOK_LID:
mode = MODE_VALUE
elif tok[0] in (TOK_ID, TOK_HID):
mode = MODE_TYPE
else:
raise(ASN1ProcTextErr('invalid object name, %r' % (tok, )))
name = tok[1]
tok = Tok.get_next()
if tok == TOK_CBRAO:
# formal parameters
param = Tok.get_group()
tok = Tok.get_next()
if tok == TOK_BRAO or tok[0] in TOKS_TYPES:
if tok[1] == 'MACRO':
# MACRO
if Tok.get_next() != TOK_ASSI or Tok.get_next() != TOK_BEG:
raise(ASN1ProcTextErr('%s invalid MACRO definition' % name))
typedef['type'] = 'MACRO'
typedef['cont'] = Tok.get_group()
else:
# object value or set
if mode == MODE_TYPE:
mode = MODE_SET
# object type will be rescanned in scan_typedef()
Tok.undo()
try:
typedef = scan_typedef(Tok)
except Exception as Err:
Err.args = ('%s (%s) invalid definition, %s' % (name, mode, Err.args[0]), )
raise(Err)
if Tok.get_next() != TOK_ASSI:
raise(ASN1ProcTextErr('%s (%s) invalid definition' % (name, mode)))
try:
val = scan_val(Tok)
except Exception as Err:
Err.args = ('%s (%s) invalid definition, %s' % (name, mode, Err.args[0]), )
raise(Err)
elif tok == TOK_ASSI:
# object type
if mode == MODE_VALUE:
raise(ASN1ProcTextErr('%s (%s) invalid definition' % (name, mode)))
try:
typedef = scan_typedef(Tok)
except Exception as Err:
Err.args = ('%s (%s) invalid definition, %s' % (name, mode, Err.args[0]), )
raise(Err)
else:
raise(ASN1ProcTextErr('%s invalid definition' % name))
return {'name': name, 'param': param, 'typedef': typedef, 'mode': mode, 'val': val}
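scan_obj() thus reduces one assignment to a small dict; for a plain type assignment one would expect something along these lines (same whitespace assumption as above; MODE_TYPE is the mode constant presumably defined in the package utils):

from pycrate_asn1c.tokenizer import Tokenizer, REScannerASN1, scan_obj

toks, rest = REScannerASN1.scan('MyInt ::= INTEGER (0..255)')
obj = scan_obj(Tokenizer(toks))
print(obj['name'], obj['mode'], obj['typedef']['type'])
# expected: 'MyInt', the MODE_TYPE constant, and ['INTEGER']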
def scan_val(Tok):
"""consume the tokens searching for the complete value of a single object
"""
if Tok.get_next() == TOK_CBRAO:
val = [Tok.get_group()]
return val
else:
val = [Tok.get_tok()]
while Tok.has_next():
tok = Tok.get_next()
if tok == TOK_DOT:
val.append(tok)
tok = Tok.get_next()
if tok[0] not in TOKS_OBJS_EXT:
raise(ASN1ProcTextErr('invalid value definition'))
val.append(tok)
elif tok == TOK_COL:
val.append(tok)
val.extend( scan_val(Tok) )
elif tok == TOK_CBRAO:
# parameterized value
val.append(Tok.get_group())
return val
else:
Tok.undo()
return val
return val
def scan_typedef(Tok):
"""consume the tokens searching for the complete type declaration of a single object
"""
# ASN.1 object type structure:
# ObjTags ObjType ObjParamAct ObjConsts [OF] ObjCont
# CLASS ObjCont WITH SYNTAX ObjSynt
#
typedict = {'tags': [], 'type': None}
tok = Tok.get_next()
if tok == TOK_BRAO:
# tag(s)
typedict['tags'] = scan_tags(Tok)
tok = Tok.get_next()
if tok[0] in TOKS_TYPES:
typedict['type'] = scan_type(Tok)
if not Tok.has_next():
return typedict
else:
try:
if typedict['type'] == ['CLASS']:
_scan_typedef_class(Tok, typedict)
elif typedict['type'] in (['SET'], ['SEQUENCE']):
_scan_typedef_seq(Tok, typedict)
else:
_scan_typedef_std(Tok, typedict)
except Exception as Err:
Err.args = ('invalid type definition, %s' % Err.args[0], )
raise(Err)
return typedict
else:
raise(ASN1ProcTextErr('invalid type definition'))
def scan_tags(Tok):
tags = []
while True:
tags.append( scan_tag(Tok) )
if Tok.get_next() != TOK_BRAO:
Tok.undo()
return tags
def scan_tag(Tok):
tag = {'val': Tok.get_group(), 'mode': None}
tok = Tok.get_next()
if tok in (TOK_TEXP, TOK_TIMP):
tag['mode'] = tok
else:
Tok.undo()
return tag
def scan_type(Tok):
typ = [Tok.get_tok()[1]]
if Tok.has_next():
tok = Tok.get_next()
while tok == TOK_DOT:
tok = Tok.get_next()
if tok[0] not in TOKS_TYPES_EXT:
raise(ASN1ProcTextErr('invalid composite type definition'))
typ.append(tok[1])
if Tok.has_next():
tok = Tok.get_next()
else:
return typ
Tok.undo()
return typ
def _scan_typedef_class(Tok, typedict):
# CLASS ObjCont WITH SYNTAX ObjSynt
tok = Tok.get_next()
if tok != TOK_CBRAO:
raise(ASN1ProcTextErr('invalid CLASS object definition'))
typedict['cont'] = Tok.get_group()
if Tok.has_next():
tok = Tok.get_next()
if tok == TOK_WSYN:
tok = Tok.get_next()
if tok != TOK_CBRAO:
raise(ASN1ProcTextErr('invalid CLASS object SYNTAX definition'))
typedict['synt'] = Tok.get_group()
else:
Tok.undo()
def _scan_typedef_seq(Tok, typedict):
# SEQUENCE / SET ObjCont ObjConsts
# SEQUENCE / SET ObjConsts [SIZE (...)] OF ObjType
tok = Tok.get_next()
if tok == TOK_CBRAO:
# ObjCont
typedict['cont'] = Tok.get_group()
if Tok.has_next():
tok = Tok.get_next()
if tok == TOK_PARO:
typedict['const'] = scan_const(Tok)
else:
Tok.undo()
elif tok in (TOK_PARO, TOK_SIZE):
if tok == TOK_SIZE:
# special case of the SIZE constraint outside of a constraint notation
if Tok.get_next() != TOK_PARO:
raise(ASN1ProcTextErr('invalid SEQ / SET OF SIZE definition'))
typedict['const_sz'] = scan_const(Tok)
else:
# ObjConsts
typedict['const'] = scan_const(Tok)
tok = Tok.get_next()
if tok != TOK_OF:
raise(ASN1ProcTextErr('invalid SEQ / SET OF definition'))
_scan_typedef_seqof(Tok, typedict)
elif tok == TOK_OF:
# OF
_scan_typedef_seqof(Tok, typedict)
else:
raise(ASN1ProcTextErr('invalid SEQ / SET definition'))
def _scan_typedef_seqof(Tok, typedict):
typedict['type'][0] = typedict['type'][0] + ' OF'
# can have a component name
tok = Tok.get_next()
if tok[0] == TOK_LID:
# component name
typedict['cont_name'] = tok[1]
else:
Tok.undo()
try:
typedict['cont'] = scan_typedef(Tok)
except Exception as Err:
Err.args = ('invalid SEQ / SET OF definition, %s' % Err.args[0], )
raise(Err)
def _scan_typedef_std(Tok, typedict):
# ObjParamAct | ObjCont ObjConsts
tok = Tok.get_next()
if tok == TOK_CBRAO:
typedict['cont'] = Tok.get_group()
if not Tok.has_next():
return
tok = Tok.get_next()
if tok == TOK_PARO:
typedict['const'] = scan_const(Tok)
else:
Tok.undo()
def scan_const(Tok):
const = []
while True:
const.append( Tok.get_group() )
if Tok.has_next():
if Tok.get_next() != TOK_PARO:
Tok.undo()
return const
else:
return const
def test():
import os
from .specdir import ASN_SPECS
from pycrate_asn1c.specdir import ASN_SPECS
p = '/home/user/pycrate/pycrate_asn1dir/'
M = ASN1Dict()
for S in ASN_SPECS.values():
if isinstance(S, (list, tuple)):
S = S[0]
for fn in os.listdir( '%s%s/' % (p, S)):
if fn[-4:] == '.asn':
fp = '%s%s/%s' % (p, S, fn)
print(fp)
#mod = tokenize_text(open(fp).read().decode('utf-8'))
mod = tokenize_text(open(fp).read())
if S != 'IETF_SNMP':
for fn in os.listdir( '%s%s/' % (p, S)):
if fn[-4:] == '.asn':
fp = '%s%s/%s' % (p, S, fn)
print(fp)
if python_version < 3:
mods = tokenize_text(open(fp).read().decode('utf-8'))
else:
mods = tokenize_text(open(fp).read())
for modname, moddict in mods.items():
M[modname] = moddict
return M
if __name__ == '__main__':
import sys
M = test()
sys.exit(0)