diff --git a/pySim/sms.py b/pySim/sms.py index a2812131..a15bbcb8 100644 --- a/pySim/sms.py +++ b/pySim/sms.py @@ -18,20 +18,25 @@ # along with this program. If not, see . import typing -from construct import Int8ub, Bytes -from construct import Struct, Tell, this, RepeatUntil +import abc +from bidict import bidict +from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag +from construct import Struct, Enum, Tell, BitStruct, this, Padding +from construct import Prefixed, GreedyRange, GreedyBytes +from pySim.construct import HexAdapter, BcdAdapter, TonNpi from pySim.utils import Hexstr, h2b, b2h +from smpp.pdu import pdu_types, operations + BytesOrHex = typing.Union[Hexstr, bytes] class UserDataHeader: # a single IE in the user data header - ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length)) + ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length)) # parser for the full UDH: Length octet followed by sequence of IEs - _construct = Struct('udhl'/Int8ub, - # FIXME: somehow the below lambda is not working, we always only get the first IE? - 'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c)) + _construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)), + 'data'/GreedyBytes) def __init__(self, ies=[]): self.ies = ies @@ -50,4 +55,346 @@ class UserDataHeader: if isinstance(inb, str): inb = h2b(inb) res = cls._construct.parse(inb) - return cls(res['ies']), inb[1+res['udhl']:] + return cls(res['ies']), res['data'] + + def toBytes(self) -> bytes: + return self._construct.build({'ies':self.ies, 'data':b''}) + + +def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool: + """Determine if the given SMPP data coding scheme is 8-bit or not.""" + if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, + pdu_types.DataCodingDefault.OCTET_UNSPECIFIED): + return True + if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, + pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON): + return True + if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT: + return True + else: + return False + +def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding): + """Assert if given SMPP data coding scheme is not 8-bit.""" + if not smpp_dcs_is_8bit(dcs): + raise ValueError('We only support 8bit coded SMS for now') + +class AddressField: + """Representation of an address field as used in SMS T-PDU.""" + _construct = Struct('addr_len'/Int8ub, + 'type_of_addr'/TonNpi, + 'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)), + 'tell'/Tell) + smpp_map_npi = bidict({ + 'UNKNOWN': 'unknown', + 'ISDN': 'isdn_e164', + 'DATA': 'data_x121', + 'TELEX': 'telex_f69', + 'LAND_MOBILE': 'sc_specific6', + 'NATIONAL': 'national', + 'PRIVATE': 'private', + 'ERMES': 'ermes', + }) + smpp_map_ton = bidict({ + 'UNKNOWN': 'unknown', + 'INTERNATIONAL': 'international', + 'NATIONAL': 'national', + 'NETWORK_SPECIFIC': 'network_specific', + 'SUBSCRIBER_NUMBER': 'short_code', + 'ALPHANUMERIC': 'alphanumeric', + 'ABBREVIATED': 'abbreviated', + }) + + + def __init__(self, digits, ton='unknown', npi='unknown'): + self.ton = ton + self.npi = npi + self.digits = digits + + def __str__(self): + return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits) + + @classmethod + def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]: + """Construct an AddressField instance from the binary T-PDU address format.""" + if isinstance(inb, str): + inb = h2b(inb) + res = cls._construct.parse(inb) + #print("size: %s" % cls._construct.sizeof()) + ton = res['type_of_addr']['type_of_number'] + npi = res['type_of_addr']['numbering_plan_id'] + # return resulting instance + remainder bytes + return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:] + + @classmethod + def fromSmpp(cls, addr, ton, npi) -> 'AddressField': + """Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu.""" + # return the resulting instance + return cls(addr.decode('ascii'), AddressField.smpp_map_ton[ton.name], AddressField.smpp_map_npi[npi.name]) + + def toSmpp(self): + """Return smpp.pdo.*.source,dest}_addr_{,ton,npi} attributes for given AddressField.""" + return (self.digits, self.smpp_map_ton.inverse[self.ton], self.smpp_map_npi.inverse[self.npi]) + + def toBytes(self) -> bytes: + """Encode the AddressField into the binary representation as used in T-PDU.""" + num_digits = len(self.digits) + if num_digits % 2: + self.digits += 'f' + d = { + 'addr_len': num_digits, + 'type_of_addr': { + 'ext': True, + 'type_of_number': self.ton, + 'numbering_plan_id': self.npi, + }, + 'digits': self.digits, + } + return self._construct.build(d) + + +class SMS_TPDU(abc.ABC): + """Base class for a SMS T-PDU.""" + def __init__(self, **kwargs): + self.tp_mti = kwargs.get('tp_mti', None) + self.tp_rp = kwargs.get('tp_rp', False) + self.tp_udhi = kwargs.get('tp_udhi', False) + self.tp_pid = kwargs.get('tp_pid', None) + self.tp_dcs = kwargs.get('tp_dcs', None) + self.tp_udl = kwargs.get('tp_udl', None) + self.tp_ud = kwargs.get('tp_ud', None) + + + +class SMS_DELIVER(SMS_TPDU): + """Representation of a SMS-DELIVER T-PDU. This is the Network to MS/UE (downlink) direction.""" + flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, 'tp_sri'/Flag, + Padding(1), 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2)) + def __init__(self, **kwargs): + kwargs['tp_mti'] = 0 + super().__init__(**kwargs) + self.tp_lp = kwargs.get('tp_lp', False) + self.tp_mms = kwargs.get('tp_mms', False) + self.tp_oa = kwargs.get('tp_oa', None) + self.tp_scts = kwargs.get('tp_scts', None) + self.tp_sri = kwargs.get('tp_sri', False) + + def __repr__(self): + return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud) + + @classmethod + def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER': + """Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU.""" + if isinstance(inb, str): + inb = h2b(inb) + flags = inb[0] + d = SMS_DELIVER.flags_construct.parse(inb) + oa, remainder = AddressField.fromBytes(inb[1:]) + d['tp_oa'] = oa + offset = 0 + d['tp_pid'] = remainder[offset] + offset += 1 + d['tp_dcs'] = remainder[offset] + offset += 1 + # TODO: further decode + d['tp_scts'] = remainder[offset:offset+7] + offset += 7 + d['tp_udl'] = remainder[offset] + offset += 1 + d['tp_ud'] = remainder[offset:] + return cls(**d) + + def toBytes(self) -> bytes: + """Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU.""" + outb = bytearray() + d = { + 'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp, + 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri, + } + flags = SMS_DELIVER.flags_construct.build(d) + outb.extend(flags) + outb.extend(self.tp_oa.toBytes()) + outb.append(self.tp_pid) + outb.append(self.tp_dcs) + outb.extend(self.tp_scts) + outb.append(self.tp_udl) + outb.extend(self.tp_ud) + + return outb + + @classmethod + def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER': + """Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu.""" + if smpp_pdu.id == pdu_types.CommandId.submit_sm: + return cls.fromSmppSubmit(smpp_pdu) + else: + raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id) + + @classmethod + def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER': + """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu.""" + ensure_smpp_is_8bit(smpp_pdu.params['data_coding']) + tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'], + smpp_pdu.params['source_addr_ton'], + smpp_pdu.params['source_addr_npi']) + tp_ud = smpp_pdu.params['short_message'] + d = { + 'tp_lp': False, + 'tp_mms': False, + 'tp_oa': tp_oa, + 'tp_scts': h2b('22705200000000'), # FIXME + 'tp_sri': False, + 'tp_rp': False, + 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures, + 'tp_pid': smpp_pdu.params['protocol_id'], + 'tp_dcs': 0xF6, # we only deal with binary SMS here + 'tp_udl': len(tp_ud), + 'tp_ud': tp_ud, + } + return cls(**d) + + + +class SMS_SUBMIT(SMS_TPDU): + """Representation of a SMS-SUBMIT T-PDU. This is the MS/UE -> network (uplink) direction.""" + flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, + 'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3), + 'tp_rd'/Flag, 'tp_mti'/BitsInteger(2)) + def __init__(self, **kwargs): + kwargs['tp_mti'] = 1 + super().__init__(**kwargs) + self.tp_rd = kwargs.get('tp_rd', False) + self.tp_vpf = kwargs.get('tp_vpf', 'none') + self.tp_srr = kwargs.get('tp_srr', False) + self.tp_mr = kwargs.get('tp_mr', None) + self.tp_da = kwargs.get('tp_da', None) + self.tp_vp = kwargs.get('tp_vp', None) + + def __repr__(self): + return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud) + + @classmethod + def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT': + """Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU.""" + offset = 0 + if isinstance(inb, str): + inb = h2b(inb) + d = SMS_SUBMIT.flags_construct.parse(inb) + offset += 1 + d['tp_mr']= inb[offset] + offset += 1 + da, remainder = AddressField.fromBytes(inb[2:]) + d['tp_da'] = da + + offset = 0 + d['tp_pid'] = remainder[offset] + offset += 1 + d['tp_dcs'] = remainder[offset] + offset += 1 + if d['tp_vpf'] == 'none': + pass + elif d['tp_vpf'] == 'relative': + # TODO: further decode + d['tp_vp'] = remainder[offset:offset+1] + offset += 1 + elif d['tp_vpf'] == 'enhanced': + # TODO: further decode + d['tp_vp'] = remainder[offset:offset+7] + offset += 7 + pass + elif d['tp_vpf'] == 'absolute': + # TODO: further decode + d['tp_vp'] = remainder[offset:offset+7] + offset += 7 + pass + else: + raise ValueError('Invalid VPF: %s' % d['tp_vpf']) + d['tp_udl'] = remainder[offset] + offset += 1 + d['tp_ud'] = remainder[offset:] + return cls(**d) + + def toBytes(self) -> bytes: + """Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU.""" + outb = bytearray() + d = { + 'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf, + 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr, + } + flags = SMS_SUBMIT.flags_construct.build(d) + outb.extend(flags) + outb.append(self.tp_mr) + outb.extend(self.tp_da.toBytes()) + outb.append(self.tp_pid) + outb.append(self.tp_dcs) + if self.tp_vpf != 'none': + outb.extend(self.tp_vp) + outb.append(self.tp_udl) + outb.extend(self.tp_ud) + return outb + + @classmethod + def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT': + """Construct a SMS_SUBMIT instance from the format used by smpp.pdu.""" + if smpp_pdu.id == pdu_types.CommandId.submit_sm: + return cls.fromSmppSubmit(smpp_pdu) + else: + raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id) + + @classmethod + def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT': + """Construct a SMS_SUBMIT instance from the submit format used by smpp.pdu.""" + ensure_smpp_is_8bit(smpp_pdu.params['data_coding']) + tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'], + smpp_pdu.params['dest_addr_ton'], + smpp_pdu.params['dest_addr_npi']) + tp_ud = smpp_pdu.params['short_message'] + #vp_smpp = smpp_pdu.params['validity_period'] + #if not vp_smpp: + # vpf = 'none' + d = { + 'tp_rd': True if smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE' else False, + 'tp_vpf': None, # vpf, + 'tp_rp': False, # related to ['registered_delivery'] ? + 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures, + 'tp_srr': True if smpp_pdu.params['registered_delivery'] else False, + 'tp_mr': 0, # FIXME: sm_default_msg_id ? + 'tp_da': tp_da, + 'tp_pid': smpp_pdu.params['protocol_id'], + 'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here + 'tp_vp': None, # FIXME: implement VPF conversion + 'tp_udl': len(tp_ud), + 'tp_ud': tp_ud, + } + return cls(**d) + + def toSmpp(self) -> pdu_types.PDU: + """Translate a SMS_SUBMIT instance to a smpp.pdu.operations.SubmitSM instance.""" + esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT) + reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED) + if self.tp_rp: + repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE + else: + repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE + # we only deal with binary SMS here: + if self.tp_dcs != 0xF6: + raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now') + dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED) + (daddr, ton, npi) = self.tp_da.toSmpp() + return operations.SubmitSM(service_type='', + source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC, + source_addr_npi=pdu_types.AddrNpi.UNKNOWN, + source_addr='simcard', + dest_addr_ton=ton, + dest_addr_npi=npi, + destination_addr=daddr, + esm_class=esm_class, + protocol_id=self.tp_pid, + priority_flag=pdu_types.PriorityFlag.LEVEL_0, + #schedule_delivery_time, + #validity_period, + registered_delivery=reg_del, + replace_if_present_flag=repl_if, + data_coding=dc, + #sm_default_msg_id, + short_message=self.tp_ud) diff --git a/requirements.txt b/requirements.txt index b12cb4db..4332b66f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ termcolor colorlog pycryptodomex packaging +git+https://github.com/hologram-io/smpp.pdu diff --git a/setup.py b/setup.py index d4fb15f7..5678c554 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ setup( "colorlog", "pycryptodomex", "packaging", + "smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu", ], scripts=[ 'pySim-prog.py', diff --git a/tests/test_sms.py b/tests/test_sms.py new file mode 100644 index 00000000..83552245 --- /dev/null +++ b/tests/test_sms.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 + +import unittest +from pySim.utils import h2b, b2h +from pySim.sms import * + +class Test_SMS_UDH(unittest.TestCase): + def test_single_ie(self): + udh, tail = UserDataHeader.fromBytes('027100') + self.assertEqual(len(udh.ies), 1) + ie = udh.ies[0] + self.assertEqual(ie.iei, 0x71) + self.assertEqual(ie.length, 0) + self.assertEqual(ie.value, b'') + self.assertEqual(tail, b'') + + def test_single_ie_tail(self): + udh, tail = UserDataHeader.fromBytes('027100abcdef') + self.assertEqual(len(udh.ies), 1) + ie = udh.ies[0] + self.assertEqual(ie.iei, 0x71) + self.assertEqual(ie.length, 0) + self.assertEqual(ie.value, b'') + self.assertEqual(tail, b'\xab\xcd\xef') + + def test_single_ie_value(self): + udh, tail = UserDataHeader.fromBytes('03710110') + self.assertEqual(len(udh.ies), 1) + ie = udh.ies[0] + self.assertEqual(ie.iei, 0x71) + self.assertEqual(ie.length, 1) + self.assertEqual(ie.value, b'\x10') + self.assertEqual(tail, b'') + + def test_two_ie_data_tail(self): + udh, tail = UserDataHeader.fromBytes('0571007001ffabcd') + self.assertEqual(len(udh.ies), 2) + ie = udh.ies[0] + self.assertEqual(ie.iei, 0x71) + self.assertEqual(ie.length, 0) + self.assertEqual(ie.value, b'') + ie = udh.ies[1] + self.assertEqual(ie.iei, 0x70) + self.assertEqual(ie.length, 1) + self.assertEqual(ie.value, b'\xff') + self.assertEqual(tail, b'\xab\xcd') + + def test_toBytes(self): + indata = h2b('0571007001ff') + udh, tail = UserDataHeader.fromBytes(indata) + encoded = udh.toBytes() + self.assertEqual(encoded, indata) + +class Test_AddressField(unittest.TestCase): + def test_fromBytes(self): + encoded = h2b('0480214399') + af, trailer = AddressField.fromBytes(encoded) + self.assertEqual(trailer, b'\x99') + self.assertEqual(af.ton, 'unknown') + self.assertEqual(af.npi, 'unknown') + self.assertEqual(af.digits, '1234') + + def test_fromBytes_odd(self): + af, trailer = AddressField.fromBytes('038021f399') + self.assertEqual(trailer, b'\x99') + self.assertEqual(af.ton, 'unknown') + self.assertEqual(af.npi, 'unknown') + self.assertEqual(af.digits, '123') + + def test_toBytes(self): + encoded = h2b('04802143') + af, trailer = AddressField.fromBytes(encoded) + self.assertEqual(af.toBytes(), encoded) + + def test_toBytes_odd(self): + af = AddressField('12345', 'international', 'isdn_e164') + encoded = af.toBytes() + self.assertEqual(encoded, h2b('05912143f5')) + + +class Test_SUBMIT(unittest.TestCase): + def test_fromBytes(self): + s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a') + self.assertEqual(s.tp_mti, 1) + self.assertEqual(s.tp_rd, True) + self.assertEqual(s.tp_vpf, 'relative') + self.assertEqual(s.tp_rp, False) + self.assertEqual(s.tp_udhi, True) + self.assertEqual(s.tp_srr, False) + self.assertEqual(s.tp_pid, 0) + self.assertEqual(s.tp_dcs, 0xf5) + self.assertEqual(s.tp_udl, 140) + +class Test_DELIVER(unittest.TestCase): + def test_fromBytes(self): + d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101') + self.assertEqual(d.tp_mti, 0) + self.assertEqual(d.tp_mms, True) + self.assertEqual(d.tp_lp, False) + self.assertEqual(d.tp_rp, False) + self.assertEqual(d.tp_udhi, False) + self.assertEqual(d.tp_sri, False) + self.assertEqual(d.tp_pid, 0x7f) + self.assertEqual(d.tp_dcs, 0xf6) + self.assertEqual(d.tp_udl, 8)