forked from sim-card/pysim
WIP SMS
Change-Id: I0d95e62c1e7183a7851d1fe38df0f5133830cb1f
This commit is contained in:
parent
63054b0d36
commit
66717dfc45
|
@ -27,6 +27,8 @@ pip install gsm0338
|
|||
pip install termcolor
|
||||
pip install colorlog
|
||||
pip install pycryptodome
|
||||
# we need this direct git install, as pypi only lists the python2.7 only release 0.3 from 2013 :(
|
||||
pip install git+https://github.com/hologram-io/smpp.pdu
|
||||
|
||||
# Execute automatically discovered unit tests first
|
||||
python -m unittest discover -v -s tests/
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
from pySim.ota import *
|
||||
from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
|
||||
from pySim.utils import h2b, h2b
|
||||
|
||||
# KIC1 + KID1 of 8988211000000515398
|
||||
#KIC1 = h2b('C039ED58F7B81446105E79EBFD373038')
|
||||
#KID1 = h2b('1799B93FE53F430BD7FD4810C77E1FDF')
|
||||
#KIC3 = h2b('167F2576D64C8D41862954875C8D7979')
|
||||
#KID3 = h2b('ECAE122B0E6AE4186D6487D50FDC0922')
|
||||
|
||||
# KIC1 + KID1 of 8988211000000467285
|
||||
KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
|
||||
KID1 = h2b('D24EB461799C5E035C77451FD9404463')
|
||||
KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
|
||||
KID3 = h2b('12110C78E678C25408233076AA033615')
|
||||
|
||||
od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
|
||||
algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
|
||||
print(od.crypt)
|
||||
print(od.auth)
|
||||
|
||||
dialect = OtaDialectSms()
|
||||
|
||||
# RAM: B00000
|
||||
# SIM RFM: B00010
|
||||
# USIM RFM: B00011
|
||||
tar = h2b('B00011')
|
||||
|
||||
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
outp = dialect.encode_cmd(od, tar, spi, apdu=b'\x00\xa4\x00\x04\x02\x3f\x00')
|
||||
print("result: %s" % b2h(outp))
|
||||
|
||||
with_udh = b'\x02\x70\x00' + outp
|
||||
print("with_udh: %s" % b2h(with_udh))
|
||||
|
||||
|
||||
da = AddressField('12345678', 'unknown', 'isdn_e164')
|
||||
#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
|
||||
tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
|
||||
print(tpdu)
|
||||
print("tpdu: %s" % b2h(tpdu.toBytes()))
|
||||
|
||||
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
dialect.decode_resp(od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
|
||||
|
||||
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
|
||||
dialect.decode_resp(od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
|
||||
|
||||
spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
|
||||
'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'no_rc_cc_ds', 'por': 'por_required'}
|
||||
dialect.decode_resp(od, spi, '027100000e0ab000110000000000000001612f')
|
||||
|
|
@ -189,7 +189,7 @@ class PysimApp(cmd2.Cmd):
|
|||
self.register_command_set(Iso7816Commands())
|
||||
self.register_command_set(Ts102222Commands())
|
||||
self.register_command_set(PySimCommands())
|
||||
self.iccid, sw = self.card.read_iccid()
|
||||
#self.iccid, sw = self.card.read_iccid()
|
||||
self.lchan.select('MF', self)
|
||||
rc = True
|
||||
else:
|
||||
|
|
355
pySim/sms.py
355
pySim/sms.py
|
@ -18,20 +18,25 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import typing
|
||||
from construct import Int8ub, Bytes
|
||||
from construct import Struct, Tell, this, RepeatUntil
|
||||
import abc
|
||||
from pprint import pprint as pp
|
||||
from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
|
||||
from construct import Struct, Enum, Tell, BitStruct, this, Padding
|
||||
from construct import Prefixed, GreedyRange, GreedyBytes
|
||||
|
||||
from pySim.construct import HexAdapter, BcdAdapter, TonNpi
|
||||
from pySim.utils import Hexstr, h2b, b2h
|
||||
|
||||
from smpp.pdu import pdu_types
|
||||
|
||||
BytesOrHex = typing.Union[Hexstr, bytes]
|
||||
|
||||
class UserDataHeader:
|
||||
# a single IE in the user data header
|
||||
ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length))
|
||||
ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
|
||||
# parser for the full UDH: Length octet followed by sequence of IEs
|
||||
_construct = Struct('udhl'/Int8ub,
|
||||
# FIXME: somehow the below lambda is not working, we always only get the first IE?
|
||||
'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c))
|
||||
_construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
|
||||
'data'/GreedyBytes)
|
||||
|
||||
def __init__(self, ies=[]):
|
||||
self.ies = ies
|
||||
|
@ -50,4 +55,340 @@ class UserDataHeader:
|
|||
if isinstance(inb, str):
|
||||
inb = h2b(inb)
|
||||
res = cls._construct.parse(inb)
|
||||
return cls(res['ies']), inb[1+res['udhl']:]
|
||||
return cls(res['ies']), res['data']
|
||||
|
||||
def toBytes(self) -> bytes:
|
||||
return self._construct.build({'ies':self.ies, 'data':b''})
|
||||
|
||||
|
||||
def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
|
||||
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
|
||||
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
|
||||
return True
|
||||
if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
|
||||
pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
|
||||
return True
|
||||
if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
|
||||
if not smpp_dcs_is_8bit(smpp_pdu.params['data_coding']):
|
||||
raise ValueError('We only support 8bit coded SMS for now')
|
||||
|
||||
class AddressField:
|
||||
"""Representation of an address field as used in SMS T-PDU."""
|
||||
_construct = Struct('addr_len'/Int8ub,
|
||||
'type_of_addr'/TonNpi,
|
||||
'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
|
||||
'tell'/Tell)
|
||||
|
||||
def __init__(self, digits, ton='unknown', npi='unknown'):
|
||||
self.ton = ton
|
||||
self.npi = npi
|
||||
self.digits = digits
|
||||
|
||||
def __str__(self):
|
||||
return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)
|
||||
|
||||
@classmethod
|
||||
def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
|
||||
"""Construct an AddressField instance from the binary T-PDU address format."""
|
||||
if isinstance(inb, str):
|
||||
inb = h2b(inb)
|
||||
res = cls._construct.parse(inb)
|
||||
#pp(res)
|
||||
#print("size: %s" % cls._construct.sizeof())
|
||||
ton = res['type_of_addr']['type_of_number']
|
||||
npi = res['type_of_addr']['numbering_plan_id']
|
||||
# return resulting instance + remainder bytes
|
||||
return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]
|
||||
|
||||
@classmethod
|
||||
def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
|
||||
"""Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
|
||||
smpp_map_npi = {
|
||||
'UNKNOWN': 'unknown',
|
||||
'ISDN': 'isdn_e164',
|
||||
'DATA': 'data_x121',
|
||||
'TELEX': 'telex_f69',
|
||||
'LAND_MOBILE': 'sc_specific6',
|
||||
'NATIONAL': 'national',
|
||||
'PRIVATE': 'private',
|
||||
'ERMES': 'ermes',
|
||||
}
|
||||
smpp_map_ton = {
|
||||
'UNKNOWN': 'unknown',
|
||||
'INTERNATIONAL': 'international',
|
||||
'NATIONAL': 'national',
|
||||
'NETWORK_SPECIFIC': 'network_specific',
|
||||
'SUBSCRIBER_NUMBER': 'short_code',
|
||||
'ALPHANUMERIC': 'alphanumeric',
|
||||
'ABBREVIATED': 'abbreviated',
|
||||
}
|
||||
# return the resulting instance
|
||||
return cls(addr.decode('ascii'), smpp_map_ton[ton.name], smpp_map_npi[npi.name])
|
||||
|
||||
|
||||
def toBytes(self) -> bytes:
|
||||
"""Encode the AddressField into the binary representation as used in T-PDU."""
|
||||
num_digits = len(self.digits)
|
||||
if num_digits % 2:
|
||||
self.digits += 'f'
|
||||
d = {
|
||||
'addr_len': num_digits,
|
||||
'type_of_addr': {
|
||||
'ext': True,
|
||||
'type_of_number': self.ton,
|
||||
'numbering_plan_id': self.npi,
|
||||
},
|
||||
'digits': self.digits,
|
||||
}
|
||||
return self._construct.build(d)
|
||||
|
||||
|
||||
class SMS_TPDU(abc.ABC):
|
||||
"""Base class for a SMS T-PDU."""
|
||||
def __init__(self, **kwargs):
|
||||
self.tp_mti = kwargs.get('tp_mti', None)
|
||||
self.tp_rp = kwargs.get('tp_rp', False)
|
||||
self.tp_udhi = kwargs.get('tp_udhi', False)
|
||||
self.tp_pid = kwargs.get('tp_pid', None)
|
||||
self.tp_dcs = kwargs.get('tp_dcs', None)
|
||||
self.tp_udl = kwargs.get('tp_udl', None)
|
||||
self.tp_ud = kwargs.get('tp_ud', None)
|
||||
|
||||
|
||||
|
||||
class SMS_DELIVER(SMS_TPDU):
|
||||
"""Representation of a SMS-DELIVER T-PDU."""
|
||||
flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, 'tp_sri'/Flag,
|
||||
Padding(1), 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))
|
||||
def __init__(self, **kwargs):
|
||||
kwargs['tp_mti'] = 0
|
||||
super().__init__(**kwargs)
|
||||
self.tp_lp = kwargs.get('tp_lp', False)
|
||||
self.tp_mms = kwargs.get('tp_mms', False)
|
||||
self.tp_oa = kwargs.get('tp_oa', None)
|
||||
self.tp_scts = kwargs.get('tp_scts', None)
|
||||
self.tp_sri = kwargs.get('tp_sri', False)
|
||||
|
||||
def __repr__(self):
|
||||
return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)
|
||||
|
||||
@classmethod
|
||||
def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
|
||||
"""Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
|
||||
if isinstance(inb, str):
|
||||
inb = h2b(inb)
|
||||
flags = inb[0]
|
||||
d = SMS_DELIVER.flags_construct.parse(inb)
|
||||
oa, remainder = AddressField.fromBytes(inb[1:])
|
||||
d['tp_oa'] = oa
|
||||
offset = 0
|
||||
d['tp_pid'] = remainder[offset]
|
||||
offset += 1
|
||||
d['tp_dcs'] = remainder[offset]
|
||||
offset += 1
|
||||
# TODO: further decode
|
||||
d['tp_scts'] = remainder[offset:offset+7]
|
||||
offset += 7
|
||||
d['tp_udl'] = remainder[offset]
|
||||
offset += 1
|
||||
d['tp_ud'] = remainder[offset:]
|
||||
return cls(**d)
|
||||
|
||||
def toBytes(self) -> bytes:
|
||||
"""Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
|
||||
outb = bytearray()
|
||||
d = {
|
||||
'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
|
||||
'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
|
||||
}
|
||||
flags = SMS_DELIVER.flags_construct.build(d)
|
||||
outb.extend(flags)
|
||||
outb.extend(self.tp_oa.toBytes())
|
||||
outb.append(self.tp_pid)
|
||||
outb.append(self.tp_dcs)
|
||||
outb.extend(self.tp_scts)
|
||||
outb.append(self.tp_udl)
|
||||
outb.extend(self.tp_ud)
|
||||
|
||||
return outb
|
||||
|
||||
@classmethod
|
||||
def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
|
||||
"""Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu."""
|
||||
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
|
||||
return cls.fromSmppSubmit(cls, smpp_pdu)
|
||||
else:
|
||||
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
|
||||
|
||||
@classmethod
|
||||
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
|
||||
"""Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
|
||||
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
|
||||
tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
|
||||
smpp_pdu.params['source_addr_ton'],
|
||||
smpp_pdu.params['source_addr_npi'])
|
||||
tp_ud = smpp_pdu.params['short_message']
|
||||
d = {
|
||||
'tp_lp': False,
|
||||
'tp_mms': False,
|
||||
'tp_oa': tp_oa,
|
||||
'tp_scts': h2b('22705200000000'), # FIXME
|
||||
'tp_sri': False,
|
||||
'tp_rp': False,
|
||||
'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
|
||||
'tp_pid': smpp_pdu.params['protocol_id'],
|
||||
'tp_dcs': 0xF6, # we only deal with binary SMS here
|
||||
'tp_udl': len(tp_ud),
|
||||
'tp_ud': tp_ud,
|
||||
}
|
||||
return cls(**d)
|
||||
|
||||
|
||||
|
||||
class SMS_SUBMIT(SMS_TPDU):
|
||||
"""Representation of a SMS-DELIVER T-PDU."""
|
||||
flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag,
|
||||
'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
|
||||
'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))
|
||||
def __init__(self, **kwargs):
|
||||
kwargs['tp_mti'] = 1
|
||||
super().__init__(**kwargs)
|
||||
self.tp_rd = kwargs.get('tp_rd', False)
|
||||
self.tp_vpf = kwargs.get('tp_vpf', 'none')
|
||||
self.tp_srr = kwargs.get('tp_srr', False)
|
||||
self.tp_mr = kwargs.get('tp_mr', None)
|
||||
self.tp_da = kwargs.get('tp_da', None)
|
||||
self.tp_vp = kwargs.get('tp_vp', None)
|
||||
|
||||
def __repr__(self):
|
||||
return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)
|
||||
|
||||
@classmethod
|
||||
def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
|
||||
"""Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU."""
|
||||
offset = 0
|
||||
if isinstance(inb, str):
|
||||
inb = h2b(inb)
|
||||
d = SMS_SUBMIT.flags_construct.parse(inb)
|
||||
offset += 1
|
||||
d['tp_mr']= inb[offset]
|
||||
offset += 1
|
||||
da, remainder = AddressField.fromBytes(inb[2:])
|
||||
d['tp_da'] = da
|
||||
|
||||
offset = 0
|
||||
d['tp_pid'] = remainder[offset]
|
||||
offset += 1
|
||||
d['tp_dcs'] = remainder[offset]
|
||||
offset += 1
|
||||
if d['tp_vpf'] == 'none':
|
||||
pass
|
||||
elif d['tp_vpf'] == 'relative':
|
||||
# TODO: further decode
|
||||
d['tp_vp'] = remainder[offset:offset+1]
|
||||
offset += 1
|
||||
elif d['tp_vpf'] == 'enhanced':
|
||||
# TODO: further decode
|
||||
d['tp_vp'] = remainder[offset:offset+7]
|
||||
offset += 7
|
||||
pass
|
||||
elif d['tp_vpf'] == 'absolute':
|
||||
# TODO: further decode
|
||||
d['tp_vp'] = remainder[offset:offset+7]
|
||||
offset += 7
|
||||
pass
|
||||
else:
|
||||
raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
|
||||
d['tp_udl'] = remainder[offset]
|
||||
offset += 1
|
||||
d['tp_ud'] = remainder[offset:]
|
||||
return cls(**d)
|
||||
|
||||
def toBytes(self) -> bytes:
|
||||
"""Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
|
||||
outb = bytearray()
|
||||
d = {
|
||||
'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
|
||||
'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
|
||||
}
|
||||
flags = SMS_SUBMIT.flags_construct.build(d)
|
||||
outb.extend(flags)
|
||||
outb.append(self.tp_mr)
|
||||
outb.extend(self.tp_da.toBytes())
|
||||
outb.append(self.tp_pid)
|
||||
outb.append(self.tp_dcs)
|
||||
if self.tp_vpf != 'none':
|
||||
outb.extend(self.tp_vp)
|
||||
outb.append(self.tp_udl)
|
||||
outb.extend(self.tp_ud)
|
||||
return outb
|
||||
|
||||
@classmethod
|
||||
def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
|
||||
"""Construct a SMS_DELIVER instance from the format used by smpp.pdu."""
|
||||
if smpp_pdu.id == pdu_types.CommandId.submit_sm:
|
||||
return cls.fromSmppSubmit(cls, smpp_pdu)
|
||||
else:
|
||||
raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
|
||||
|
||||
@classmethod
|
||||
def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
|
||||
"""Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
|
||||
ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
|
||||
tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
|
||||
smpp_pdu.params['dest_addr_ton'],
|
||||
smpp_pdu.params['dest_addr_npi'])
|
||||
tp_ud = smpp_pdu.params['short_message']
|
||||
#vp_smpp = smpp_pdu.params['validity_period']
|
||||
#if not vp_smpp:
|
||||
# vpf = 'none'
|
||||
d = {
|
||||
'tp_rd': True if smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE' else False,
|
||||
'tp_vpf': None, # vpf,
|
||||
'tp_rp': False, # related to ['registered_delivery'] ?
|
||||
'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
|
||||
'tp_srr': True if smpp_pdu.params['registered_delivery'] else False,
|
||||
'tp_mr': 0, # FIXME: sm_default_msg_id ?
|
||||
'tp_da': tp_da,
|
||||
'tp_pid': smpp_pdu.params['protocol_id'],
|
||||
'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
|
||||
'tp_vp': None, # FIXME: implement VPF conversion
|
||||
'tp_udl': len(tp_ud),
|
||||
'tp_ud': tp_ud,
|
||||
}
|
||||
return cls(**d)
|
||||
|
||||
def toSmpp(self) -> pdu_types.PDU:
|
||||
"""Translate a SMS_DELIVER instance to a smpp.pdu.pdu_types.SubmitSM instance."""
|
||||
esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
|
||||
reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
|
||||
if self.tp_rp:
|
||||
repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
|
||||
else:
|
||||
repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
|
||||
# we only deal with binary SMS here:
|
||||
if self.tp_dcs != 0xF6:
|
||||
raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
|
||||
dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
|
||||
return pdu_types.SubmitSM(service_type='',
|
||||
source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
|
||||
source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
|
||||
source_addr='simcard',
|
||||
dest_addr_ton=FIXME(self.tp_da.ton),
|
||||
dest_addr_npi=FIXME(self.tp_da.npi),
|
||||
destination_addr=self.tp_da.digits,
|
||||
esm_class=esm_class,
|
||||
protocol_id=self.tp_pid,
|
||||
priority_flag=pdu_types.PriorityFlag.LEVEL_0,
|
||||
#schedule_delivery_time,
|
||||
#validity_period,
|
||||
registered_delivery=reg_del,
|
||||
replace_if_present_flag=repl_if,
|
||||
data_coding=dc,
|
||||
#sm_default_msg_id,
|
||||
short_message=self.tp_ud)
|
||||
|
|
|
@ -10,3 +10,4 @@ pyyaml>=5.1
|
|||
termcolor
|
||||
colorlog
|
||||
pycryptodome
|
||||
git+https://github.com/hologram-io/smpp.pdu
|
||||
|
|
3
setup.py
3
setup.py
|
@ -19,7 +19,8 @@ setup(
|
|||
"gsm0338",
|
||||
"termcolor",
|
||||
"colorlog",
|
||||
"pycryptodome"
|
||||
"pycryptodome",
|
||||
"smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
|
||||
],
|
||||
scripts=[
|
||||
'pySim-prog.py',
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from pySim.sms import *
|
||||
from pprint import pprint as pp
|
||||
from construct import setGlobalPrintPrivateEntries
|
||||
|
||||
|
||||
print(UserDataHeader.fromBytes('027100'))
|
||||
print(UserDataHeader.fromBytes('027100abcdef'))
|
||||
print(UserDataHeader.fromBytes('03710110'))
|
||||
print(UserDataHeader.fromBytes('0571007001ffabcd'))
|
||||
|
||||
setGlobalPrintPrivateEntries(True)
|
||||
pp(AddressField.fromBytes('0480214399'))
|
||||
|
||||
s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
|
||||
pp(s)
|
||||
print(s.tp_da)
|
||||
pp(b2h(s.toBytes()))
|
||||
|
||||
d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
|
||||
pp(d)
|
||||
pp(b2h(d.toBytes()))
|
|
@ -0,0 +1,105 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
from pySim.utils import h2b, b2h
|
||||
from pySim.sms import *
|
||||
|
||||
class Test_SMS_UDH(unittest.TestCase):
|
||||
def test_single_ie(self):
|
||||
udh, tail = UserDataHeader.fromBytes('027100')
|
||||
self.assertEqual(len(udh.ies), 1)
|
||||
ie = udh.ies[0]
|
||||
self.assertEqual(ie.iei, 0x71)
|
||||
self.assertEqual(ie.length, 0)
|
||||
self.assertEqual(ie.value, b'')
|
||||
self.assertEqual(tail, b'')
|
||||
|
||||
def test_single_ie_tail(self):
|
||||
udh, tail = UserDataHeader.fromBytes('027100abcdef')
|
||||
self.assertEqual(len(udh.ies), 1)
|
||||
ie = udh.ies[0]
|
||||
self.assertEqual(ie.iei, 0x71)
|
||||
self.assertEqual(ie.length, 0)
|
||||
self.assertEqual(ie.value, b'')
|
||||
self.assertEqual(tail, b'\xab\xcd\xef')
|
||||
|
||||
def test_single_ie_value(self):
|
||||
udh, tail = UserDataHeader.fromBytes('03710110')
|
||||
self.assertEqual(len(udh.ies), 1)
|
||||
ie = udh.ies[0]
|
||||
self.assertEqual(ie.iei, 0x71)
|
||||
self.assertEqual(ie.length, 1)
|
||||
self.assertEqual(ie.value, b'\x10')
|
||||
self.assertEqual(tail, b'')
|
||||
|
||||
def test_two_ie_data_tail(self):
|
||||
udh, tail = UserDataHeader.fromBytes('0571007001ffabcd')
|
||||
self.assertEqual(len(udh.ies), 2)
|
||||
ie = udh.ies[0]
|
||||
self.assertEqual(ie.iei, 0x71)
|
||||
self.assertEqual(ie.length, 0)
|
||||
self.assertEqual(ie.value, b'')
|
||||
ie = udh.ies[1]
|
||||
self.assertEqual(ie.iei, 0x70)
|
||||
self.assertEqual(ie.length, 1)
|
||||
self.assertEqual(ie.value, b'\xff')
|
||||
self.assertEqual(tail, b'\xab\xcd')
|
||||
|
||||
def test_toBytes(self):
|
||||
indata = h2b('0571007001ff')
|
||||
udh, tail = UserDataHeader.fromBytes(indata)
|
||||
encoded = udh.toBytes()
|
||||
self.assertEqual(encoded, indata)
|
||||
|
||||
class Test_AddressField(unittest.TestCase):
|
||||
def test_fromBytes(self):
|
||||
encoded = h2b('0480214399')
|
||||
af, trailer = AddressField.fromBytes(encoded)
|
||||
self.assertEqual(trailer, b'\x99')
|
||||
self.assertEqual(af.ton, 'unknown')
|
||||
self.assertEqual(af.npi, 'unknown')
|
||||
self.assertEqual(af.digits, '1234')
|
||||
|
||||
def test_fromBytes_odd(self):
|
||||
af, trailer = AddressField.fromBytes('038021f399')
|
||||
self.assertEqual(trailer, b'\x99')
|
||||
self.assertEqual(af.ton, 'unknown')
|
||||
self.assertEqual(af.npi, 'unknown')
|
||||
self.assertEqual(af.digits, '123')
|
||||
|
||||
def test_toBytes(self):
|
||||
encoded = h2b('04802143')
|
||||
af, trailer = AddressField.fromBytes(encoded)
|
||||
self.assertEqual(af.toBytes(), encoded)
|
||||
|
||||
def test_toBytes_odd(self):
|
||||
af = AddressField('12345', 'international', 'isdn_e164')
|
||||
encoded = af.toBytes()
|
||||
self.assertEqual(encoded, h2b('05912143f5'))
|
||||
|
||||
|
||||
class Test_SUBMIT(unittest.TestCase):
|
||||
def test_fromBytes(self):
|
||||
s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
|
||||
self.assertEqual(s.tp_mti, 1)
|
||||
self.assertEqual(s.tp_rd, True)
|
||||
self.assertEqual(s.tp_vpf, 'relative')
|
||||
self.assertEqual(s.tp_rp, False)
|
||||
self.assertEqual(s.tp_udhi, True)
|
||||
self.assertEqual(s.tp_srr, False)
|
||||
self.assertEqual(s.tp_pid, 0)
|
||||
self.assertEqual(s.tp_dcs, 0xf5)
|
||||
self.assertEqual(s.tp_udl, 140)
|
||||
|
||||
class Test_DELIVER(unittest.TestCase):
|
||||
def test_fromBytes(self):
|
||||
d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
|
||||
self.assertEqual(d.tp_mti, 0)
|
||||
self.assertEqual(d.tp_mms, True)
|
||||
self.assertEqual(d.tp_lp, False)
|
||||
self.assertEqual(d.tp_rp, False)
|
||||
self.assertEqual(d.tp_udhi, False)
|
||||
self.assertEqual(d.tp_sri, False)
|
||||
self.assertEqual(d.tp_pid, 0x7f)
|
||||
self.assertEqual(d.tp_dcs, 0xf6)
|
||||
self.assertEqual(d.tp_udl, 8)
|
Loading…
Reference in New Issue