mirror of https://gerrit.osmocom.org/pysim
transport: add Bluetooth (SIM Access Profile) based transport
Change-Id: I2e8b202ac5cddf7c8533115d53dd0d64da6ca9b9
This commit is contained in:
parent
e8d177d88f
commit
ccb8499ea9
|
@ -215,6 +215,10 @@ def argparse_add_reader_args(arg_parser):
|
|||
osmobb_group.add_argument('--osmocon', dest='osmocon_sock', metavar='PATH', default=None,
|
||||
help='Socket path for Calypso (e.g. Motorola C1XX) based reader (via OsmocomBB)')
|
||||
|
||||
btsap_group = arg_parser.add_argument_group('Bluetooth Device (SIM Access Profile)')
|
||||
btsap_group.add_argument('--bt-addr', dest='bt_addr', metavar='ADDR', default=None,
|
||||
help='Bluetooth device address')
|
||||
|
||||
return arg_parser
|
||||
|
||||
|
||||
|
@ -237,6 +241,10 @@ def init_reader(opts, **kwargs) -> Optional[LinkBase]:
|
|||
from pySim.transport.modem_atcmd import ModemATCommandLink
|
||||
sl = ModemATCommandLink(
|
||||
device=opts.modem_dev, baudrate=opts.modem_baud, **kwargs)
|
||||
elif opts.bt_addr is not None:
|
||||
print("Using Bluetooth device (SIM Access Profile)")
|
||||
from pySim.transport.bt_rsap import BluetoothSapSimLink
|
||||
sl = BluetoothSapSimLink(opts.bt_addr, **kwargs)
|
||||
else: # Serial reader is default
|
||||
print("Using serial reader interface")
|
||||
from pySim.transport.serial import SerialSimLink
|
||||
|
|
|
@ -0,0 +1,554 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" pySim: Bluetooth rSAP transport link
|
||||
"""
|
||||
|
||||
#
|
||||
# Copyright (C) 2021 Gabriel K. Gegenhuber <ggegenhuber@sba-research.org>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import time
|
||||
import struct
|
||||
import logging
|
||||
import bluetooth
|
||||
|
||||
from pySim.exceptions import ReaderError, NoCardError, ProtocolError
|
||||
from pySim.transport import LinkBase
|
||||
from pySim.utils import b2h, h2b, rpad
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# thx to osmocom/softsim
|
||||
# SAP table 5.16
|
||||
SAP_CONNECTION_STATUS = {
|
||||
0x00: "OK, Server can fulfill requirements",
|
||||
0x01: "Error, Server unable to establish connection",
|
||||
0x02: "Error, Server does not support maximum message size",
|
||||
0x03: "Error, maximum message size by Client is too small",
|
||||
0x04: "OK, ongoing call"
|
||||
}
|
||||
|
||||
# SAP table 5.18
|
||||
SAP_RESULT_CODE = {
|
||||
0x00: "OK, request processed correctly",
|
||||
0x01: "Error, no reason defined",
|
||||
0x02: "Error, card not accessible",
|
||||
0x03: "Error, card (already) powered off",
|
||||
0x04: "Error, card removed",
|
||||
0x05: "Error, card already powered on",
|
||||
0x06: "Error, data not available",
|
||||
0x07: "Error, not supported"
|
||||
}
|
||||
|
||||
# SAP table 5.19
|
||||
SAP_STATUS_CHANGE = {
|
||||
0x00: "Unknown Error",
|
||||
0x01: "Card reset",
|
||||
0x02: "Card not accessible",
|
||||
0x03: "Card removed",
|
||||
0x04: "Card inserted",
|
||||
0x05: "Card recovered"
|
||||
}
|
||||
|
||||
# SAP table 5.15
|
||||
SAP_PARAMETERS = [
|
||||
{
|
||||
'name': "MaxMsgSize",
|
||||
'length': 2,
|
||||
'id': 0x00
|
||||
},
|
||||
{
|
||||
'name': "ConnectionStatus",
|
||||
'length': 1,
|
||||
'id': 0x01
|
||||
},
|
||||
{
|
||||
'name': "ResultCode",
|
||||
'length': 1,
|
||||
'id': 0x02
|
||||
},
|
||||
{
|
||||
'name': "DisconnectionType",
|
||||
'length': 1,
|
||||
'id': 0x03
|
||||
},
|
||||
{
|
||||
'name': "CommandAPDU",
|
||||
'length': None,
|
||||
'id': 0x04
|
||||
},
|
||||
{
|
||||
'name': "ResponseAPDU",
|
||||
'length': None,
|
||||
'id': 0x05
|
||||
},
|
||||
{
|
||||
'name': "ATR",
|
||||
'length': None,
|
||||
'id': 0x06
|
||||
},
|
||||
{
|
||||
'name': "CardReaderdStatus",
|
||||
'length': 1,
|
||||
'id': 0x07
|
||||
},
|
||||
{
|
||||
'name': "StatusChange",
|
||||
'length': 1,
|
||||
'id': 0x08
|
||||
},
|
||||
{
|
||||
'name': "TransportProtocol",
|
||||
'length': 1,
|
||||
'id': 0x09
|
||||
},
|
||||
{
|
||||
'name': "CommandAPDU7816",
|
||||
'length': 2,
|
||||
'id': 0x10
|
||||
}
|
||||
]
|
||||
|
||||
# SAP table 5.1
|
||||
SAP_MESSAGES = [
|
||||
{
|
||||
'name': 'CONNECT_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x00,
|
||||
'parameters': [(0x00, True)]
|
||||
},
|
||||
{
|
||||
'name': 'CONNECT_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x01,
|
||||
'parameters': [(0x01, True), (0x00, False)]
|
||||
},
|
||||
{
|
||||
'name': 'DISCONNECT_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x02,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'DISCONNECT_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x03,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'DISCONNECT_IND',
|
||||
'client_to_server': False,
|
||||
'id': 0x04,
|
||||
'parameters': [(0x03, True)]
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_APDU_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x05,
|
||||
'parameters': [(0x04, False), (0x10, False)]
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_APDU_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x06,
|
||||
'parameters': [(0x02, True), (0x05, False)]
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_ATR_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x07,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_ATR_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x08,
|
||||
'parameters': [(0x02, True), (0x06, False)]
|
||||
},
|
||||
{
|
||||
'name': 'POWER_SIM_OFF_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x09,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'POWER_SIM_OFF_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x0A,
|
||||
'parameters': [(0x02, True)]
|
||||
},
|
||||
{
|
||||
'name': 'POWER_SIM_ON_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x0B,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'POWER_SIM_ON_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x0C,
|
||||
'parameters': [(0x02, True)]
|
||||
},
|
||||
{
|
||||
'name': 'RESET_SIM_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x0D,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'RESET_SIM_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x0E,
|
||||
'parameters': [(0x02, True)]
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_CARD_READER_STATUS_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x0F,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'TRANSFER_CARD_READER_STATUS_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x10,
|
||||
'parameters': [(0x02, True), (0x07, False)]
|
||||
},
|
||||
{
|
||||
'name': 'STATUS_IND',
|
||||
'client_to_server': False,
|
||||
'id': 0x11,
|
||||
'parameters': [(0x08, True)]
|
||||
},
|
||||
|
||||
{
|
||||
'name': 'ERROR_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x12,
|
||||
'parameters': []
|
||||
},
|
||||
{
|
||||
'name': 'SET_TRANSPORT_PROTOCOL_REQ',
|
||||
'client_to_server': True,
|
||||
'id': 0x13,
|
||||
'parameters': [(0x09, True)]
|
||||
},
|
||||
{
|
||||
'name': 'SET_TRANSPORT_PROTOCOL_RESP',
|
||||
'client_to_server': False,
|
||||
'id': 0x14,
|
||||
'parameters': [(0x02, True)]
|
||||
},
|
||||
|
||||
]
|
||||
|
||||
|
||||
class BluetoothSapSimLink(LinkBase):
|
||||
# UUID for SIM Access Service
|
||||
UUID_SIM_ACCESS = '0000112d-0000-1000-8000-00805f9b34fb'
|
||||
SAP_MAX_MSG_SIZE = 0xffff
|
||||
|
||||
def __init__(self, bt_mac_addr, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._bt_mac_addr = bt_mac_addr
|
||||
self._max_msg_size = self.SAP_MAX_MSG_SIZE
|
||||
self._atr = None
|
||||
self.connected = False
|
||||
# at first try to find the bluetooth device
|
||||
if not bluetooth.find_service(address=bt_mac_addr):
|
||||
raise ReaderError(f"Cannot find bluetooth device [{bt_mac_addr}]")
|
||||
# then check for rSAP support
|
||||
self._sim_service = next(iter(bluetooth.find_service(
|
||||
uuid=self.UUID_SIM_ACCESS, address=bt_mac_addr)), None)
|
||||
if not self._sim_service:
|
||||
raise ReaderError(
|
||||
f"Bluetooth device [{bt_mac_addr}] does not support SIM Access service")
|
||||
|
||||
def __del__(self):
|
||||
# TODO: do something here
|
||||
pass
|
||||
|
||||
def wait_for_card(self, timeout=None, newcardonly=False):
|
||||
self.connect()
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self._sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
|
||||
self._sock.connect(
|
||||
(self._sim_service['host'], self._sim_service['port']))
|
||||
self.connected = True
|
||||
self.establish_sim_connection()
|
||||
self.retrieve_atr()
|
||||
except:
|
||||
raise ReaderError("Cannot connect to SIM Access service")
|
||||
|
||||
# def get_atr(self):
|
||||
# return bytes(self._con.getATR())
|
||||
|
||||
def disconnect(self):
|
||||
if self.connected:
|
||||
self.send_sap_message("DISCONNECT_REQ")
|
||||
self._sock.close()
|
||||
self.connected = False
|
||||
|
||||
def reset_card(self):
|
||||
if self._connected:
|
||||
self.send_sap_message("RESET_SIM_REQ")
|
||||
msg_name, param_list = self._recv_sap_response('RESET_SIM_RESP')
|
||||
connection_status = next(
|
||||
(x[1] for x in param_list if x[0] == 'ConnectionStatus'), 0x01)
|
||||
if connection_status == 0x00:
|
||||
logger.info("SIM Reset successful")
|
||||
return 1
|
||||
else:
|
||||
self.disconnect()
|
||||
self.connect()
|
||||
return 1
|
||||
|
||||
def send_sap_message(self, msg_name, param_list=[]):
|
||||
# maby check for idle state before sending?
|
||||
message = self.craft_sap_message(msg_name, param_list)
|
||||
return self._sock.send(message)
|
||||
|
||||
def _recv_sap_message(self):
|
||||
resp = self._sock.recv(self._max_msg_size)
|
||||
msg_name, param_list = self.parse_sap_message(resp)
|
||||
return msg_name, param_list
|
||||
|
||||
def _recv_sap_response(self, waiting_msg_name):
|
||||
while self.connected:
|
||||
msg_name, param_list = self._recv_sap_message()
|
||||
self.handle_sap_response_generic(msg_name, param_list)
|
||||
if msg_name == waiting_msg_name:
|
||||
return msg_name, param_list
|
||||
|
||||
def establish_sim_connection(self, retries=5):
|
||||
self.send_sap_message(
|
||||
"CONNECT_REQ", [("MaxMsgSize", self._max_msg_size)])
|
||||
msg_name, param_list = self._recv_sap_response('CONNECT_RESP')
|
||||
|
||||
connection_status = next(
|
||||
(x[1] for x in param_list if x[0] == 'ConnectionStatus'), 0x01)
|
||||
if connection_status == 0x00:
|
||||
logger.info("Successfully connected to rSAP server")
|
||||
return
|
||||
elif connection_status == 0x02: # invalid max size
|
||||
self._max_msg_size = next(
|
||||
(x[1] for x in param_list if x[0] == 'MaxMsgSize'), self._max_msg_size)
|
||||
return self.establish_sim_connection(retries)
|
||||
else:
|
||||
logger.info(
|
||||
"Wait some seconds and make another connection attempt...")
|
||||
time.sleep(5)
|
||||
return self.establish_sim_connection(retries-1)
|
||||
|
||||
def retrieve_atr(self):
|
||||
self.send_sap_message("TRANSFER_ATR_REQ")
|
||||
msg_name, param_list = self._recv_sap_response('TRANSFER_ATR_RESP')
|
||||
result_code = next(
|
||||
(x[1] for x in param_list if x[0] == 'ResultCode'), 0x01)
|
||||
if result_code == 0x00:
|
||||
atr = next((x[1] for x in param_list if x[0] == 'ATR'), None)
|
||||
self._atr = atr
|
||||
logger.debug(f"Recieved ATR from server: {b2h(atr)}")
|
||||
|
||||
def handle_sap_response_generic(self, msg_name, param_list):
|
||||
# print stuff
|
||||
logger.debug(
|
||||
f"Recieved sap message from server: {(msg_name, param_list)}")
|
||||
for param in param_list:
|
||||
param_name, param_value = param
|
||||
if param_name == 'ConnectionStatus':
|
||||
new_status = SAP_CONNECTION_STATUS.get(param_value)
|
||||
logger.debug(f"Connection Status: {new_status}")
|
||||
elif param_name == 'StatusChange':
|
||||
new_status = SAP_STATUS_CHANGE.get(param_value)
|
||||
logger.debug(f"SIM Status: {new_status}")
|
||||
elif param_name == 'ResultCode':
|
||||
response_code = SAP_RESULT_CODE.get(param_value)
|
||||
logger.debug(f"ResultCode: {response_code}")
|
||||
|
||||
# handle some important stuff:
|
||||
if msg_name == 'DISCONNECT_IND':
|
||||
# graceful disconnect --> technically could still send some apdus
|
||||
# however, we just make it short and sweet and directly disconnect
|
||||
self.send_sap_message("DISCONNECT_REQ")
|
||||
elif msg_name == 'DISCONNECT_RESP':
|
||||
self.connected = False
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
# if msg_name == 'CONNECT_RESP':
|
||||
# elif msg_name == 'DISCONNECT_RESP':
|
||||
# elif msg_name == 'DISCONNECT_IND':
|
||||
# elif msg_name == 'TRANSFER_APDU_RESP':
|
||||
# elif msg_name == 'TRANSFER_ATR_RESP':
|
||||
# elif msg_name == 'POWER_SIM_OFF_RESP':
|
||||
# elif msg_name == 'POWER_SIM_ON_RESP':
|
||||
# elif msg_name == 'RESET_SIM_RESP':
|
||||
# elif msg_name == 'TRANSFER_CARD_READER_STATUS_RESP':
|
||||
# elif msg_name == 'STATUS_IND':
|
||||
# elif msg_name == 'ERROR_RESP':
|
||||
# elif msg_name == 'SET_TRANSPORT_PROTOCOL_RESP':
|
||||
# else:
|
||||
# logger.error("Unknown message...")
|
||||
|
||||
def craft_sap_message(self, msg_name, param_list=[]):
|
||||
msg_info = next(
|
||||
(x for x in SAP_MESSAGES if x.get('name') == msg_name), None)
|
||||
if not msg_info:
|
||||
raise ProtocolError(f"Unknown SAP message name ({msg_name})")
|
||||
|
||||
msg_id = msg_info.get('id')
|
||||
msg_params = msg_info.get('parameters')
|
||||
# msg_direction = msg_info.get('client_to_server')
|
||||
|
||||
param_cnt = len(param_list)
|
||||
|
||||
msg_bytes = struct.pack(
|
||||
'!BBH',
|
||||
msg_id,
|
||||
param_cnt,
|
||||
0
|
||||
)
|
||||
|
||||
allowed_params = (x[0] for x in msg_params)
|
||||
mandatory_params = (x[0] for x in msg_params if x[1] == True)
|
||||
|
||||
collected_param_ids = []
|
||||
|
||||
for p in param_list:
|
||||
param_name = p[0]
|
||||
param_value = p[1]
|
||||
|
||||
param_id = next(
|
||||
(x.get('id') for x in SAP_PARAMETERS if x.get('name') == param_name), None)
|
||||
if param_id is None:
|
||||
raise ProtocolError(f"Unknown SAP param name ({param_name})")
|
||||
if param_id not in allowed_params:
|
||||
raise ProtocolError(
|
||||
f"Parameter {param_name} not allowed in message {msg_name}")
|
||||
|
||||
collected_param_ids.append(param_id)
|
||||
msg_bytes += self.craft_sap_parameter(param_name, param_value)
|
||||
|
||||
if not set(mandatory_params).issubset(collected_param_ids):
|
||||
raise ProtocolError(
|
||||
f"Missing mandatory parameter for message {msg_name} (mandatory: {*mandatory_params,}, present: {*collected_param_ids,})")
|
||||
|
||||
return msg_bytes
|
||||
|
||||
def calc_padding_len(self, length, blocksize=4):
|
||||
extra = length % blocksize
|
||||
if extra > 0:
|
||||
return blocksize-extra
|
||||
return 0
|
||||
|
||||
def pad_bytes(self, b, blocksize=4):
|
||||
padding_len = self.calc_padding_len(len(b), blocksize)
|
||||
return b + bytearray(padding_len)
|
||||
|
||||
def craft_sap_parameter(self, param_name, param_value):
|
||||
param_info = next(
|
||||
(x for x in SAP_PARAMETERS if x.get('name') == param_name), None)
|
||||
param_id = param_info.get('id')
|
||||
param_len = param_info.get('length')
|
||||
|
||||
if isinstance(param_value, str):
|
||||
param_value = h2b(param_value)
|
||||
|
||||
if isinstance(param_value, int):
|
||||
# TODO: when param len is not set we have a problem :X
|
||||
param_value = (param_value).to_bytes(param_len, byteorder='big')
|
||||
|
||||
if param_len is None:
|
||||
# just assume param length from bytearray
|
||||
param_len = len(param_value)
|
||||
elif param_len != len(param_value):
|
||||
raise ProtocolError(
|
||||
f"Invalid param length (epected {param_len} but got {len(param_value)} bytes)")
|
||||
|
||||
param_bytes = struct.pack(
|
||||
f'!BBH{param_len}s',
|
||||
param_id,
|
||||
0, # reserved
|
||||
param_len,
|
||||
param_value
|
||||
)
|
||||
param_bytes = self.pad_bytes(param_bytes)
|
||||
return param_bytes
|
||||
|
||||
def parse_sap_message(self, msg_bytes):
|
||||
header_struct = struct.Struct('!BBH')
|
||||
msg_id, param_cnt, reserved = header_struct.unpack_from(msg_bytes)
|
||||
msg_bytes = msg_bytes[header_struct.size:]
|
||||
|
||||
msg_info = next(
|
||||
(x for x in SAP_MESSAGES if x.get('id') == msg_id), None)
|
||||
|
||||
msg_name = msg_info.get('name')
|
||||
msg_params = msg_info.get('parameters')
|
||||
# msg_direction = msg_info.get('client_to_server')
|
||||
|
||||
# TODO: check if params allowed etc
|
||||
# allowed_params = (x[0] for x in msg_params)
|
||||
# mandatory_params = (x[0] for x in msg_params if x[1] == True)
|
||||
|
||||
param_list = []
|
||||
|
||||
for x in range(param_cnt):
|
||||
param_name, param_value, total_len = self.parse_sap_parameter(
|
||||
msg_bytes)
|
||||
param_list.append((param_name, param_value))
|
||||
msg_bytes = msg_bytes[total_len:]
|
||||
|
||||
return msg_name, param_list
|
||||
|
||||
def parse_sap_parameter(self, param_bytes):
|
||||
header_struct = struct.Struct('!BBH')
|
||||
total_len = header_struct.size
|
||||
param_id, reserved, param_len = header_struct.unpack_from(param_bytes)
|
||||
padding_len = self.calc_padding_len(param_len)
|
||||
paramval_struct = struct.Struct(f'!{param_len}s{padding_len}s')
|
||||
param_value, padding = paramval_struct.unpack_from(
|
||||
param_bytes[total_len:])
|
||||
total_len += paramval_struct.size
|
||||
|
||||
param_info = next(
|
||||
(x for x in SAP_PARAMETERS if x.get('id') == param_id), None)
|
||||
# TODO: check if param found, length plausible, ...
|
||||
param_name = param_info.get('name')
|
||||
|
||||
# if it is set then value was int, otherwise it is byte array
|
||||
if param_info.get('length') is not None:
|
||||
param_value = int.from_bytes(param_value, "big")
|
||||
# param_len = param_info.get('length')
|
||||
return param_name, param_value, total_len
|
||||
|
||||
def _send_apdu_raw(self, pdu):
|
||||
if isinstance(pdu, str):
|
||||
pdu = h2b(pdu)
|
||||
self.send_sap_message("TRANSFER_APDU_REQ", [("CommandAPDU", pdu)])
|
||||
|
||||
msg_name, param_list = self._recv_sap_response('TRANSFER_APDU_RESP')
|
||||
result_code = next(
|
||||
(x[1] for x in param_list if x[0] == 'ResultCode'), 0x01)
|
||||
if result_code == 0x00:
|
||||
response = next(
|
||||
(x[1] for x in param_list if x[0] == 'ResponseAPDU'), None)
|
||||
sw = response[-2:]
|
||||
data = response[0:-2]
|
||||
return b2h(data), b2h(sw)
|
||||
return None, None
|
Loading…
Reference in New Issue