CHOICE support including *_ws methods.
This commit is contained in:
parent
9272ea75fc
commit
3dc1e3bb67
|
@ -580,6 +580,122 @@ Specific attributes:
|
|||
ret = {ident[5:] : self._val[1]}
|
||||
return ret
|
||||
|
||||
###
|
||||
# conversion between internal value and ASN.1 OER/COER encoding
|
||||
###
|
||||
|
||||
def _to_oer(self):
|
||||
if self._val[0][:5] == '_ext_':
|
||||
# unknown extension re-encoding
|
||||
cl, _, tval = int(self._val[0][5:6]), int(self._val[0][6:7]), int(self._val[0][7:])
|
||||
try:
|
||||
return ASN1CodecOER.encode_open_type(
|
||||
ASN1CodecOER.TagClassLUT[cl], tval, self._val[1])
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(cl))
|
||||
else:
|
||||
# Known choice
|
||||
try:
|
||||
tag_class, tag = next(t for t, ident in self._cont_tags.items()
|
||||
if ident == self._val[0])
|
||||
tag_class = ASN1CodecOER.TagClassLUT[tag_class]
|
||||
except StopIteration:
|
||||
raise ASN1OEREncodeErr("Unknown tags for item {0}".format(
|
||||
self._val[0]))
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class))
|
||||
|
||||
# Tag
|
||||
temp = ASN1CodecOER.encode_tag(tag, tag_class)
|
||||
|
||||
# Value
|
||||
Cho = self._cont[self._val[0]]
|
||||
Cho._val = self._val[1]
|
||||
temp.extend(Cho._to_oer())
|
||||
return temp
|
||||
|
||||
def _to_oer_ws(self):
|
||||
if self._val[0][:5] == '_ext_':
|
||||
# unknown extension re-encoding
|
||||
cl, _, tval = int(self._val[0][5:6]), int(self._val[0][6:7]), int(self._val[0][7:])
|
||||
try:
|
||||
_gen= ASN1CodecOER.encode_open_type_ws(
|
||||
ASN1CodecOER.TagClassLUT[cl], tval, self._val[1])
|
||||
return Envelope(self._name, GEN=(_gen,))
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(cl))
|
||||
else:
|
||||
# Known choice
|
||||
try:
|
||||
tag_class, tag = next(t for t, ident in self._cont_tags.items()
|
||||
if ident == self._val[0])
|
||||
tag_class = ASN1CodecOER.TagClassLUT[tag_class]
|
||||
except StopIteration:
|
||||
raise ASN1OEREncodeErr("Unknown tags for item {0}".format(
|
||||
self._val[0]))
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class))
|
||||
|
||||
# Tag
|
||||
temp = ASN1CodecOER.encode_tag_ws(tag, tag_class)
|
||||
|
||||
# Value
|
||||
Cho = self._cont[self._val[0]]
|
||||
Cho._val = self._val[1]
|
||||
temp.extend(Cho._to_oer_ws())
|
||||
return Envelope(self._name, GEN=tuple(temp))
|
||||
|
||||
def _from_oer(self, char):
|
||||
tag_class, tag = ASN1CodecOER.decode_tag(char)
|
||||
tag_class = ASN1CodecOER.TagClassLUT[tag_class]
|
||||
try:
|
||||
ident = self._cont_tags[(tag_class, tag)]
|
||||
Cho = self._cont[ident]
|
||||
_par = Cho._parent
|
||||
Cho._parent = self
|
||||
Cho.from_oer(char)
|
||||
Cho._parent = _par
|
||||
self._val = (ident, Cho._val)
|
||||
return
|
||||
except KeyError:
|
||||
if self._ext is not None:
|
||||
# It's extension type
|
||||
# NOTE: There is no way how to resolve the primitive/constructed
|
||||
# flag in OER, as far as I understand it.
|
||||
ident = "_ext_{0}{1}{2}".format(tag_class, 0, tag)
|
||||
l_val = ASN1CodecOER.decode_length_determinant(char)
|
||||
val_bytes = char.get_bytes(l_val*8)
|
||||
self._val = (ident, val_bytes)
|
||||
|
||||
def _from_oer_ws(self, char):
|
||||
tag_class, tag, tag_struct = ASN1CodecOER.decode_tag_ws(char)
|
||||
tag_class = ASN1CodecOER.TagClassLUT[tag_class]
|
||||
_gen = [tag_struct]
|
||||
try:
|
||||
ident = self._cont_tags[(tag_class, tag)]
|
||||
Cho = self._cont[ident]
|
||||
_par = Cho._parent
|
||||
Cho._parent = self
|
||||
Cho.from_oer_ws(char)
|
||||
Cho._parent = _par
|
||||
_gen.extend(Cho._struct)
|
||||
self._struct = Envelope(self._name, GEN=tuple(_gen) )
|
||||
self._val = (ident, Cho._val)
|
||||
return
|
||||
except KeyError:
|
||||
if self._ext is not None:
|
||||
# It's extension type
|
||||
# NOTE: There is no way how to resolve the primitive/constructed
|
||||
# flag in OER, as far as I understand it.
|
||||
ident = "_ext_{0}{1}{2}".format(tag_class, 0, tag)
|
||||
l_val, l_struct = ASN1CodecOER.decode_length_determinant_ws(char)
|
||||
_gen.append(l_struct)
|
||||
val_struct = Buf('V', bl=l_val * 8)
|
||||
val_struct._from_char(char)
|
||||
_gen.append(val_struct)
|
||||
val = val_struct.get_val()
|
||||
self._struct = Envelope(self._name, GEN=tuple(_gen))
|
||||
self._val = (ident, val)
|
||||
|
||||
#------------------------------------------------------------------------------#
|
||||
# SEQUENCE and SET
|
||||
|
|
|
@ -1680,6 +1680,136 @@ class ASN1CodecOER(ASN1Codec):
|
|||
REAL_IEEE754_64_EXP_MIN = -1022
|
||||
REAL_IEEE754_64_EXP_MAX = 1023
|
||||
|
||||
# tag classes
|
||||
TagClassLUT = {0b00: TAG_UNIVERSAL,
|
||||
0b01: TAG_APPLICATION,
|
||||
0b10: TAG_CONTEXT_SPEC,
|
||||
0b11: TAG_PRIVATE,
|
||||
TAG_UNIVERSAL: 0b00,
|
||||
TAG_APPLICATION: 0b01,
|
||||
TAG_CONTEXT_SPEC: 0b10,
|
||||
TAG_PRIVATE: 0b11}
|
||||
|
||||
@classmethod
|
||||
def encode_tag(cls, tag, tag_class=TAG_CONTEXT_SPEC):
|
||||
try:
|
||||
tag_enc = [(T_UINT, cls.TagClassLUT[tag_class], 2)]
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class))
|
||||
|
||||
if tag < 63:
|
||||
# Tag encoded in 6b:
|
||||
tag_enc.append((T_UINT, tag, 6))
|
||||
else:
|
||||
# Multi-octet tag encoding
|
||||
tag_enc.append((T_UINT, 0b111111, 6)) # Initial octet
|
||||
tag_temp = []
|
||||
# Last byte (going in reverse)
|
||||
tag_temp.append((T_UINT, tag & 0b1111111, 7))
|
||||
tag_temp.append((T_UINT, 0, 1))
|
||||
tag = tag >> 7
|
||||
|
||||
# All the other bytes
|
||||
while tag.bit_length() > 0:
|
||||
tag_temp.append((T_UINT, tag & 0b1111111, 7))
|
||||
tag_temp.append((T_UINT, 1, 1))
|
||||
tag = tag >> 7
|
||||
|
||||
tag_enc.extend(tag_temp[::-1])
|
||||
|
||||
return tag_enc
|
||||
|
||||
@classmethod
|
||||
def encode_tag_ws(cls, tag, tag_class=TAG_CONTEXT_SPEC):
|
||||
try:
|
||||
tag_enc = [Uint('Class', val=cls.TagClassLUT[tag_class], bl=2)]
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class))
|
||||
|
||||
if tag < 63:
|
||||
# Tag encoded in 6b:
|
||||
tag_enc.append(Uint('Val', val=tag, bl=6))
|
||||
else:
|
||||
# Multi-octet tag encoding
|
||||
tag_enc.append(Uint('Multi-octet', val=0b111111, bl=6)) # Initial octet
|
||||
tag_temp = []
|
||||
# Last byte (going in reverse)
|
||||
tag_temp.append(Uint('Val', val=tag & 0b1111111, bl=7))
|
||||
tag_temp.append(Uint('Cont',val=0, bl=1))
|
||||
tag = tag >> 7
|
||||
|
||||
# All the other bytes
|
||||
while tag.bit_length() > 0:
|
||||
tag_temp.append(Uint('Val', val=tag & 0b1111111, bl=7))
|
||||
tag_temp.append(Uint('Cont', val=1, bl=1))
|
||||
tag = tag >> 7
|
||||
|
||||
tag_enc.extend(tag_temp[::-1])
|
||||
|
||||
return Envelope('T', GEN=tuple(tag_enc))
|
||||
|
||||
@classmethod
|
||||
def decode_tag(cls, char):
|
||||
tag_class = char.get_uint(2)
|
||||
try:
|
||||
tag_class = cls.TagClassLUT[tag_class]
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class))
|
||||
|
||||
first_octet = char.get_uint(6)
|
||||
if first_octet < 63:
|
||||
# Tag encoded in 6b:
|
||||
tag_val = first_octet
|
||||
else:
|
||||
# Multi-octet tag encoding
|
||||
tag_val = []
|
||||
while True:
|
||||
val_cont = char.get_uint(1)
|
||||
tag_val.extend(char.get_bitlist(7))
|
||||
if not val_cont:
|
||||
break
|
||||
|
||||
tag_val = bitlist_to_uint(tag_val)
|
||||
|
||||
return tag_class, tag_val
|
||||
|
||||
@classmethod
|
||||
def decode_tag_ws(cls, char):
|
||||
tag_class = Uint('Class', bl=2)
|
||||
tag_class._from_char(char)
|
||||
try:
|
||||
tag_class_val = cls.TagClassLUT[tag_class.get_val()]
|
||||
except KeyError:
|
||||
ASN1OEREncodeErr("Unknown tag class: {0}".format(tag_class.get_val()))
|
||||
|
||||
tag_struct = [tag_class]
|
||||
|
||||
first_val = Uint('Val', bl=6)
|
||||
first_val._from_char(char)
|
||||
tag_struct.append(first_val)
|
||||
|
||||
if first_val.get_val() < 63:
|
||||
# Tag encoded in 6b:
|
||||
tag_val = first_val.get_val()
|
||||
else:
|
||||
first_val._name = 'Multi-octet'
|
||||
tag_val = 0
|
||||
while True:
|
||||
tmp_cont = Uint('Cont', bl=1)
|
||||
tmp_cont._from_char(char)
|
||||
tmp_val = Uint('Val', bl=7)
|
||||
tmp_val._from_char(char)
|
||||
|
||||
tag_val = tag_val << 7
|
||||
tag_val = tag_val | tmp_val.get_val()
|
||||
tag_struct.append(tmp_cont)
|
||||
tag_struct.append(tmp_val)
|
||||
|
||||
if not tmp_cont.get_val():
|
||||
break
|
||||
|
||||
return tag_class_val, tag_val, Envelope('T', GEN=tuple(tag_struct))
|
||||
|
||||
@classmethod
|
||||
def encode_length_determinant(cls, length):
|
||||
# Always canonical (implementing non-canonical doesn't make sense)
|
||||
|
@ -1938,3 +2068,42 @@ class ASN1CodecOER(ASN1Codec):
|
|||
enum_val = val.get_val()
|
||||
t_enum.append(val)
|
||||
return enum_val, t_enum
|
||||
|
||||
@classmethod
|
||||
def encode_open_type(cls, tag_class, tag, value_bytes):
|
||||
tmp = cls.encode_tag(tag, tag_class)
|
||||
l_val = len(value_bytes)
|
||||
tmp.extend(cls.encode_length_determinant(l_val))
|
||||
tmp.append((T_BYTES, value_bytes, l_val*8))
|
||||
|
||||
return tmp
|
||||
|
||||
@classmethod
|
||||
def encode_open_type_ws(cls, tag_class, tag, value_bytes):
|
||||
_gen = [cls.encode_tag_ws(tag, tag_class),]
|
||||
l_val = len(value_bytes)
|
||||
_gen.append(cls.encode_length_determinant_ws(l_val))
|
||||
_gen.append(Buf('V', val=value_bytes, bl=l_val*8))
|
||||
return Envelope("Open-Type", GEN=tuple(_gen))
|
||||
|
||||
@classmethod
|
||||
def decode_open_type(cls, char):
|
||||
tag_class, tag = cls.decode_tag(char)
|
||||
l_val = cls.decode_length_determinant(char)
|
||||
val_bytes = char.get_bytes(l_val * 8)
|
||||
|
||||
return tag_class, tag, val_bytes
|
||||
|
||||
@classmethod
|
||||
def decode_open_type_ws(cls, char):
|
||||
tag_class, tag_val, tag_struct = cls.decode_tag_ws(char)
|
||||
_gen = [tag_struct]
|
||||
l_val, l_struct = cls.decode_length_determinant_ws(char)
|
||||
_gen.append(l_struct)
|
||||
val_struct = Buf('V', bl=l_val*8)
|
||||
val_struct._from_char(char)
|
||||
_gen.append(val_struct)
|
||||
val = val_struct.get_val()
|
||||
|
||||
return tag_class, tag_val, val, Envelope("Open-Type", GEN=tuple(_gen))
|
||||
|
||||
|
|
|
@ -1889,6 +1889,8 @@ def _test_rt_base():
|
|||
assert( Cho01.to_ber() == Cho01.to_ber_ws() == b'\xaa\x07\x7fP\x04\x02\x02\x07\xd0' )
|
||||
assert( Cho01.to_cer() == Cho01.to_cer_ws() == b'\xaa\x80\x7fP\x80\x02\x02\x07\xd0\x00\x00\x00\x00' )
|
||||
assert( Cho01.to_der() == Cho01.to_der_ws() == b'\xaa\x07\x7fP\x04\x02\x02\x07\xd0' )
|
||||
assert( Cho01.to_oer() == Cho01.to_oer_ws() == b'\x8a\x02\x07\xd0' )
|
||||
assert( Cho01.to_coer() == Cho01.to_coer_ws() == b'\x8a\x02\x07\xd0' )
|
||||
# decoding
|
||||
Cho01.from_aper(b' \x02\x07\xcf')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
|
@ -1915,7 +1917,16 @@ def _test_rt_base():
|
|||
assert( Cho01._to_jval() == {'int': 2000} )
|
||||
Cho01.from_jer('{"int": 2000}')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
|
||||
# OER/COER
|
||||
Cho01.from_oer(b'\x8a\x02\x07\xd0')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
Cho01.from_oer_ws(b'\x8a\x02\x07\xd0')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
Cho01.from_coer(b'\x8a\x02\x07\xd0')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
Cho01.from_coer_ws(b'\x8a\x02\x07\xd0')
|
||||
assert( Cho01._val == ('int', 2000) )
|
||||
|
||||
# Seq01 ::= SEQUENCE { --check test_asn1rt_mod.asn file-- }
|
||||
Seq01 = Mod['Seq01']
|
||||
Seq01.from_asn1('{boo FALSE, int 1024, enu cake}')
|
||||
|
|
Loading…
Reference in New Issue