pycrate/pycrate_mobile/SCCP.py

1634 lines
49 KiB
Python
Raw Normal View History

2017-12-15 20:28:21 +00:00
# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
2019-02-25 10:26:10 +00:00
# * Version : 0.4
2017-12-15 20:28:21 +00:00
# *
# * Copyright 2017. Benoit Michau. ANSSI.
2019-02-18 15:19:38 +00:00
# * Copyright 2018. Benoit Michau. P1Sec.
2017-12-15 20:28:21 +00:00
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
2017-12-15 20:28:21 +00:00
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
2017-12-15 20:28:21 +00:00
# *
# *--------------------------------------------------------
# * File Name : pycrate_mobile/SCCP.py
# * Created : 2017-11-27
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
2019-10-08 10:23:22 +00:00
from binascii import unhexlify
2017-12-15 20:28:21 +00:00
from pycrate_core.utils import *
from pycrate_core.repr import *
from pycrate_core.elt import *
from pycrate_core.base import *
from pycrate_core.charpy import *
from .TS24008_IE import BufBCD
#------------------------------------------------------------------------------#
# ITU-T Q.713: Signalling connection control part formats and codes
# SCCP
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
# SCCP general part
# ITU-T Q.713, section 2
#------------------------------------------------------------------------------#
_SCCPType_dict = {
1 : 'CR',
2 : 'CC',
3 : 'CREF',
4 : 'RLSD',
5 : 'RLC',
6 : 'DT1',
7 : 'DT2',
8 : 'AK',
9 : 'UDT',
2018-10-26 13:04:03 +00:00
10 : 'UDTS',
2017-12-15 20:28:21 +00:00
11 : 'ED',
12 : 'EA',
13 : 'RSR',
14 : 'RSC',
15 : 'ERR',
16 : 'IT',
17 : 'XUDT',
18 : 'XUDTS',
19 : 'LUDT',
20 : 'LUDTS'
}
#------------------------------------------------------------------------------#
# SCCP parameters
# ITU-T Q.713, section 3
#------------------------------------------------------------------------------#
_SCCPParam_dict = {
0 : 'End of optional parameters',
1 : 'Destination local reference',
2 : 'Source local reference',
3 : 'Called Party address',
4 : 'Calling party address',
5 : 'Protocol class',
6 : 'Segmenting/reassembling',
7 : 'Receive sequence number',
8 : 'Sequencint/segmenting',
9 : 'Credit',
10: 'Release cause',
11: 'Return cause',
12: 'Reset cause',
13: 'Error cause',
14: 'Refusal cause',
15: 'Data',
16: 'Segmentation',
17: 'Hop counter',
18: 'Importance',
19: 'Long data'
}
#------------------------------------------------------------------------------#
# End of optional parameters
# ITU-T Q.713, section 3.1
#------------------------------------------------------------------------------#
class EOO(Uint8):
_val = 0
_dic = _SCCPParam_dict
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# Destination local reference
# ITU-T Q.713, section 3.2
#------------------------------------------------------------------------------#
class DstLocalRef(Uint24):
pass
#------------------------------------------------------------------------------#
# Source local reference
# ITU-T Q.713, section 3.3
#------------------------------------------------------------------------------#
class SrcLocalRef(Uint24):
pass
#------------------------------------------------------------------------------#
# Called Party Address / Calling Party Address
# ITU-T Q.713, section 3.4 / 3.5
#------------------------------------------------------------------------------#
class SCCPBufBCD(BufBCD):
# as given in 3.4.2.3.1
_chars = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'_spare10_', '_code11_', '_code12_', '_spare13_', '_spare14_', '_ST_')
# TODO: make something that can be re-encoded
_filler = 0x0
2017-12-15 20:28:21 +00:00
# GTInd 0001
# section 3.4.2.3.1
_GTNAI_dict = {
0 : 'unknown',
1 : 'subscriber number',
2 : 'reserved for national use',
3 : 'national significant number',
4 : 'international number'
}
_str_reserved_natuse = 'reserved for national use'
for i in range(0b1110000, 0b1111110):
_GTNAI_dict[i] = _str_reserved_natuse
class _GlobalTitle0001(Envelope):
_GEN = (
Uint('OE', bl=1, dic={0:'even number of address signals', 1:'odd number of address signals'}),
Uint('NAI', val=1, bl=7, dic=_GTNAI_dict),
SCCPBufBCD('Addr', val=b'')
2017-12-15 20:28:21 +00:00
)
2019-10-08 10:23:22 +00:00
def get_addr(self):
"""return BCD-encoded Addr, properly decoded according to OE
"""
if self[0].get_val():
return self[2].decode()[:-1]
else:
return self[2].decode()
def set_addr(self, addr):
"""set the BCD-encoded Addr and OE
"""
if len(addr) % 2:
self[0].set_val(1)
self[2].encode(addr)
set_attr_bcd = set_addr
2017-12-15 20:28:21 +00:00
# GTInd 0010
# section 3.4.2.3.2
class _GlobalTitle0010(Envelope):
_GEN = (
Uint8('TranslationType'),
Buf('Addr', val=b'', rep=REPR_HEX)
)
2019-10-08 10:23:22 +00:00
def get_addr(self):
"""return the hex-encoded Addr
"""
return self[1].hex()
def set_addr(self, addr):
"""set the hex-encoded Addr
"""
self[1].set_val(unhexlify(addr))
2017-12-15 20:28:21 +00:00
# GTInd 0011
# section 3.4.2.3.3
_NumPlan_dict = {
0 : 'unknown',
1 : 'ISDN/telephony numbering plan (ITU-T E.163 and E.164)',
2 : 'generic numbering plan',
3 : 'data numbering plan (ITU-T X.121)',
4 : 'telex numbering plan (ITU-T F.69)',
5 : 'maritime mobile numbering plan (ITU-T E.210, E.211)',
6 : 'land mobile numbering plan (ITU-T E.212)',
7 : 'ISDN/mobile numbering plan (ITU-T E.214)',
14 : 'private network or network-specific numbering plan'
}
# kind reminder:
# from https://en.wikipedia.org/wiki/Global_title
# E.164 : MSISDN (CC + NDC + SN)
# E.212 : IMSI (MCC + MNC + MSIN)
# E.214 : MGT (CC + NDC + MSIN)
2017-12-15 20:28:21 +00:00
_EncScheme_dict = {
0 : 'unknown',
1 : 'BCD, odd number of digits',
2 : 'BCD, even number of digits',
3 : 'national specific'
}
class _GlobalTitle0011(Envelope):
ENV_SEL_TRANS = False
_GEN = (
Uint8('TranslationType'),
Uint('NumberingPlan', val=1, bl=4, dic=_NumPlan_dict),
Uint('EncodingScheme', val=1, bl=4, dic=_EncScheme_dict),
Alt('Addr', GEN={
1 : SCCPBufBCD('BCD', val=b''),
2 : SCCPBufBCD('BCD', val=b'')},
DEFAULT=Buf('Raw', val=b'', rep=REPR_HEX),
sel=lambda self: self.get_env()[2].get_val())
2017-12-15 20:28:21 +00:00
)
2019-10-08 10:23:22 +00:00
def get_addr(self):
"""return the BCD- or hex-encoded Addr, properly decoded according to EncodingScheme
"""
enc = self[2].get_val()
if enc == 1:
return self[3].get_alt().decode()[:-1]
elif enc == 2:
return self[3].get_alt().decode()
else:
return self[3].get_alt().hex()
def set_addr_bcd(self, addr):
"""set the BCD-encoded Addr and EncodingScheme
"""
if len(addr) % 2:
self[2].set_val(1)
else:
self[2].set_val(2)
self[3].get_alt().encode(addr)
2017-12-15 20:28:21 +00:00
# GTInd 0100
# section 3.4.2.3.4
_GTIntTransType_dict = {
255: 'reserved for expansion'
}
_str_int_serv = 'international services'
_str_nat_serv = 'national network specific'
for i in range(0b1, 0b111111):
_GTIntTransType_dict[i] = _str_int_serv
for i in range(0b10000000, 0b11111110):
_GTIntTransType_dict[i] = _str_nat_serv
class _GlobalTitle0100(Envelope):
ENV_SEL_TRANS = False
_GEN = (
Uint8('TranslationType', val=1, dic=_GTIntTransType_dict),
Uint('NumberingPlan', val=1, bl=4, dic=_NumPlan_dict),
Uint('EncodingScheme', val=1, bl=4, dic=_EncScheme_dict),
2017-12-15 20:28:21 +00:00
Uint('spare', bl=1),
Uint('NAI', val=1, bl=7, dic=_GTNAI_dict),
Alt('Addr', GEN={
1 : SCCPBufBCD('BCD', val=b''),
2 : SCCPBufBCD('BCD', val=b'')},
DEFAULT=Buf('Raw', val=b'', rep=REPR_HEX),
sel=lambda self: self.get_env()[2].get_val())
2017-12-15 20:28:21 +00:00
)
2019-10-08 10:23:22 +00:00
def get_addr(self):
"""return the BCD- or hex-encoded Addr, properly decoded according to EncodingScheme
"""
enc = self[2].get_val()
if enc == 1:
return self[5].get_alt().decode()[:-1]
elif enc == 2:
return self[5].get_alt().decode()
else:
return self[5].get_alt().hex()
def set_addr_bcd(self, addr):
"""set the BCD-encoded Addr and EncodingScheme
"""
if len(addr) % 2:
self[2].set_val(1)
else:
self[2].set_val(2)
self[5].get_alt().encode(addr)
2017-12-15 20:28:21 +00:00
# SCCP called / calling party address
_RouteInd_dict = {
0 : 'route on GT',
1 : 'route on SSN'
}
_GTInd_dict = {
0 : 'no global title included',
1 : 'global title includes nature of address indicator only',
2 : 'global title includes translation type only',
3 : 'global title includes translation type, numbering plan and encoding scheme',
4 : 'global title includes translation type, numbering plan, '\
'encoding scheme and nature of address indicator',
}
_SSN_dict = {
0 : 'SSN not known/not used',
1 : 'SCCP management',
2 : 'reserved for ITU-T allocation',
3 : 'ISDN user part',
4 : 'operation, maintenance and administration part (OMAP)',
5 : 'mobile application part (MAP)',
6 : 'home location register (HLR)',
7 : 'visitor location register (VLR)',
8 : 'mobile switching centre (MSC)',
2019-01-07 13:32:44 +00:00
9 : 'equipment identifier register (EIR)',
2017-12-15 20:28:21 +00:00
10 : 'authentication centre (AUC)',
11 : 'ISDN supplementary services',
12 : 'reserved for international use',
13 : 'broadband ISDN edge-to-edge applications',
2018-09-24 14:45:21 +00:00
14 : 'TC test responder',
# from 3GPP TS 29.002 and 23.003
142 : '3GPP RANAP',
143 : '3GPP RNSAP',
145 : '3GPP GMLC (MAP)',
146 : '3GPP CAP',
147 : '3GPP gsmSCF (MAP) or IM-SSF (MAP) or Presence Network Agent',
148 : '3GPP SIWF (MAP)',
149 : '3GPP SGSN (MAP)',
150 : '3GPP GGSN (MAP)',
2019-01-07 13:32:44 +00:00
248 : '3GPP CSS (MAP)',
249 : '3GPP PCAP',
250 : '3GPP BSC (BSSAP-LE)',
251 : '3GPP MSC (BSSAP-LE)',
252 : '3GPP SMLC (BSSAP-LE)',
253 : '3GPP BSS O&M (A interface)',
254 : '3GPP BSSAP (A interface)'
2017-12-15 20:28:21 +00:00
}
class _SCCPAddr(Envelope):
2018-10-26 13:04:03 +00:00
# this is to bypass the GT decoding process, and get a simple Buf() instead
GT_DONT_DECODE = False
2017-12-15 20:28:21 +00:00
ENV_SEL_TRANS = False
_GEN = (
Envelope('AddrInd', GEN=(
Uint('res', bl=1),
Uint('RoutingInd', val=0, bl=1, dic=_RouteInd_dict),
Uint('GTInd', val=3, bl=4, dic=_GTInd_dict),
Uint('SSNInd', val=0, bl=1),
Uint('PCInd', val=0, bl=1)
)),
Uint16LE('PC'), # presence depends on PCInd
Uint8('SSN', val=0, dic=_SSN_dict), # presence depends on SSNInd
Alt('GT', GEN={
1 : _GlobalTitle0001('GT_1'),
2 : _GlobalTitle0010('GT_2'),
3 : _GlobalTitle0011('GT_3'),
4 : _GlobalTitle0100('GT_4')},
DEFAULT=Buf('GT_unk', val=b'', rep=REPR_HEX),
sel=lambda self: self.get_env()[0][2].get_val())
2017-12-15 20:28:21 +00:00
)
2018-10-26 13:04:03 +00:00
2017-12-15 20:28:21 +00:00
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[1].set_transauto(lambda: False if self[0][4].get_val() != 0 else True)
self[2].set_transauto(lambda: False if self[0][3].get_val() != 0 else True)
self[3].set_transauto(lambda: False if self[0][2].get_val() != 0 else True)
2018-10-26 13:04:03 +00:00
def _from_char(self, char):
if not self.GT_DONT_DECODE:
Envelope._from_char(self, char)
else:
self[0]._from_char(char)
self[1]._from_char(char)
self[2]._from_char(char)
del self[3]
gt = Buf('GT', rep=REPR_HEX)
2018-10-26 13:04:03 +00:00
gt._from_char(char)
self.append( gt )
2017-12-15 20:28:21 +00:00
class SCCPPartyAddr(Envelope):
2017-12-15 20:28:21 +00:00
_GEN = (
Uint8('Len'),
_SCCPAddr('Value')
)
2017-12-15 20:28:21 +00:00
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0].set_valauto(lambda: self[1].get_len())
self[1].set_blauto(lambda: self[0].get_val()<<3)
def get_gt(self):
return self[1][3].get_alt()
2017-12-15 20:28:21 +00:00
class CallingPartyAddr(SCCPPartyAddr):
pass
class CalledPartyAddr(SCCPPartyAddr):
pass
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# Protocol Class
# ITU-T Q.713, section 3.6
#------------------------------------------------------------------------------#
_ProtClass_dict = {
0 : 'Class 0 (connection-less)',
1 : 'Class 1 (connection-less)',
2 : 'Class 2 (connection-oriented)',
3 : 'Class 3 (connection-oriented)'
}
_ProtCon_dict = {i: 'spare' for i in range(16)}
_ProtConLess_dict = {i: 'spare' for i in range(16)}
_ProtConLess_dict[0] = 'no special options'
_ProtConLess_dict[8] = 'return message on error'
2017-12-15 20:28:21 +00:00
class ProtocolClass(Envelope):
_GEN = (
Uint('Handling', bl=4),
2017-12-15 20:28:21 +00:00
Uint('Class', bl=4, dic=_ProtClass_dict)
)
2017-12-15 20:28:21 +00:00
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0].set_dicauto(lambda: _ProtConLess_dict if self[1].get_val() < 2 else _ProtCon_dict)
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# Segmenting/reassembling
# ITU-T Q.713, section 3.7
#------------------------------------------------------------------------------#
class SegmentReassemb(Envelope):
_GEN = (
Uint('spare', bl=7),
Uint('M', val=0, bl=1, dic={0:'no more data', 1:'more data'})
)
#------------------------------------------------------------------------------#
# Receive sequence number
# ITU-T Q.713, section 3.8
#------------------------------------------------------------------------------#
class RecvSeqn(Envelope):
_GEN = (
Uint('PR', val=0, bl=7),
Uint('spare', bl=1)
)
#------------------------------------------------------------------------------#
# Sequencing/segmenting
# ITU-T Q.713, section 3.9
#------------------------------------------------------------------------------#
class SeqSegment(Envelope):
_GEN = (
Uint('PS', val=0, bl=7),
Uint('spare', bl=1),
Uint('PR', val=0, bl=7),
Uint('M', val=0, bl=1, dic={0:'no more data', 1:'more data'})
)
#------------------------------------------------------------------------------#
# Credit
# ITU-T Q.713, section 3.10
#------------------------------------------------------------------------------#
class Credit(Uint8):
pass
#------------------------------------------------------------------------------#
# Release Cause
# ITU-T Q.713, section 3.11
#------------------------------------------------------------------------------#
_RelCause_dict = {
0 : 'end user originated',
1 : 'end user congestion',
2 : 'end user failure',
3 : 'SCCP user originated',
4 : 'remote procedure error',
5 : 'inconsistent connection data',
6 : 'access failure',
7 : 'access congestion',
8 : 'subsystem failure',
9 : 'subsystem congestion',
10 : 'MTP failure',
11 : 'network congestion',
12 : 'expiration of reset timer',
13 : 'expiration of receive inactivity timer',
14 : 'reserved',
15 : 'unqualified',
16 : 'SCCP failure'
}
class RelCause(Uint8):
_dic = _RelCause_dict
#------------------------------------------------------------------------------#
# Return Cause
# ITU-T Q.713, section 3.12
#------------------------------------------------------------------------------#
_RetCause_dict = {
0 : 'no translation for an address of such nature',
1 : 'no translation for this specific address',
2 : 'subsystem congestion',
3 : 'subsystem failure',
4 : 'unequipped user',
5 : 'MTP failure',
6 : 'network congestion',
7 : 'unqualified',
8 : 'error in message transport (Note)',
9 : 'error in local processing (Note)',
10 : 'destination cannot perform reassembly (Note)',
11 : 'SCCP failure',
12 : 'hop counter violation',
13 : 'segmentation not supported',
14 : 'segmentation failure'
}
class RetCause(Uint8):
_dic = _RetCause_dict
#------------------------------------------------------------------------------#
# Reset Cause
# ITU-T Q.713, section 3.13
#------------------------------------------------------------------------------#
_ResCause_dict = {
0 : 'end user originated',
1 : 'SCCP user originated',
2 : 'message out of order incorrect P(S)',
3 : 'message out of order incorrect P(R)',
4 : 'remote procedure error message out of window',
5 : 'remote procedure error incorrect P(S) after (re)initialization',
6 : 'remote procedure error general',
7 : 'remote end user operational',
8 : 'network operational',
9 : 'access operational',
10 : 'network congestion',
11 : 'reserved',
12 : 'unqualified'
}
class ResCause(Uint8):
_dic = _ResCause_dict
#------------------------------------------------------------------------------#
# Error Cause
# ITU-T Q.713, section 3.14
#------------------------------------------------------------------------------#
_ErrCause_dict = {
0 : 'local reference number (LRN) mismatch unassigned destination LRN',
1 : 'local reference number (LRN) mismatch inconsistent source LRN',
2 : 'point code mismatch',
3 : 'service class mismatch',
4 : 'unqualified'
}
class ErrCause(Uint8):
_dic = _ErrCause_dict
#------------------------------------------------------------------------------#
# Refusal Cause
# ITU-T Q.713, section 3.15
#------------------------------------------------------------------------------#
_RefCause_dict = {
0 : 'end user originated',
1 : 'end user congestion',
2 : 'end user failure',
3 : 'SCCP user originated',
4 : 'destination address unknown',
5 : 'destination inaccessible',
6 : 'network resource QoS not available/non-transient',
7 : 'network resource QoS not available/transient',
8 : 'access failure',
9 : 'access congestion',
10 : 'subsystem failure',
11 : 'subsystem congestion',
12 : 'expiration of the connection establishment timer',
13 : 'incompatible user data',
14 : 'reserved',
15 : 'unqualified',
16 : 'hop counter violation',
17 : 'SCCP failure',
18 : 'no translation for an address of such nature',
19 : 'unequipped user'
}
class RefCause(Uint8):
_dic = _RefCause_dict
#------------------------------------------------------------------------------#
# Data
# ITU-T Q.713, section 3.16
#------------------------------------------------------------------------------#
class Data(Envelope):
_GEN = (
Uint8('Len'),
Buf('Value', val=b'', rep=REPR_HEX)
2017-12-15 20:28:21 +00:00
)
2017-12-15 20:28:21 +00:00
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0].set_valauto(lambda: self[1].get_len())
self[1].set_blauto(lambda: self[0].get_val()<<3)
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# Segmentation
# ITU-T Q.713, section 3.17
#------------------------------------------------------------------------------#
class Segmentation(Envelope):
_GEN = (
Uint('F', val=1, bl=1, dic={1:'first segment'}),
Uint('C', bl=1, dic={0:'class 0 selected', 1:'class 1 selected'}),
Uint('spare', bl=2),
Uint('RemainingSeg', val=0, bl=4, dic={0:'last segment'}),
Uint24('SegmentLocalRef')
)
#------------------------------------------------------------------------------#
# Hop counter
# ITU-T Q.713, section 3.18
#------------------------------------------------------------------------------#
class HopCounter(Uint8):
pass
#------------------------------------------------------------------------------#
# Importance
# ITU-T Q.713, section 3.19
#------------------------------------------------------------------------------#
class Importance(Envelope):
_GEN = (
Uint('spare', bl=5),
Uint('Value', val=0, bl=3, dic={0:'least important', 7:'more important'})
)
#------------------------------------------------------------------------------#
# Long data
# ITU-T Q.713, section 3.20
#------------------------------------------------------------------------------#
class LongData(Envelope):
_GEN = (
# WNG: Q.713 does not say if it is in BE or LE, we guess LE similarly to Ptr16
Uint16LE('Len'),
2017-12-15 20:28:21 +00:00
Buf('Value', val=b'', rep=REPR_HEX)
)
2017-12-15 20:28:21 +00:00
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[0].set_valauto(lambda: self[1].get_len())
self[1].set_blauto(lambda: self[0].get_val()<<3)
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# SCCP messages and codes
# ITU-T Q.713, section 4
#------------------------------------------------------------------------------#
class _Ptr(UintLE):
"""Special SCCP pointer element, which sets its value automatically to the
given field name
"""
_field = None
def __init__(self, *args, **kwargs):
if 'field' in kwargs:
self._field = kwargs['field']
del kwargs['field']
UintLE.__init__(self, *args, **kwargs)
self._max = (1<<self._bl)-1
def clone(self):
c = UintLE.clone(self)
if self._field:
c._field = self._field
return c
def _make_val(self):
if self._env is None or self._env._env is None:
return 0
else:
val = 0
# in case of pointer to optional part and no options, set to 0
if self._field == 'Opt' and all([o.get_trans() for o in self._env._env['Opt']._content]):
return val
#
# get the length of the following pointers (including self) within the Pointers envelope
ind = self._env._content.index(self)
for ptr in self._env._content[ind:]:
val += ptr._bl//8
#print('%s: %s, %i' % (self._name, ptr._name, val))
if self._bl == 16:
# see Q.713, section 2.3:
# pointer value is between the MSB (LE) of the pointer and the parameter
# hence removing 1 bytes for Ptr16
val -= 1
#
# get the length of the fields after the Pointers field, up to self._field
ind = 1 + self._env._env._content.index(self._env)
2017-12-15 20:28:21 +00:00
for field in self._env._env._content[ind:]:
if field._name == self._field:
break
else:
val += field.get_len()
#print('%s: %s, %i' % (self._name, field._name, val))
#
return min(self._max, val)
2017-12-15 20:28:21 +00:00
_valauto = _make_val
class Ptr8(_Ptr):
_bl = 8
class Ptr16(_Ptr):
_bl = 16
# constructor for an optional parameter wrapper
def Optional(param, name):
2018-10-30 14:18:24 +00:00
"""prefix the parameter element `param' with an uint8 as name, and eventually
an uint8 as len, and make it transparent by default
2017-12-15 20:28:21 +00:00
"""
if param._name == 'EOO':
w = Envelope(param._name, GEN=(param, ), trans=True)
elif param.CLASS == 'Envelope' and param[0]._name == 'Len':
2018-10-30 14:18:24 +00:00
# length prefix already present
w = Envelope(param._name, GEN=(
Uint8('Name', val=name, dic=_SCCPParam_dict),
param), trans=True)
else:
# length prefix to be added
class Option(Envelope):
_GEN = (
Uint8('Name', val=name, dic=_SCCPParam_dict),
Uint8('Len'),
param
)
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
self[1].set_valauto(lambda: self[2].get_len())
if not hasattr(param, '_bl') or param._bl is None:
self[2].set_blauto(lambda: self[1].get_val()<<3)
2018-10-30 14:18:24 +00:00
self.set_trans(True)
w = Option(param._name)
2017-12-15 20:28:21 +00:00
return w
# parent class for SCCP messages
class SCCPMessage(Envelope):
"""Parent class for all SCCP message, handling the parsing of variable-length
fields specifically
"""
def _from_char(self, char):
if self.get_trans():
return
2017-12-15 20:28:21 +00:00
# parse the fixed content up the the pointers
start = 0
for e in self._content:
e._from_char(char)
if e._name == 'Pointers':
# e: envelope containing all Ptr8 or Ptr16 values
start = 1 + self.index(e)
2017-12-15 20:28:21 +00:00
break
if not start:
return
# parse the variable-length and optional content according to the pointers
# this is done non-sequentially
ccur, numptr = char._cur, len(e._content)
# ccur: cursor at the end of the Pointers field
# numptr: number of pointers
2017-12-15 20:28:21 +00:00
for ind, ptr in enumerate(e._content):
if ptr._field == 'Opt' and ptr._val == 0:
break
# update the charpy cursor
char._cur = ccur + (8 * ptr.get_val()) - ((numptr-ind) * ptr._bl)
if ptr._bl == 16:
# see Q.713, section 2.3:
# pointer value is between the MSB (LE) of the pointer and the parameter
# hence removing 1 bytes for Ptr16
char._cur += 8
2017-12-15 20:28:21 +00:00
# select the corresponding field
field = self._content[start+ind]
#print('%s: %s, %i' % (ptr._name, field._name, char._cur))
field._from_char(char)
def is_valid(self):
"""Ensures an SCCP message has a valid layout,
i.e. its mandatory parameters with variable length (those with pointers)
do not overlap
Args:
None
Returns:
result: bool
"""
if 'Pointers' in self._by_name:
ptrs_ind = self._by_name.index('Pointers')
ptrs = self._content[ptrs_ind].get_val()
prms = self._content[1+ptrs_ind:]
areas, len_ptrs = [], len(ptrs)
for i in range(len_ptrs):
prm = prms[i]
if prm._name == 'Opt':
# Options have no global length prefix, but are always the last parameter
# moreover, if their pointer is null, the parameter is not present
ptr = ptrs[i]
if ptr != 0:
areas.append( (ptr-(len_ptrs-i), None) )
else:
areas.append( (ptrs[i]-(len_ptrs-i), 1+prm._content[0].get_val()) )
areas.sort(key=lambda x: x[0])
# areas: list of (prm_offset, prm_length), sorted by prm_offset
#print(areas)
off = 0
for prm in areas:
if prm[0] < off:
return False
else:
if prm[1] is not None:
off += prm[1]
return True
2017-12-15 20:28:21 +00:00
# parent class for SCCP messages options
class SCCPOpt(Envelope):
"""Class for handling the set of SCCP optional fields in a given message
"""
ENV_SEL_TRANS = False
_opts = {}
def __init__(self, *args, **kwargs):
Envelope.__init__(self, *args, **kwargs)
# build a dict of name : optional field
self._opts = {e[0]._val: e for e in self._content}
def _from_char(self, char):
if self.get_trans():
return
2017-12-15 20:28:21 +00:00
# parse the different options in the given order
ind = 0
while char.len_bit() >= 8:
name = char.get_uint(8)
2017-12-15 20:28:21 +00:00
if name not in self._opts:
#raise(PycrateErr('SCCP option: invalid identifier %i' % name))
# unknown option
opt = Optional(Buf('_unk_%i' % name, rep=REPR_HEX), name)
unk = True
else:
opt = self._opts[name]
unk = False
opt.set_trans(False)
2017-12-15 20:28:21 +00:00
opt[0].set_val(name)
# parse the rest of the option
for e in opt._content[1:]:
e._from_char(char)
if unk:
# insert unknown optional field
self.insert(ind, opt)
elif self.index(opt) != ind:
# reorder optional field
2017-12-15 20:28:21 +00:00
self.remove(opt)
self.insert(ind, opt)
ind += 1
if name == 0:
# end of options
break
#------------------------------------------------------------------------------#
# Connection Request (CR)
# ITU-T Q.713, section 4.2
#------------------------------------------------------------------------------#
class SCCPConnectionRequest(SCCPMessage):
_GEN = (
Uint8('Type', val=1, dic=_SCCPType_dict),
SrcLocalRef(),
ProtocolClass(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='CalledPartyAddr'),
Ptr8('Ptr1', field='Opt')
)),
CalledPartyAddr(),
SCCPOpt('Opt', GEN=(
Optional(Credit(), 9),
Optional(CallingPartyAddr(), 4),
Optional(Data(), 15),
Optional(HopCounter(), 17),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Connection Confirm (CC)
# ITU-T Q.713, section 4.3
#------------------------------------------------------------------------------#
class SCCPConnectionConfirm(SCCPMessage):
_GEN = (
Uint8('Type', val=2, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef(),
ProtocolClass(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Opt'),
)),
SCCPOpt('Opt', GEN=(
Optional(Credit(), 9),
Optional(CalledPartyAddr(), 4),
Optional(Data(), 15),
Optional(HopCounter(), 17),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Connection Refused (CREF)
# ITU-T Q.713, section 4.4
#------------------------------------------------------------------------------#
class SCCPConnectionRefused(SCCPMessage):
_GEN = (
Uint8('Type', val=3, dic=_SCCPType_dict),
DstLocalRef(),
RefCause(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Opt'),
)),
SCCPOpt('Opt', GEN=(
Optional(CalledPartyAddr(), 4),
Optional(Data(), 15),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Released (RLSD)
# ITU-T Q.713, section 4.5
#------------------------------------------------------------------------------#
class SCCPReleased(SCCPMessage):
_GEN = (
Uint8('Type', val=4, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef(),
RelCause(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Opt'),
)),
SCCPOpt('Opt', GEN=(
Optional(Data(), 15),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Release complete (RLC)
# ITU-T Q.713, section 4.6
#------------------------------------------------------------------------------#
class SCCPReleaseComplete(SCCPMessage):
_GEN = (
Uint8('Type', val=5, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef()
)
#------------------------------------------------------------------------------#
# Data form 1 (DT1)
# ITU-T Q.713, section 4.7
#------------------------------------------------------------------------------#
class SCCPDataForm1(SCCPMessage):
_GEN = (
Uint8('Type', val=6, dic=_SCCPType_dict),
DstLocalRef(),
SegmentReassemb(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Data'),
)),
Data()
)
#------------------------------------------------------------------------------#
# Data form 2 (DT2)
# ITU-T Q.713, section 4.8
#------------------------------------------------------------------------------#
class SCCPDataForm2(SCCPMessage):
_GEN = (
Uint8('Type', val=7, dic=_SCCPType_dict),
DstLocalRef(),
SeqSegment(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Data'),
)),
Data()
)
#------------------------------------------------------------------------------#
# Data acknowledgement (AK)
# ITU-T Q.713, section 4.9
#------------------------------------------------------------------------------#
class SCCPDataAck(SCCPMessage):
_GEN = (
Uint8('Type', val=8, dic=_SCCPType_dict),
DstLocalRef(),
RecvSeqn(),
Credit()
)
#------------------------------------------------------------------------------#
# Unit Data (UDT)
# ITU-T Q.713, section 4.10
#------------------------------------------------------------------------------#
class SCCPUnitData(SCCPMessage):
_GEN = (
Uint8('Type', val=9, dic=_SCCPType_dict),
ProtocolClass(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='CalledPartyAddr'),
Ptr8('Ptr1', field='CallingPartyAddr'),
Ptr8('Ptr2', field='Data')
)),
CalledPartyAddr(),
CallingPartyAddr(),
Data()
)
#------------------------------------------------------------------------------#
# Unit Data Service (UDTS)
# ITU-T Q.713, section 4.11
#------------------------------------------------------------------------------#
class SCCPUnitDataService(SCCPMessage):
_GEN = (
Uint8('Type', val=10, dic=_SCCPType_dict),
RetCause(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='CalledPartyAddr'),
Ptr8('Ptr1', field='CallingPartyAddr'),
Ptr8('Ptr2', field='Data')
)),
CalledPartyAddr(),
CallingPartyAddr(),
Data()
)
#------------------------------------------------------------------------------#
# Expedited data (ED)
# ITU-T Q.713, section 4.12
#------------------------------------------------------------------------------#
class SCCPExpeditedData(SCCPMessage):
_GEN = (
Uint8('Type', val=11, dic=_SCCPType_dict),
DstLocalRef(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Data'),
)),
Data()
)
#------------------------------------------------------------------------------#
# Expedited data acknowledgement (EA)
# ITU-T Q.713, section 4.13
#------------------------------------------------------------------------------#
class SCCPExpeditedDataAck(SCCPMessage):
_GEN = (
Uint8('Type', val=12, dic=_SCCPType_dict),
DstLocalRef()
)
#------------------------------------------------------------------------------#
# Reset request (RSR)
# ITU-T Q.713, section 4.14
#------------------------------------------------------------------------------#
class SCCPResetRequest(SCCPMessage):
_GEN = (
Uint8('Type', val=13, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef(),
ResCause(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Opt'),
)),
SCCPOpt('Opt', GEN=(
Optional(EOO(), 0),
))
)
#------------------------------------------------------------------------------#
# Reset confirmation (RSC)
# ITU-T Q.713, section 4.15
#------------------------------------------------------------------------------#
class SCCPResetConf(SCCPMessage):
_GEN = (
Uint8('Type', val=14, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef()
)
#------------------------------------------------------------------------------#
# Protocol data unit error (ERR)
# ITU-T Q.713, section 4.16
#------------------------------------------------------------------------------#
class SCCPError(SCCPMessage):
_GEN = (
Uint8('Type', val=15, dic=_SCCPType_dict),
DstLocalRef(),
ErrCause,
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='Opt'),
)),
SCCPOpt('Opt', GEN=(
Optional(EOO(), 0),
))
)
#------------------------------------------------------------------------------#
# Inactivity test (IT)
# ITU-T Q.713, section 4.17
#------------------------------------------------------------------------------#
class SCCPInactivityTest(SCCPMessage):
_GEN = (
Uint8('Type', val=16, dic=_SCCPType_dict),
DstLocalRef(),
SrcLocalRef(),
ProtocolClass(),
SeqSegment(),
Credit()
)
#------------------------------------------------------------------------------#
# Extended unitdata (XUDT)
# ITU-T Q.713, section 4.18
#------------------------------------------------------------------------------#
class SCCPExtUnitData(SCCPMessage):
_GEN = (
Uint8('Type', val=17, dic=_SCCPType_dict),
ProtocolClass(),
HopCounter(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='CalledPartyAddr'),
Ptr8('Ptr1', field='CallingPartyAddr'),
Ptr8('Ptr2', field='Data'),
Ptr8('Ptr3', field='Opt')
)),
CalledPartyAddr(),
CallingPartyAddr(),
Data(),
SCCPOpt('Opt', GEN=(
Optional(Segmentation(), 16),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Extended unitdata service (XUDTS)
# ITU-T Q.713, section 4.19
#------------------------------------------------------------------------------#
class SCCPExtUnitDataService(SCCPMessage):
_GEN = (
Uint8('Type', val=18, dic=_SCCPType_dict),
RetCause(),
HopCounter(),
Envelope('Pointers', GEN=(
Ptr8('Ptr0', field='CalledPartyAddr'),
Ptr8('Ptr1', field='CallingPartyAddr'),
Ptr8('Ptr2', field='Data'),
Ptr8('Ptr3', field='Opt')
)),
CalledPartyAddr(),
CallingPartyAddr(),
Data(),
SCCPOpt('Opt', GEN=(
Optional(Segmentation(), 16),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
#------------------------------------------------------------------------------#
# Long unitdata (LUDT)
# ITU-T Q.713, section 4.20
#------------------------------------------------------------------------------#
class SCCPLongUnitData(SCCPMessage):
_GEN = (
Uint8('Type', val=19, dic=_SCCPType_dict),
ProtocolClass(),
HopCounter(),
Envelope('Pointers', GEN=(
Ptr16('Ptr0', field='CalledPartyAddr'),
Ptr16('Ptr1', field='CallingPartyAddr'),
Ptr16('Ptr2', field='LongData'),
Ptr16('Ptr3', field='Opt')
)),
CalledPartyAddr(),
CallingPartyAddr(),
LongData(),
SCCPOpt('Opt', GEN=(
Optional(Segmentation(), 16),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
def is_valid(self):
"""Ensures an SCCP message has a valid layout,
i.e. its mandatory parameters with variable length (those with pointers)
do not overlap
2017-12-15 20:28:21 +00:00
Args:
None
Returns:
result: bool
"""
# for LUDT / LUDTS, pointer and length prefix are 2 bytes long
ptrs = self._content[3].get_val()
prms = self._content[4:]
areas = []
for i in range(4):
prm = prms[i]
if prm._name == 'Opt':
# Options have no global length prefix, but are always the last parameter
# moreover, if their pointer is null, the parameter is not present
ptr = ptrs[i]
if ptr != 0:
areas.append( (ptr-2*(4-i), None) )
else:
areas.append( (ptrs[i]-2*(4-i), 2+prm._content[0].get_val()) )
areas.sort(key=lambda x: x[0])
# areas: list of (prm_offset, prm_length), sorted by prm_offset
#print(areas)
off = 0
for prm in areas:
if prm[0] < off:
return False
else:
if prm[1] is not None:
off += prm[1]
return True
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# Long unitdata service (LUDTS)
# ITU-T Q.713, section 4.21
#------------------------------------------------------------------------------#
class SCCPLongUnitDataService(SCCPMessage):
2017-12-15 20:28:21 +00:00
_GEN = (
Uint8('Type', val=20, dic=_SCCPType_dict),
RetCause(),
HopCounter(),
Envelope('Pointers', GEN=(
Ptr16('Ptr0', field='CalledPartyAddr'),
Ptr16('Ptr1', field='CallingPartyAddr'),
Ptr16('Ptr2', field='LongData'),
Ptr16('Ptr3', field='Opt')
)),
CalledPartyAddr(),
CallingPartyAddr(),
LongData(),
SCCPOpt('Opt', GEN=(
Optional(Segmentation(), 16),
Optional(Importance(), 18),
Optional(EOO(), 0)
))
)
def is_valid(self):
"""Ensures an SCCP message has a valid layout,
i.e. its mandatory parameters with variable length (those with pointers)
do not overlap
Args:
None
2017-12-15 20:28:21 +00:00
Returns:
result: bool
"""
# for LUDT / LUDTS, pointer and length prefix are 2 bytes long
ptrs = self._content[3].get_val()
prms = self._content[4:]
areas = []
for i in range(4):
prm = prms[i]
if prm._name == 'Opt':
# Options have no global length prefix, but are always the last parameter
# moreover, if their pointer is null, the parameter is not present
ptr = ptrs[i]
if ptr != 0:
areas.append( (ptr-2*(4-i), None) )
else:
areas.append( (ptrs[i]-2*(4-i), 2+prm._content[0].get_val()) )
areas.sort(key=lambda x: x[0])
# areas: list of (prm_offset, prm_length), sorted by prm_offset
#print(areas)
off = 0
for prm in areas:
if prm[0] < off:
return False
elif prm[1] is None:
return False
else:
off = prm[1]
return True
2017-12-15 20:28:21 +00:00
#------------------------------------------------------------------------------#
# SCCP Message dispatcher
#------------------------------------------------------------------------------#
SCCPTypeClasses = {
1 : SCCPConnectionRequest,
2 : SCCPConnectionConfirm,
3 : SCCPConnectionRefused,
4 : SCCPReleased,
5 : SCCPReleaseComplete,
6 : SCCPDataForm1,
7 : SCCPDataForm2,
8 : SCCPDataAck,
9 : SCCPUnitData,
10 : SCCPUnitDataService,
11 : SCCPExpeditedData,
12 : SCCPExpeditedDataAck,
13 : SCCPResetRequest,
14 : SCCPResetConf,
15 : SCCPError,
16 : SCCPInactivityTest,
17 : SCCPExtUnitData,
18 : SCCPExtUnitDataService,
19 : SCCPLongUnitData,
20 : SCCPLongUnitDataService,
2017-12-15 20:28:21 +00:00
}
def get_sccp_msg_instances():
return {k: SCCPTypeClasses[k]() for k in SCCPTypeClasses}
#------------------------------------------------------------------------------#
# SCCP Management messages and codes
# ITU-T Q.713, section 5
#------------------------------------------------------------------------------#
#------------------------------------------------------------------------------#
# SCMG message parameters
# ITU-T Q.713, section 5.2
#------------------------------------------------------------------------------#
_SCMGType_dict = {
1 : 'SSA subsystem-allowed',
2 : 'SSP subsystem-prohibited',
3 : 'SST subsystem-status-test',
4 : 'SOR subsystem-out-of-service-request',
5 : 'SOG subsystem-out-of-service-grant',
6 : 'SSC SCCP/subsystem-congested'
}
_SMI_dict = {
0 : 'affected subsystem multiplicity unknown',
2 : 'reserved for national use',
3 : 'reserved for national use'
}
class SubsysMultInd(Envelope):
_GEN = (
Uint('spare', bl=6),
Uint('Value', val=0, bl=2, dic=_SMI_dict)
)
class CongestLevel(Envelope):
_GEN = (
Uint('spare', bl=4),
Uint('Value', val=1, bl=4, dic={1:'least congested', 8:'most congested'})
)
#------------------------------------------------------------------------------#
# SCCP Messages
# ITU-T Q.713, section 5.3
#------------------------------------------------------------------------------#
class SCMGSubsysAllowed(Envelope):
_GEN = (
Uint8('Type', val=1, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd()
)
class SCMGSubsysProhibited(Envelope):
_GEN = (
Uint8('Type', val=2, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd()
)
class SCMGSubsysStatTest(Envelope):
_GEN = (
Uint8('Type', val=3, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd()
)
class SCMGSubsysOutOfServRequest(Envelope):
_GEN = (
Uint8('Type', val=4, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd()
)
class SCMGSubsysOutOfServGrant(Envelope):
_GEN = (
Uint8('Type', val=5, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd()
)
class SCMGSubsysCongested(Envelope):
_GEN = (
Uint8('Type', val=6, dic=_SCMGType_dict),
Uint8('AffectedSSN', val=0, dic=_SSN_dict),
Uint16LE('AffectedPC', val=0),
SubsysMultInd(),
CongestLevel()
)
#------------------------------------------------------------------------------#
# SCMG Message dispatcher
#------------------------------------------------------------------------------#
SCMGTypeClasses = {
1 : SCMGSubsysAllowed,
2 : SCMGSubsysProhibited,
3 : SCMGSubsysStatTest,
4 : SCMGSubsysOutOfServRequest,
5 : SCMGSubsysOutOfServGrant,
6 : SCMGSubsysCongested
}
def get_scmg_msg_instances():
return {k: SCMGTypeClasses[k]() for k in SCMGTypeClasses}
#------------------------------------------------------------------------------#
# SCPP Message parser
#------------------------------------------------------------------------------#
def parse_SCCP(buf, w_scmg=True):
2017-12-15 20:28:21 +00:00
"""Parses an SCCP message bytes' buffer
Args:
buf: SCCP message bytes' buffer
Returns:
element, err: 2-tuple
element: Element instance, if err is null (no error)
element: None, if err is not null
err: 0 no error, 1 invalid message type, 2 message parsing failed
"""
if not buf:
return None, 1
if python_version < 3:
try:
Msg = SCCPTypeClasses[ord(buf[0])]()
except:
return None, 1
else:
try:
Msg = SCCPTypeClasses[buf[0]]()
except:
return None, 1
try:
Msg.from_bytes(buf)
except:
return None, 2
#
# if SCMG, parses it further (UDT/XUDT/LUDT, ProtocolClass 0, both addresses on SSN 1)
if w_scmg:
try:
if Msg[0].get_val() in (9, 17, 19) and Msg[1][1].get_val() == 0 and \
Msg[3][1][0]['RoutingInd'].get_val() == 1 and Msg[3][1][0]['SSNInd'].get_val() == 1 and Msg[3][1]['SSN'].get_val() == 1 and \
Msg[4][1][0]['RoutingInd'].get_val() == 1 and Msg[4][1][0]['SSNInd'].get_val() == 1 and Msg[4][1]['SSN'].get_val() == 1:
data = Msg[5]
dataval = data[1]
scmg, err = parse_SCMG(dataval.get_val())
if err == 0:
data.replace(dataval, scmg)
except:
pass
2017-12-15 20:28:21 +00:00
#
return Msg, 0
def parse_SCMG(buf):
"""Parses an SCMG message bytes' buffer
Args:
buf: SCMG message bytes' buffer
Returns:
element, err: 2-tuple
element: Element instance, if err is null (no error)
element: None, if err is not null
err: 0 no error, 1 invalid message type, 2 message parsing failed
"""
if not buf:
return None, 1
if python_version < 3:
try:
Msg = SCMGTypeClasses[ord(buf[0])]()
except:
return None, 1
else:
try:
Msg = SCMGTypeClasses[buf[0]]()
except:
return None, 1
try:
Msg.from_bytes(buf)
except:
return None, 2
return Msg, 0