csn1: implement a JSON api for CSN.1 objects

This commit is contained in:
mich 2018-12-12 11:00:22 +01:00
parent 432626e730
commit d3aa7249a0
2 changed files with 316 additions and 10 deletions

View File

@ -32,6 +32,16 @@ from pycrate_core.charpy import *
from .utils import *
try:
from json import JSONEncoder, JSONDecoder, JSONDecodeError
except ImportError:
_with_json = False
else:
_with_json = True
JsonEnc = JSONEncoder(sort_keys=True, indent=1)
JsonDec = JSONDecoder()
#------------------------------------------------------------------------------#
# CSN.1 runtime
#------------------------------------------------------------------------------#
@ -367,20 +377,16 @@ class CSN1Obj(Element):
assert( isinstance(self._val, list) )
if num > 1:
assert( len(self._val) == num )
_num = self._num
_val = self._val
self._num = 1
for val in _val:
self._val = val
sval = self._val
for v in sval:
self._val = v
ret.extend( self._to_pack_obj() )
self._num = _num
self._val = _val
self._val = sval
else:
assert()
#
if self._par is None:
_root_obj = root_obj
#
return ret
def get_bl(self):
@ -428,6 +434,101 @@ class CSN1Obj(Element):
are cloned except value
"""
raise(CSN1Err('not implemented'))
if _with_json:
def _from_jval(self, val):
raise(CSN1Err('not implemented'))
def _from_jval_csn(self, val):
if self._name:
if not isinstance(val, dict) or len(val) != 1:
raise(CSN1Err('{0}: invalid json value, {1}'.format(self._name, val)))
name, val = tuple(val.items())[0]
if name != self._name:
raise(CSN1Err('{0}: invalid json value, {1}'.format(self._name, val)))
#
global _root_obj
#
if self._par is None:
root_obj = _root_obj
_root_obj = self
#
if isinstance(self._num, tuple):
# dynamic number of repetitions
num = self._resolve_ref(self._num)
else:
# static number of repetitions
num = self._num
#
if num == 1:
self._from_jval(val)
elif num == -1 or num > 1:
if not isinstance(val, list):
raise(CSN1Err('{0}: invalid json value, {1}'.format(self._name, val)))
sval = []
for i, v in enumerate(val):
self._from_jval(v)
sval.append( self._val )
if num > 1 and i >= num:
raise(CSN1Err('{0}: invalid json value, {1}'.format(self._name, val)))
self._val = sval
else:
assert()
#
if self._par is None:
_root_obj = root_obj
def from_json(self, txt):
try:
val = JsonDec.decode(txt)
except JSONDecodeError as err:
raise(CSN1Err('{0}: invalid json, {1}'.format(self._name, err)))
self._from_jval_csn(val)
def _to_jval(self):
raise(CSN1Err('not implemented'))
def _to_jval_csn(self):
global _root_obj
#
if self._par is None:
root_obj = _root_obj
_root_obj = self
#
if isinstance(self._num, tuple):
# dynamic number of repetitions
num = self._resolve_ref(self._num)
else:
# static number of repetitions
num = self._num
#
ret = None
if num == 1:
ret = self._to_jval()
elif num == -1 or num > 1:
# self._val is a list
assert( isinstance(self._val, list) )
if num > 1:
assert( len(self._val) == num )
snum, sval, ret = self._num, self._val, []
self._num = 1
for v in sval:
self._val = v
ret.append( self._to_jval() )
self._num, self._val = snum, sval
else:
assert()
#
if self._par is None:
_root_obj = root_obj
if self._name:
return {self._name: ret}
else:
return ret
def to_json(self):
return JsonEnc.encode(self._to_jval_csn())
class CSN1Bit(CSN1Obj):
@ -513,7 +614,7 @@ class CSN1Bit(CSN1Obj):
if self._type == CSN1T_UINT:
return [(CSN1T_UINT, self._val, bit)]
else:
return [(CSN1T_UINT, int(self._val, 2), bit)]
return [(CSN1T_BSTR, int(self._val, 2), bit)]
def clone(self):
kw = self._clone_get_kw()
@ -524,6 +625,45 @@ class CSN1Bit(CSN1Obj):
if self._dic != self.__class__._dic:
kw['dic'] = self._dic
return self.__class__(**kw)
if _with_json:
def _from_jval(self, val):
if isinstance(self._bit, tuple):
bit = self._resolve_ref(self._bit)
elif self._bit == -1:
# consumes all the remaining bits
bit = len(val)
self._valbl = bit
else:
# static number of bits
bit = self._bit
if len(val) != bit:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
if self._type == CSN1T_UINT:
try:
self._val = bitstr_to_uint(val)
except Exception:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
else:
self._val = val
def _to_jval(self):
if isinstance(self._bit, tuple):
bit = self._resolve_ref(self._bit)
elif self._bit == -1:
if hasattr(self, '_valbl'):
bit = self._valbl
else:
bit = 0
else:
bit = self._bit
self._off = (self._off + bit) % 8
if self._type == CSN1T_BSTR:
return self._val
else:
return uint_to_bitstr(self._val, bit)
class CSN1Val(CSN1Obj):
@ -628,6 +768,17 @@ class CSN1Val(CSN1Obj):
kw = self._clone_get_kw()
kw['val'] = self._stat
return self.__class__(**kw)
if _with_json:
def _from_jval(self, val):
if val != self._stat:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
else:
self._val = self._stat
def _to_jval(self):
return self._stat
class CSN1Ref(CSN1Obj):
@ -692,6 +843,21 @@ class CSN1Ref(CSN1Obj):
kw = self._clone_get_kw()
kw['obj'] = self._obj.clone()
return self.__class__(**kw)
if _with_json:
def _from_jval(self, val):
obj_val = self._obj._val
self._obj._from_jval_csn(val)
self._val = self._obj._val
self._obj._val = obj_val
def _to_jval(self):
obj_val = self._obj._val
self._obj._val = self._val
ret = self._obj._to_jval_csn()
self._obj._val = obj_val
return ret
class CSN1List(CSN1Obj):
@ -786,6 +952,36 @@ class CSN1List(CSN1Obj):
if self._trunc:
kw['trunc'] = True
return self.__class__(**kw)
if _with_json:
def _from_jval(self, val):
if not isinstance(val, list):
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
self._val = []
for i, Obj in enumerate(self._list):
obj_val = Obj._val
try:
Obj._from_jval_csn(val[i])
except IndexError:
Obj._val = obj_val
if self._trunc:
break
else:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
else:
self._val.append(Obj._val)
Obj._val = obj_val
def _to_jval(self):
ret = []
for i, val in enumerate(self._val):
Obj = self._list[i]
obj_val = Obj._val
Obj._val = val
ret.append( Obj._to_jval_csn() )
Obj._val = obj_val
return ret
class CSN1Alt(CSN1Obj):
@ -983,6 +1179,54 @@ class CSN1Alt(CSN1Obj):
clo._kord = self._kord
clo._pad_gsm = self._pad_gsm
return clo
if _with_json:
def _from_jval(self, val):
if not isinstance(val, list):
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
self._val = []
if not val:
if None in self._alt:
return
else:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
else:
k = val[0]
self._val = [k]
#
try:
alt_name, obj_list = self._alt[k]
except KeyError:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
for i, Obj in enumerate(obj_list):
obj_val = Obj._val
try:
Obj._from_jval_csn(val[1+i])
except IndexError:
Obj._val = obj_val
if self._trunc:
break
else:
raise(CSN1Err('{0}: invalid json value, {1!r}'.format(self._name, val)))
else:
self._val.append(Obj._val)
Obj._val = obj_val
def _to_jval(self):
if not self._val:
return []
else:
k = self._val[0]
alt_name, obj_list = self._alt[k]
ret = [k]
for i, val in enumerate(self._val[1:]):
Obj = obj_list[i]
obj_val = Obj._val
Obj._val = val
ret.append( Obj._to_jval_csn() )
Obj._val = obj_val
return ret
class CSN1SelfRef(CSN1Obj):
@ -1036,4 +1280,21 @@ class CSN1SelfRef(CSN1Obj):
def clone(self):
return self.__class__(**self._clone_get_kw())
if _with_json:
def _from_jval(self, val):
global _root_obj
root_obj_val = _root_obj._val
_root_obj._from_jval_csn(val)
self._val = _root_obj._val
_root_obj._val = root_obj_val
def _to_jval(self):
global _root_obj
obj_val = _root_obj._val
_root_obj._val = self._val
ret = _root_obj._to_jval_csn()
_root_obj._val = obj_val
return ret

View File

@ -36,6 +36,8 @@ from pycrate_csn1dir.receive_npdu_number_list_value import receive_npdu_number
from pycrate_csn1dir.si2quater_rest_octets import si2quater_rest_octets
from pycrate_csn1dir.si_13_rest_octets import si_13_rest_octets
from pycrate_csn1.csnobj import _with_json
def test_msnetcap():
Obj = ms_network_capability_value_part.clone()
@ -46,6 +48,11 @@ def test_msnetcap():
rep = Obj.repr()
assert( Obj.get_val() == val )
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
def test_mscm3():
@ -111,7 +118,11 @@ def test_mscm3():
rep = Obj.repr()
assert( Obj.get_val() == val )
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
def test_rcvnpdunumlist():
Obj = receive_npdu_number_list_value.clone()
@ -262,6 +273,12 @@ def test_msracap():
rep = Obj.repr()
assert( Obj.get_val() == val )
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
#
buf = unhexlify(b'1bb3432b259ef989004000d801bbe8c662401000360068f8b1989004000d8010')
val = [[['0001',
@ -435,6 +452,11 @@ def test_msracap():
rep = Obj.repr()
assert( Obj.get_val() == val )
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
def test_si2qr():
@ -442,20 +464,38 @@ def test_si2qr():
buf = unhexlify(b'46a032caa88c2fcf8e0b2b2b2b2b2b2b2b2b2b2b')
#
Obj.from_bytes(buf)
val = Obj.get_val()
rep = Obj.repr()
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
buf = unhexlify(b'cee0048648c0100401004010040100401000802b')
#
Obj.from_bytes(buf)
val = Obj.get_val()
rep = Obj.repr()
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
buf = unhexlify(b'ef200bc10996463fc15010c1ceada382a02b2b2b')
#
Obj.from_bytes(buf)
val = Obj.get_val()
rep = Obj.repr()
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
def test_si13r():
@ -548,6 +588,11 @@ def test_si13r():
rep = Obj.repr()
assert( Obj.get_val() == val )
assert( Obj.to_bytes() == buf )
#
if _with_json:
t = Obj.to_json()
Obj.from_json(t)
assert( Obj.get_val() == val )
if __name__ == '__main__':