Introduce new object-oriented TLV parser/decoder/encoder

This introduces a new TLV library that heavily builds upon python object
oriented concepts.  Contrary to classic TLV parsers it doesn't focus on
the structure of Tag, Length and binary Value only, but it supports
actual decoding/interpretation of the value part into some kind of JSON
serializable dict.  The latter can be achieved by imperative
encode/decode methods, or by using our existing declarative 'construct'
based approach.

The TLV library supports both BER-TLV and COMPREHENSION-TLV for both
nested and non-nested TLV definitions.

As an example we include TLV definitions for a number of CAT (Card
Application Toolkit) IEs.

Change-Id: I7fc1699443bc9d8a4e7cdd2687af9af7cc03c30e
This commit is contained in:
Harald Welte 2021-05-24 23:15:54 +02:00
parent 8f892fbbe8
commit bb3b5df8bf
3 changed files with 618 additions and 0 deletions

View File

@ -80,6 +80,11 @@ pySim construct utilities
.. automodule:: pySim.construct
:members:
pySim TLV utilities
-------------------
.. automodule:: pySim.tlv
:members:
pySim utility functions
-----------------------

222
pySim/cat.py Normal file
View File

@ -0,0 +1,222 @@
"""Code related to the Card Application Toolkit (CAT) as described in
mainly) ETSI TS 102 223, ETSI TS 101 220 and 3GPP TS 31.111."""
# (C) 2021 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.tlv import *
from pySim.construct import *
from construct import *
# Tag values as per TS 101 220 Table 7.23
# TS 102 223 Section 8.1
class Address(COMPR_TLV_IE, tag=0x06):
_construct = Struct('ton_npi'/Int8ub,
'call_number'/BcdAdapter(Bytes(this._.total_len-1)))
# TS 102 223 Section 8.2
class AlphaIdentifier(COMPR_TLV_IE, tag=0x05):
# FIXME: like EF.ADN
pass
# TS 102 223 Section 8.3
class Subaddress(COMPR_TLV_IE, tag=0x08):
pass
# TS 102 223 Section 8.4
class CapabilityConfigParams(COMPR_TLV_IE, tag=0x07):
pass
# TS 31.111 Section 8.5
class CBSPage(COMPR_TLV_IE, tag=0x0C):
pass
# TS 102 223 Section 8.6
class CommandDetails(COMPR_TLV_IE, tag=0x01):
_construct = Struct('command_number'/Int8ub,
'type_of_command'/Int8ub,
'command_qualifier'/Int8ub)
# TS 102 223 Section 8.7
class DeviceIdentities(COMPR_TLV_IE, tag=0x02):
DEV_IDS = bidict({
0x01: 'keypad',
0x02: 'display',
0x03: 'earpiece',
0x10: 'addl_card_reader_0',
0x11: 'addl_card_reader_1',
0x12: 'addl_card_reader_2',
0x13: 'addl_card_reader_3',
0x14: 'addl_card_reader_4',
0x15: 'addl_card_reader_5',
0x16: 'addl_card_reader_6',
0x17: 'addl_card_reader_7',
0x21: 'channel_1',
0x22: 'channel_2',
0x23: 'channel_3',
0x24: 'channel_4',
0x25: 'channel_5',
0x26: 'channel_6',
0x27: 'channel_7',
0x31: 'ecat_client_1',
0x32: 'ecat_client_2',
0x33: 'ecat_client_3',
0x34: 'ecat_client_4',
0x35: 'ecat_client_5',
0x36: 'ecat_client_6',
0x37: 'ecat_client_7',
0x38: 'ecat_client_8',
0x39: 'ecat_client_9',
0x3a: 'ecat_client_a',
0x3b: 'ecat_client_b',
0x3c: 'ecat_client_c',
0x3d: 'ecat_client_d',
0x3e: 'ecat_client_e',
0x3f: 'ecat_client_f',
0x81: 'uicc',
0x82: 'terminal',
0x83: 'network',
})
def _from_bytes(self, do:bytes):
return {'source_dev_id': self.DEV_IDS[do[0]], 'dest_dev_id': self.DEV_IDS[do[1]]}
def _to_bytes(self):
src = self.DEV_IDS.inverse[self.decoded['source_dev_id']]
dst = self.DEV_IDS.inverse[self.decoded['dest_dev_id']]
return bytes([src, dst])
# TS 102 223 Section 8.8
class Duration(COMPR_TLV_IE, tag=0x04):
_construct = Struct('time_unit'/Int8ub,
'time_interval'/Int8ub)
# TS 102 223 Section 8.9
class Item(COMPR_TLV_IE, tag=0x0f):
_construct = Struct('identifier'/Int8ub,
'text_string'/GsmStringAdapter(GreedyBytes))
# TS 102 223 Section 8.10
class ItemIdentifier(COMPR_TLV_IE, tag=0x10):
_construct = Struct('identifier'/Int8ub)
# TS 102 223 Section 8.11
class ResponseLength(COMPR_TLV_IE, tag=0x11):
_construct = Struct('minimum_length'/Int8ub,
'maximum_length'/Int8ub)
# TS 102 223 Section 8.12
class Result(COMPR_TLV_IE, tag=0x03):
_construct = Struct('general_result'/Int8ub,
'additional_information'/HexAdapter(GreedyBytes))
# TS 102 223 Section 8.13 + TS 31.111 Section 8.13
class SMS_TPDU(COMPR_TLV_IE, tag=0x0B):
pass
# TS 102 223 Section 8.15
class TextString(COMPR_TLV_IE, tag=0x0d):
_construct = Struct('dcs'/Int8ub,
'text_string'/HexAdapter(GreedyBytes))
# TS 102 223 Section 8.16
class Tone(COMPR_TLV_IE, tag=0x0e):
_construct = Struct('tone'/Int8ub)
# TS 31 111 Section 8.17
class USSDString(COMPR_TLV_IE, tag=0x0a):
_construct = Struct('dcs'/Int8ub,
'ussd_string'/HexAdapter(GreedyBytes))
# TS 101 220 Table 7.17
class ProactiveCommand(BER_TLV_IE, tag=0xD0):
pass
# TS 101 220 Table 7.17 + 31.111 7.1.1.2
class SMSPPDownload(BER_TLV_IE, tag=0xD1,
nested=[DeviceIdentities, Address, SMS_TPDU]):
pass
# TS 101 220 Table 7.17 + 31.111 7.1.1.3
class SMSCBDownload(BER_TLV_IE, tag=0xD2,
nested=[DeviceIdentities, CBSPage]):
pass
class USSDDownload(BER_TLV_IE, tag=0xD9,
nested=[DeviceIdentities, USSDString]):
pass
term_prof_bits = {
1: 'Profile download',
2: 'SMS-PP data doanload',
3: 'Cell Broadcast data download',
4: 'Menu selection',
5: 'SMS-PP data download',
6: 'Timer expiration',
7: 'USSD string DO support in CC by USIM',
8: 'Call Control by NAA',
9: 'Command result',
10: 'Call Controll by NAA',
11: 'Call Control by NAA',
12: 'MO short message control support',
13: 'Call Control by NAA',
14: 'UCS2 Entry supported',
15: 'UCS2 Display supported',
16: 'Display Text',
17: 'Proactive UICC: DISPLAY TEXT',
18: 'Proactive UICC: GET INKEY',
19: 'Proactive UICC: GET INPUT',
20: 'Proactive UICC: MORE TIME',
21: 'Proactive UICC: PLAY TONE',
22: 'Proactive UICC: POLL INTERVAL',
23: 'Proactive UICC: POLLING OFF',
24: 'Proactive UICC: REFRESH',
25: 'Proactive UICC: SELECT ITEM',
26: 'Proactive UICC: SEND SHORT MESSAGE with 3GPP-SMS-TPDU',
27: 'Proactive UICC: SEND SS',
28: 'Proactive UICC: SEND USSD',
29: 'Proactive UICC: SET UP CALL',
30: 'Proactive UICC: SET UP MENU',
31: 'Proactive UICC: PROVIDE LOCAL INFORMATION (MCC, MNC, LAC, Cell ID & IMEI)',
32: 'Proactive UICC: PROVIDE LOCAL INFORMATION (NMR)',
33: 'Proactive UICC: SET UP EVENT LIST',
34: 'Event: MT call',
35: 'Event: Call connected',
36: 'Event: Call disconnected',
37: 'Event: Location status',
38: 'Event: User activity',
39: 'Event: Idle screen available',
40: 'Event: Card reader status',
41: 'Event: Language selection',
42: 'Event: Browser Termination',
43: 'Event: Data aailable',
44: 'Event: Channel status',
45: 'Event: Access Technology Change',
46: 'Event: Display parameters changed',
47: 'Event: Local Connection',
48: 'Event: Network Search Mode Change',
# FIXME: remainder
}

