mirror of https://gerrit.osmocom.org/pysim
169 lines
5.6 KiB
Python
169 lines
5.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2020 Vadim Yanitskiy <axilirator@gmail.com>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import logging as log
|
|
import serial
|
|
import time
|
|
import re
|
|
|
|
from pySim.transport import LinkBase
|
|
from pySim.exceptions import *
|
|
|
|
# HACK: if somebody needs to debug this thing
|
|
# log.root.setLevel(log.DEBUG)
|
|
|
|
|
|
class ModemATCommandLink(LinkBase):
|
|
"""Transport Link for 3GPP TS 27.007 compliant modems."""
|
|
|
|
def __init__(self, device: str = '/dev/ttyUSB0', baudrate: int = 115200, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self._sl = serial.Serial(device, baudrate, timeout=5)
|
|
self._echo = False # this will be auto-detected by _check_echo()
|
|
self._device = device
|
|
self._atr = None
|
|
|
|
# Check the AT interface
|
|
self._check_echo()
|
|
|
|
# Trigger initial reset
|
|
self.reset_card()
|
|
|
|
def __del__(self):
|
|
if hasattr(self, '_sl'):
|
|
self._sl.close()
|
|
|
|
def send_at_cmd(self, cmd, timeout=0.2, patience=0.002):
|
|
# Convert from string to bytes, if needed
|
|
bcmd = cmd if type(cmd) is bytes else cmd.encode()
|
|
bcmd += b'\r'
|
|
|
|
# Clean input buffer from previous/unexpected data
|
|
self._sl.reset_input_buffer()
|
|
|
|
# Send command to the modem
|
|
log.debug('Sending AT command: %s', cmd)
|
|
try:
|
|
wlen = self._sl.write(bcmd)
|
|
assert(wlen == len(bcmd))
|
|
except:
|
|
raise ReaderError('Failed to send AT command: %s' % cmd)
|
|
|
|
rsp = b''
|
|
its = 1
|
|
t_start = time.time()
|
|
while True:
|
|
rsp = rsp + self._sl.read(self._sl.in_waiting)
|
|
lines = rsp.split(b'\r\n')
|
|
if len(lines) >= 2:
|
|
res = lines[-2]
|
|
if res == b'OK':
|
|
log.debug('Command finished with result: %s', res)
|
|
break
|
|
if res == b'ERROR' or res.startswith(b'+CME ERROR:'):
|
|
log.error('Command failed with result: %s', res)
|
|
break
|
|
|
|
if time.time() - t_start >= timeout:
|
|
log.info('Command finished with timeout >= %ss', timeout)
|
|
break
|
|
time.sleep(patience)
|
|
its += 1
|
|
log.debug('Command took %0.6fs (%d cycles a %fs)',
|
|
time.time() - t_start, its, patience)
|
|
|
|
if self._echo:
|
|
# Skip echo chars
|
|
rsp = rsp[wlen:]
|
|
rsp = rsp.strip()
|
|
rsp = rsp.split(b'\r\n\r\n')
|
|
|
|
log.debug('Got response from modem: %s', rsp)
|
|
return rsp
|
|
|
|
def _check_echo(self):
|
|
"""Verify the correct response to 'AT' command
|
|
and detect if inputs are echoed by the device
|
|
|
|
Although echo of inputs can be enabled/disabled via
|
|
ATE1/ATE0, respectively, we rather detect the current
|
|
configuration of the modem without any change.
|
|
"""
|
|
# Next command shall not strip the echo from the response
|
|
self._echo = False
|
|
result = self.send_at_cmd('AT')
|
|
|
|
# Verify the response
|
|
if len(result) > 0:
|
|
if result[-1] == b'OK':
|
|
self._echo = False
|
|
return
|
|
elif result[-1] == b'AT\r\r\nOK':
|
|
self._echo = True
|
|
return
|
|
raise ReaderError(
|
|
'Interface \'%s\' does not respond to \'AT\' command' % self._device)
|
|
|
|
def reset_card(self):
|
|
# Reset the modem, just to be sure
|
|
if self.send_at_cmd('ATZ') != [b'OK']:
|
|
raise ReaderError('Failed to reset the modem')
|
|
|
|
# Make sure that generic SIM access is supported
|
|
if self.send_at_cmd('AT+CSIM=?') != [b'OK']:
|
|
raise ReaderError('The modem does not seem to support SIM access')
|
|
|
|
log.info('Modem at \'%s\' is ready!' % self._device)
|
|
|
|
def connect(self):
|
|
pass # Nothing to do really ...
|
|
|
|
def disconnect(self):
|
|
pass # Nothing to do really ...
|
|
|
|
def wait_for_card(self, timeout=None, newcardonly=False):
|
|
pass # Nothing to do really ...
|
|
|
|
def _send_apdu_raw(self, pdu):
|
|
# Make sure pdu has upper case hex digits [A-F]
|
|
pdu = pdu.upper()
|
|
|
|
# Prepare the command as described in 8.17
|
|
cmd = 'AT+CSIM=%d,\"%s\"' % (len(pdu), pdu)
|
|
log.debug('Sending command: %s', cmd)
|
|
|
|
# Send AT+CSIM command to the modem
|
|
# TODO: also handle +CME ERROR: <err>
|
|
rsp = self.send_at_cmd(cmd)
|
|
if len(rsp) != 2 or rsp[-1] != b'OK':
|
|
raise ReaderError('APDU transfer failed: %s' % str(rsp))
|
|
rsp = rsp[0] # Get rid of b'OK'
|
|
|
|
# Make sure that the response has format: b'+CSIM: %d,\"%s\"'
|
|
try:
|
|
result = re.match(b'\+CSIM: (\d+),\"([0-9A-F]+)\"', rsp)
|
|
(rsp_pdu_len, rsp_pdu) = result.groups()
|
|
except:
|
|
raise ReaderError('Failed to parse response from modem: %s' % rsp)
|
|
|
|
# TODO: make sure we have at least SW
|
|
data = rsp_pdu[:-4].decode().lower()
|
|
sw = rsp_pdu[-4:].decode().lower()
|
|
log.debug('Command response: %s, %s', data, sw)
|
|
return data, sw
|