mirror of https://gerrit.osmocom.org/pysim
403 lines
17 KiB
Python
403 lines
17 KiB
Python
"""Code related to SMS Encoding/Decoding"""
|
|
# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
|
|
# module to python3, and I gave up after >= 3 hours of trying and failing to do so
|
|
|
|
# (C) 2022 by Harald Welte <laforge@osmocom.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import typing
|
|
import abc
|
|
from bidict import bidict
|
|
from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
|
|
from construct import Struct, Enum, Tell, BitStruct, this, Padding
|
|
from construct import Prefixed, GreedyRange, GreedyBytes
|
|
|
|
from pySim.construct import HexAdapter, BcdAdapter, TonNpi
|
|
from pySim.utils import Hexstr, h2b, b2h
|
|
|
|
from smpp.pdu import pdu_types, operations
|
|
|
|
BytesOrHex = typing.Union[Hexstr, bytes]
|
|
|
|
class UserDataHeader:
    """User Data Header (UDH) of a short message: an ordered list of information elements.

    Every information element (IE) is a mapping with the keys 'iei', 'length'
    and 'value', i.e. the shape parsed and built by ``ie_c``.
    """
    # a single IE in the user data header
    ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
    # parser for the full UDH: Length octet followed by sequence of IEs
    _construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
                        'data'/GreedyBytes)

    def __init__(self, ies=None):
        """Create a UDH from a list of IEs; omitting ``ies`` gives an empty header."""
        # A literal [] default would be one list object shared by every
        # instance created without arguments, so allocate a fresh one here.
        self.ies = [] if ies is None else ies

    def __repr__(self) -> str:
        return 'UDH(%r)' % self.ies

    def has_ie(self, iei:int) -> bool:
        """Return True if at least one IE with the given identifier is present."""
        return any(ie['iei'] == iei for ie in self.ies)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
        """Parse a UDH from the start of ``inb`` (bytes or hex string).

        Returns a tuple of the UserDataHeader instance and the bytes that
        follow the header.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        res = cls._construct.parse(inb)
        return cls(res['ies']), res['data']

    def toBytes(self) -> bytes:
        """Encode the header (length octet plus IEs); no trailing data is emitted."""
        return self._construct.build({'ies':self.ies, 'data':b''})
|
|
|
|
|
|
def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
    """Tell whether the given SMPP data coding scheme denotes 8-bit data.

    True is returned for the two DEFAULT-scheme "octet unspecified" codings
    and for the GSM message class scheme whose message coding is DATA_8BIT;
    everything else yields False.
    """
    default_scheme = pdu_types.DataCodingScheme.DEFAULT
    if dcs == pdu_types.DataCoding(default_scheme,
                                   pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
        return True
    if dcs == pdu_types.DataCoding(default_scheme,
                                   pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
        return True
    # pySim/sms.py:72:21: E1101: Instance of 'DataCodingScheme' has no 'GSM_MESSAGE_CLASS' member (no-member)
    # pylint: disable=no-member
    if not dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS:
        return False
    # only look at schemeData once we know it is the GSM message class scheme
    if dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
        return True
    return False
|
|
|
|
def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
    """Raise ValueError unless the given SMPP data coding scheme is 8-bit."""
    if smpp_dcs_is_8bit(dcs):
        return
    raise ValueError('We only support 8bit coded SMS for now')
|
|
|
|
class AddressField:
    """Representation of an address field as used in SMS T-PDU."""
    # addr_len counts digits (not octets); the digits occupy ceil(addr_len/2)
    # octets.  'tell' records how many bytes the whole field consumed.
    _construct = Struct('addr_len'/Int8ub,
                        'type_of_addr'/TonNpi,
                        'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
                        'tell'/Tell)
    # SMPP enum member name <-> numbering_plan_id name used by this class
    smpp_map_npi = bidict({
        'UNKNOWN': 'unknown',
        'ISDN': 'isdn_e164',
        'DATA': 'data_x121',
        'TELEX': 'telex_f69',
        'LAND_MOBILE': 'sc_specific6',
        'NATIONAL': 'national',
        'PRIVATE': 'private',
        'ERMES': 'ermes',
        })
    # SMPP enum member name <-> type_of_number name used by this class
    smpp_map_ton = bidict({
        'UNKNOWN': 'unknown',
        'INTERNATIONAL': 'international',
        'NATIONAL': 'national',
        'NETWORK_SPECIFIC': 'network_specific',
        'SUBSCRIBER_NUMBER': 'short_code',
        'ALPHANUMERIC': 'alphanumeric',
        'ABBREVIATED': 'abbreviated',
        })

    def __init__(self, digits, ton='unknown', npi='unknown'):
        """Store the digit string together with its type-of-number and numbering plan."""
        self.ton = ton
        self.npi = npi
        self.digits = digits

    def __str__(self):
        return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
        """Construct an AddressField instance from the binary T-PDU address format.

        Returns a tuple of the new instance and the bytes following the
        address field in ``inb``.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        res = cls._construct.parse(inb)
        ton = res['type_of_addr']['type_of_number']
        npi = res['type_of_addr']['numbering_plan_id']
        # cut the decoded digits down to addr_len, dropping the filler nibble
        # that pads an odd number of digits to a full octet
        digits = res['digits'][:res['addr_len']]
        # return resulting instance + remainder bytes
        return cls(digits, ton, npi), inb[res['tell']:]

    @classmethod
    def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
        """Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
        # addr is ASCII-decoded; ton/npi are translated via the name of the SMPP enum member
        return cls(addr.decode('ascii'), AddressField.smpp_map_ton[ton.name], AddressField.smpp_map_npi[npi.name])

    def toSmpp(self):
        """Return the smpp.pdu {source,dest}_addr_{,ton,npi} attributes for this AddressField.

        The result is a tuple (digits, ton, npi) where ton and npi are the
        keys of smpp_map_ton / smpp_map_npi matching this address.
        """
        return (self.digits, self.smpp_map_ton.inverse[self.ton], self.smpp_map_npi.inverse[self.npi])

    def toBytes(self) -> bytes:
        """Encode the AddressField into the binary representation as used in T-PDU."""
        num_digits = len(self.digits)
        # Pad a local copy to an even number of nibbles.  Padding self.digits
        # itself would permanently alter the instance, so that a second call
        # would encode the filler as a digit and report a wrong addr_len.
        digits = self.digits
        if num_digits % 2:
            digits = digits + 'f'
        d = {
            'addr_len': num_digits,
            'type_of_addr': {
                'ext': True,
                'type_of_number': self.ton,
                'numbering_plan_id': self.npi,
                },
            'digits': digits,
            }
        return self._construct.build(d)
|
|
|
|
|
|
class SMS_TPDU(abc.ABC):
    """Common base of the SMS T-PDU classes; holds the TP fields shared by all of them."""
    def __init__(self, **kwargs):
        """Take each shared TP field from the keyword arguments, else its default."""
        # (attribute name, value used when the keyword is not supplied)
        shared_fields = (
            ('tp_mti', None),
            ('tp_rp', False),
            ('tp_udhi', False),
            ('tp_pid', None),
            ('tp_dcs', None),
            ('tp_udl', None),
            ('tp_ud', None),
        )
        for field_name, fallback in shared_fields:
            setattr(self, field_name, kwargs.get(field_name, fallback))
|
|
|
|
|
|
|
|
class SMS_DELIVER(SMS_TPDU):
    """Representation of a SMS-DELIVER T-PDU. This is the Network to MS/UE (downlink) direction."""
    # First octet, most significant bit first (3GPP TS 23.040, SMS-DELIVER):
    # TP-RP, TP-UDHI, TP-SRI, one unused bit, TP-LP, TP-MMS, two-bit TP-MTI.
    # Every field is named exactly once; tp_lp must be present because
    # toBytes() supplies it.
    flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_sri'/Flag,
                                Padding(1), 'tp_lp'/Flag, 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))

    def __init__(self, **kwargs):
        """Initialise from keyword arguments named after the TP fields; tp_mti is forced to 0."""
        kwargs['tp_mti'] = 0
        super().__init__(**kwargs)
        self.tp_lp = kwargs.get('tp_lp', False)
        self.tp_mms = kwargs.get('tp_mms', False)
        self.tp_oa = kwargs.get('tp_oa', None)
        self.tp_scts = kwargs.get('tp_scts', None)
        self.tp_sri = kwargs.get('tp_sri', False)

    def __repr__(self):
        return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU.

        ``inb`` may be bytes or a hex string.  TP-SCTS is kept as the raw
        7 octets and TP-UD as the raw remainder of the PDU.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        # the first octet carries the flags; the originating address follows it
        d = SMS_DELIVER.flags_construct.parse(inb)
        oa, remainder = AddressField.fromBytes(inb[1:])
        d['tp_oa'] = oa
        # fixed layout after the address: PID, DCS, 7 octets SCTS, UDL, UD
        offset = 0
        d['tp_pid'] = remainder[offset]
        offset += 1
        d['tp_dcs'] = remainder[offset]
        offset += 1
        # TODO: further decode
        d['tp_scts'] = remainder[offset:offset+7]
        offset += 7
        d['tp_udl'] = remainder[offset]
        offset += 1
        d['tp_ud'] = remainder[offset:]
        return cls(**d)

    def toBytes(self) -> bytes:
        """Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
        outb = bytearray()
        d = {
            'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
            'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
            }
        flags = SMS_DELIVER.flags_construct.build(d)
        outb.extend(flags)
        outb.extend(self.tp_oa.toBytes())
        outb.append(self.tp_pid)
        outb.append(self.tp_dcs)
        outb.extend(self.tp_scts)
        outb.append(self.tp_udl)
        outb.extend(self.tp_ud)

        return outb

    @classmethod
    def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from a smpp.pdu PDU.

        Only submit_sm is supported; any other command id raises ValueError.
        """
        if smpp_pdu.id == pdu_types.CommandId.submit_sm:
            return cls.fromSmppSubmit(smpp_pdu)
        else:
            raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)

    @classmethod
    def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu.

        Raises ValueError if the PDU's data coding is not 8-bit.
        """
        ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
        tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
                                      smpp_pdu.params['source_addr_ton'],
                                      smpp_pdu.params['source_addr_npi'])
        tp_ud = smpp_pdu.params['short_message']
        d = {
            'tp_lp': False,
            'tp_mms': False,
            'tp_oa': tp_oa,
            'tp_scts': h2b('22705200000000'), # FIXME
            'tp_sri': False,
            'tp_rp': False,
            'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
            'tp_pid': smpp_pdu.params['protocol_id'],
            'tp_dcs': 0xF6, # we only deal with binary SMS here
            'tp_udl': len(tp_ud),
            'tp_ud': tp_ud,
            }
        return cls(**d)