391
pySim/tlv.py Normal file
View File

@ -0,0 +1,391 @@
"""object-oriented TLV parser/encoder library."""
# (C) 2021 by Harald Welte <laforge@osmocom.org>
# All Rights Reserved
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Optional, List, Dict, Any, Tuple
from bidict import bidict
from construct import *
from pySim.utils import bertlv_encode_len, bertlv_parse_len, bertlv_encode_tag, bertlv_parse_tag
from pySim.utils import comprehensiontlv_encode_tag, comprehensiontlv_parse_tag
from pySim.utils import bertlv_parse_one, comprehensiontlv_parse_one
from pySim.utils import bertlv_parse_tag_raw, comprehensiontlv_parse_tag_raw
from pySim.construct import parse_construct, LV, HexAdapter, BcdAdapter, BitsRFU, GsmStringAdapter
from pySim.exceptions import *
import inspect
import abc
class TlvMeta(abc.ABCMeta):
"""Metaclass which we use to set some class variables at the time of defining a subclass.
This allows us to create subclasses for each TLV/IE type, where the class represents fixed
parameters like the tag/type and instances of it represent the actual TLV data."""
def __new__(metacls, name, bases, namespace, **kwargs):
#print("TlvMeta_new_(metacls=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (metacls, name, bases, namespace, kwargs))
x = super().__new__(metacls, name, bases, namespace)
# this becomes a _class_ variable, not an instance variable
x.tag = namespace.get('tag', kwargs.get('tag', None))
x.desc = namespace.get('desc', kwargs.get('desc', None))
nested = namespace.get('nested', kwargs.get('nested', None))
if nested is None or inspect.isclass(nested) and issubclass(nested, TLV_IE_Collection):
# caller has specified TLV_IE_Collection sub-class, we can directly reference it
x.nested_collection_cls = nested
else:
# caller passed list of other TLV classes that might possibly appear within us,
# build a dynamically-created TLV_IE_Collection sub-class and reference it
name = 'auto_collection_%s' % (name)
cls = type(name, (TLV_IE_Collection,), {'nested': nested})
x.nested_collection_cls = cls
return x
class TlvCollectionMeta(abc.ABCMeta):
"""Metaclass which we use to set some class variables at the time of defining a subclass.
This allows us to create subclasses for each Collection type, where the class represents fixed
parameters like the nested IE classes and instances of it represent the actual TLV data."""
def __new__(metacls, name, bases, namespace, **kwargs):
#print("TlvCollectionMeta_new_(metacls=%s, name=%s, bases=%s, namespace=%s, kwargs=%s)" % (metacls, name, bases, namespace, kwargs))
x = super().__new__(metacls, name, bases, namespace)
# this becomes a _class_ variable, not an instance variable
x.possible_nested = namespace.get('nested', kwargs.get('nested', None))
return x
class Transcodable(abc.ABC):
_construct = None
"""Base class for something that can be encoded + encoded. Decoding and Encoding happens either
* via a 'construct' object stored in a derived class' _construct variable, or
* via a 'construct' object stored in an instance _construct variable, or
* via a derived class' _{to,from}_bytes() methods."""
def __init__(self):
self.encoded = None
self.decoded = None
self._construct = None
def to_bytes(self) -> bytes:
"""Convert from internal representation to binary bytes. Store the binary result
in the internal state and return it."""
if self._construct:
do = self._construct.build(self.decoded, total_len=None)
elif self.__class__._construct:
do = self.__class__._construct.build(self.decoded, total_len=None)
else:
do = self._to_bytes()
self.encoded = do
return do
# not an abstractmethod, as it is only required if no _construct exists
def _to_bytes(self):
raise NotImplementedError
def from_bytes(self, do:bytes):
"""Convert from binary bytes to internal representation. Store the decoded result
in the internal state and return it."""
self.encoded = do
if self._construct:
self.decoded = parse_construct(self._construct, do)
elif self.__class__._construct:
self.decoded = parse_construct(self.__class__._construct, do)
else:
self.decoded = self._from_bytes(do)
return self.decoded
# not an abstractmethod, as it is only required if no _construct exists
def _from_bytes(self, do:bytes):
raise NotImplementedError
class IE(Transcodable, metaclass=TlvMeta):
# we specify the metaclass so any downstream subclasses will automatically use it
"""Base class for various Information Elements. We understand the notion of a hierarchy
of IEs on top of the Transcodable class."""
# this is overridden by the TlvMeta metaclass, if it is used to create subclasses
nested_collection_cls = None
tag = None
def __init__(self, **kwargs):
super().__init__()
self.nested_collection = None
if self.nested_collection_cls:
self.nested_collection = self.nested_collection_cls()
# if we are a constructed IE, [ordered] list of actual child-IE instances
self.children = kwargs.get('children', [])
self.decoded = kwargs.get('decoded', None)
def __repr__(self):
"""Return a string representing the [nested] IE data (for print)."""
if len(self.children):
member_strs = [repr(x) for x in self.children]
return '%s(%s)' % (type(self).__name__, ','.join(member_strs))
else:
return '%s(%s)' % (type(self).__name__, self.decoded)
def to_dict(self):
"""Return a JSON-serializable dict representing the [nested] IE data."""
if len(self.children):
v = [x.to_dict() for x in self.children]
else:
v = self.decoded
return {type(self).__name__: v}
def from_dict(self, decoded:dict):
"""Set the IE internal decoded representation to data from the argument.
If this is a nested IE, the child IE instance list is re-created."""
if self.nested_collection:
self.children = self.nested_collection.from_dict(decoded)
else:
self.children = []
self.decoded = decoded
def is_constructed(self):
"""Is this IE constructed by further nested IEs?"""
if len(self.children):
return True
else:
return False
@abc.abstractmethod
def to_ie(self) -> bytes:
"""Convert the internal representation to entire IE including IE header."""
def to_bytes(self) -> bytes:
"""Convert the internal representation _of the value part_ to binary bytes."""
if self.is_constructed():
# concatenate the encoded IE of all children to form the value part
out = b''
for c in self.children:
out += c.to_ie()
return out
else:
return super().to_bytes()
def from_bytes(self, do:bytes):
"""Parse _the value part_ from binary bytes to internal representation."""
if self.nested_collection:
self.children = self.nested_collection.from_bytes(do)
else:
self.children = []
return super().from_bytes(do)
class TLV_IE(IE):
"""Abstract base class for various TLV type Information Elements."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def _compute_tag(self) -> int:
"""Compute the tag (sometimes the tag encodes part of the value)."""
return self.tag
@classmethod
@abc.abstractmethod
def _parse_tag_raw(cls, do:bytes) -> Tuple[int, bytes]:
"""Obtain the raw TAG at the start of the bytes provided by the user."""
@classmethod
@abc.abstractmethod
def _parse_len(cls, do:bytes) -> Tuple[int, bytes]:
"""Obtain the length encoded at the start of the bytes provided by the user."""
@abc.abstractmethod
def _encode_tag(self) -> bytes:
"""Encode the tag part. Must be provided by derived (TLV format specific) class."""
@abc.abstractmethod
def _encode_len(self, val:bytes) -> bytes:
"""Encode the length part assuming a certain binary value. Must be provided by
derived (TLV format specific) class."""
def to_ie(self):
return self.to_tlv()
def to_tlv(self):
"""Convert the internal representation to binary TLV bytes."""
val = self.to_bytes()
return self._encode_tag() + self._encode_len(val) + val
def from_tlv(self, do:bytes):
(rawtag, remainder) = self.__class__._parse_tag_raw(do)
if rawtag:
if rawtag != self.tag:
raise ValueError("%s: Encountered tag %s doesn't match our supported tag %s" %
(self, rawtag, self.tag))
(length, remainder) = self.__class__._parse_len(remainder)
value = remainder[:length]
remainder = remainder[length:]
else:
value = do
remainder = b''
dec = self.from_bytes(value)
return dec, remainder
class BER_TLV_IE(TLV_IE):
"""TLV_IE formatted as ASN.1 BER described in ITU-T X.690 8.1.2."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@classmethod
def _decode_tag(cls, do:bytes) -> Tuple[dict, bytes]:
return bertlv_parse_tag(do)
@classmethod
def _parse_tag_raw(cls, do:bytes) -> Tuple[int, bytes]:
return bertlv_parse_tag_raw(do)
@classmethod
def _parse_len(cls, do:bytes) -> Tuple[int, bytes]:
return bertlv_parse_len(do)
def _encode_tag(self) -> bytes:
return bertlv_encode_tag(self._compute_tag())
def _encode_len(self, val:bytes) -> bytes:
return bertlv_encode_len(len(val))
class COMPR_TLV_IE(TLV_IE):
"""TLV_IE formated as COMPREHENSION-TLV as described in ETSI TS 101 220."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.comprehension = False
@classmethod
def _decode_tag(cls, do:bytes) -> Tuple[dict, bytes]:
return comprehensiontlv_parse_tag(do)
@classmethod
def _parse_tag_raw(cls, do:bytes) -> Tuple[int, bytes]:
return comprehensiontlv_parse_tag_raw(do)
@classmethod
def _parse_len(cls, do:bytes) -> Tuple[int, bytes]:
return bertlv_parse_len(do)
def _encode_tag(self) -> bytes:
return comprehensiontlv_encode_tag(self._compute_tag())
def _encode_len(self, val:bytes) -> bytes:
return bertlv_encode_len(len(val))
class TLV_IE_Collection(metaclass=TlvCollectionMeta):
# we specify the metaclass so any downstream subclasses will automatically use it
"""A TLV_IE_Collection consists of multiple TLV_IE classes identified by their tags.
A given encoded DO may contain any of them in any order, and may contain multiple instances
of each DO."""
# this is overridden by the TlvCollectionMeta metaclass, if it is used to create subclasses
possible_nested = []
def __init__(self, desc=None, **kwargs):
self.desc = desc
#print("possible_nested: ", self.possible_nested)
self.members = kwargs.get('nested', self.possible_nested)
self.members_by_tag = {}
self.members_by_name = {}
self.members_by_tag = { m.tag:m for m in self.members }
self.members_by_name = { m.__name__:m for m in self.members }
# if we are a constructed IE, [ordered] list of actual child-IE instances
self.children = kwargs.get('children', [])
self.encoded = None
def __str__(self):
member_strs = [str(x) for x in self.members]
return '%s(%s)' % (type(self).__name__, ','.join(member_strs))
def __repr__(self):
member_strs = [repr(x) for x in self.members]
return '%s(%s)' % (self.__class__, ','.join(member_strs))
def __add__(self, other):
"""Extending TLV_IE_Collections with other TLV_IE_Collections or TLV_IEs."""
if isinstance(other, TLV_IE_Collection):
# adding one collection to another
members = self.members + other.members
return TLV_IE_Collection(self.desc, nested=members)
elif inspect.isclass(other) and issubclass(other, TLV_IE):
# adding a member to a collection
return TLV_IE_Collection(self.desc, nested = self.members + [other])
else:
raise TypeError
def from_bytes(self, binary:bytes) -> List[TLV_IE]:
"""Create a list of TLV_IEs from the collection based on binary input data.
Args:
binary : binary bytes of encoded data
Returns:
list of instances of TLV_IE sub-classes containing parsed data
"""
self.encoded = binary
# list of instances of TLV_IE collection member classes appearing in the data
res = []
remainder = binary
first = next(iter(self.members_by_tag.values()))
# iterate until no binary trailer is left
while len(remainder):
# obtain the tag at the start of the remainder
tag, r = first._parse_tag_raw(remainder)
if tag in self.members_by_tag:
cls = self.members_by_tag[tag]
# create an instance and parse accordingly
inst = cls()
dec, remainder = inst.from_tlv(remainder)
res.append(inst)
else:
# unknown tag; create the related class on-the-fly using the same base class
name = 'unknown_%s_%X' % (first.__base__.__name__, tag)
cls = type(name, (first.__base__,), {'tag':tag, 'possible_nested':[],
'nested_collection_cls':None})
cls._from_bytes = lambda s, a : {'raw': a.hex()}
cls._to_bytes = lambda s: bytes.fromhex(s.decoded['raw'])
# create an instance and parse accordingly
inst = cls()
dec, remainder = inst.from_tlv(remainder)
res.append(inst)
self.children = res
return res
def from_dict(self, decoded:List[dict]) -> List[TLV_IE]:
"""Create a list of TLV_IE instances from the collection based on an array
of dicts, where they key indicates the name of the TLV_IE subclass to use."""
# list of instances of TLV_IE collection member classes appearing in the data
res = []
for i in decoded:
for k in i.keys():
if k in self.members_by_name:
cls = self.members_by_name[k]
inst = cls(decoded=i[k])
res.append(inst)
else:
raise ValueError('%s: Unknown TLV Class %s in %s; expected %s' %
(self, i[0], decoded, self.members_by_name.keys()))
self.children = res
return res
def to_dict(self):
return [x.to_dict() for x in self.children]
def to_bytes(self):
out = b''
for c in self.children:
out += c.to_tlv()
return out
def from_tlv(self, do):
return self.from_bytes(do)
def to_tlv(self):
return self.to_bytes()