|
|
|
|
|
|
|
|
class SMS_SUBMIT(SMS_TPDU):
    """Representation of a SMS-SUBMIT T-PDU. This is the MS/UE -> network (uplink) direction."""
    # First octet, most significant bit first: TP-SRR, TP-UDHI, TP-RP,
    # two-bit TP-VPF, TP-RD, two-bit TP-MTI.
    flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag,
                                'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
                                'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))

    def __init__(self, **kwargs):
        """Initialise from keyword arguments named after the TP fields; tp_mti is forced to 1."""
        kwargs['tp_mti'] = 1
        super().__init__(**kwargs)
        self.tp_rd = kwargs.get('tp_rd', False)
        self.tp_vpf = kwargs.get('tp_vpf', 'none')
        self.tp_srr = kwargs.get('tp_srr', False)
        self.tp_mr = kwargs.get('tp_mr', None)
        self.tp_da = kwargs.get('tp_da', None)
        self.tp_vp = kwargs.get('tp_vp', None)

    def __repr__(self):
        # tp_vpf holds the enum *name* ('none', 'relative', ...), so it has to
        # be rendered with %s; a numeric conversion raises TypeError on a str.
        return '%s(MTI=%s, RD=%s, VPF=%s, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)

    @classmethod
    def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU.

        ``inb`` may be bytes or a hex string.  TP-VP (if present) is kept as
        raw octets and TP-UD as the raw remainder of the PDU.
        Raises ValueError if the validity period format is not recognised.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        # octet 0: flags, octet 1: message reference, then the destination address
        d = SMS_SUBMIT.flags_construct.parse(inb)
        d['tp_mr']= inb[1]
        da, remainder = AddressField.fromBytes(inb[2:])
        d['tp_da'] = da

        # offsets from here on are relative to the bytes after the address
        offset = 0
        d['tp_pid'] = remainder[offset]
        offset += 1
        d['tp_dcs'] = remainder[offset]
        offset += 1
        # the validity period is absent, 1 octet or 7 octets depending on TP-VPF
        if d['tp_vpf'] == 'none':
            vp_len = 0
        elif d['tp_vpf'] == 'relative':
            vp_len = 1
        elif d['tp_vpf'] in ('enhanced', 'absolute'):
            vp_len = 7
        else:
            raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
        if vp_len:
            # TODO: further decode
            d['tp_vp'] = remainder[offset:offset+vp_len]
            offset += vp_len
        d['tp_udl'] = remainder[offset]
        offset += 1
        d['tp_ud'] = remainder[offset:]
        return cls(**d)

    def toBytes(self) -> bytes:
        """Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
        outb = bytearray()
        d = {
            'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
            'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
            }
        flags = SMS_SUBMIT.flags_construct.build(d)
        outb.extend(flags)
        outb.append(self.tp_mr)
        outb.extend(self.tp_da.toBytes())
        outb.append(self.tp_pid)
        outb.append(self.tp_dcs)
        # the validity period is only on the wire when TP-VPF announces one
        if self.tp_vpf != 'none':
            outb.extend(self.tp_vp)
        outb.append(self.tp_udl)
        outb.extend(self.tp_ud)
        return outb

    @classmethod
    def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the format used by smpp.pdu.

        Only submit_sm is supported; any other command id raises ValueError.
        """
        if smpp_pdu.id == pdu_types.CommandId.submit_sm:
            return cls.fromSmppSubmit(smpp_pdu)
        else:
            raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)

    @classmethod
    def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the submit format used by smpp.pdu.

        Raises ValueError if the PDU's data coding is not 8-bit.
        """
        ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
        tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
                                      smpp_pdu.params['dest_addr_ton'],
                                      smpp_pdu.params['dest_addr_npi'])
        tp_ud = smpp_pdu.params['short_message']
        #vp_smpp = smpp_pdu.params['validity_period']
        #if not vp_smpp:
        #    vpf = 'none'
        d = {
            'tp_rd': smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE',
            # No validity period is converted yet, so state that explicitly:
            # None is not a value flags_construct can encode and would send
            # toBytes() into the "validity period present" branch.
            'tp_vpf': 'none',
            'tp_rp': False, # related to ['registered_delivery'] ?
            'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
            'tp_srr': bool(smpp_pdu.params['registered_delivery']),
            'tp_mr': 0, # FIXME: sm_default_msg_id ?
            'tp_da': tp_da,
            'tp_pid': smpp_pdu.params['protocol_id'],
            'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
            'tp_vp': None, # FIXME: implement VPF conversion
            'tp_udl': len(tp_ud),
            'tp_ud': tp_ud,
            }
        return cls(**d)

    def toSmpp(self) -> pdu_types.PDU:
        """Translate a SMS_SUBMIT instance to a smpp.pdu.operations.SubmitSM instance.

        Raises ValueError if TP-DCS is anything other than 0xF6.
        """
        esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
        reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
        # fromSmppSubmit() maps replace_if_present_flag onto tp_rd, so the
        # reverse direction has to read tp_rd as well (tp_rp is the reply path flag)
        if self.tp_rd:
            repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
        else:
            repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
        # we only deal with binary SMS here:
        if self.tp_dcs != 0xF6:
            raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
        dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
        (daddr, ton, npi) = self.tp_da.toSmpp()
        return operations.SubmitSM(service_type='',
                                   source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
                                   source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
                                   source_addr='simcard',
                                   dest_addr_ton=ton,
                                   dest_addr_npi=npi,
                                   destination_addr=daddr,
                                   esm_class=esm_class,
                                   protocol_id=self.tp_pid,
                                   priority_flag=pdu_types.PriorityFlag.LEVEL_0,
                                   #schedule_delivery_time,
                                   #validity_period,
                                   registered_delivery=reg_del,
                                   replace_if_present_flag=repl_if,
                                   data_coding=dc,
                                   #sm_default_msg_id,
                                   short_message=self.tp_ud)
|