Introduce APDU/TPDU trace decoder

This introduces a new pySim.apdu module hierarchy, which contains
classes that represent TPDU/APDUs as exchanged between
SIM/UICC/USIM/ISIM card and UE.

It contains instruction level decoders for SELECT, READ BINARY and
friends, and then uses the pySim.filesystem.Runtime{Lchan,State} classes
to keep track of the currently selected EF/DF/ADF for each logical
channel, and uses the file-specific decoder classes of pySim to decode
the actual file content that is being read or written.

This provides a much more meaningful decode of protocol traces than
wireshark will ever be able to give us.

Furthermore, there's the new pySim.apdu_source set of classes which
provides "input plugins" for obtaining APDU traces in a variety of
formats.  So far, GSMTAP UDP live capture and pyshark based RSPRO
live and pcap file reading are imlpemented.

Change-Id: I862d93163d495a294364168f7818641e47b18c0a
Closes: OS#5126
This commit is contained in:
Harald Welte 2022-07-16 14:06:46 +02:00
parent 1272129ea7
commit 1268ed5f3f
13 changed files with 1750 additions and 2 deletions

View File

@ -45,7 +45,7 @@ Please install the following dependencies:
Example for Debian:
```
apt-get install python3-pyscard python3-serial python3-pip python3-yaml
apt-get install python3-pyscard python3-serial python3-pip python3-yaml python3-termcolor python3-colorlog
pip3 install -r requirements.txt
```

159
pySim-trace.py Executable file
View File

@ -0,0 +1,159 @@
#!/usr/bin/env python3
import sys
import logging, colorlog
import argparse
from pprint import pprint as pp
from pySim.apdu import *
from pySim.filesystem import RuntimeState
from pySim.cards import UsimCard
from pySim.commands import SimCardCommands
from pySim.profile import CardProfile
from pySim.ts_102_221 import CardProfileUICCSIM
from pySim.ts_31_102 import CardApplicationUSIM
from pySim.ts_31_103 import CardApplicationISIM
from pySim.transport import LinkBase
from pySim.apdu_source.gsmtap import GsmtapApduSource
from pySim.apdu_source.pyshark_rspro import PysharkRsproPcap
from pySim.apdu.ts_102_221 import Select, Status
log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
colorlog.basicConfig(level=logging.INFO, format = log_format)
logger = colorlog.getLogger()
# merge all of the command sets into one global set. This will override instructions,
# the one from the 'last' set in the addition below will prevail.
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
ApduCommands = UiccApduCommands + UsimApduCommands #+ GpApduCommands
class DummySimLink(LinkBase):
"""A dummy implementation of the LinkBase abstract base class. Currently required
as the UsimCard doesn't work without SimCardCommands, which in turn require
a LinkBase implementation talking to a card.
In the tracer, we don't actually talk to any card, so we simply drop everything
and claim it is successful.
The UsimCard / SimCardCommands should be refactored to make this obsolete later."""
def __init__(self, debug: bool = False, **kwargs):
super().__init__(**kwargs)
self._debug = debug
self._atr = h2i('3B9F96801F878031E073FE211B674A4C753034054BA9')
def _send_apdu_raw(self, pdu):
#print("DummySimLink-apdu: %s" % pdu)
return [], '9000'
def connect(self):
pass
def disconnect(self):
pass
def reset_card(self):
return 1
def get_atr(self):
return self._atr
def wait_for_card(self):
pass
class Tracer:
def __init__(self, **kwargs):
# we assume a generic SIM + UICC + USIM + ISIM card
profile = CardProfileUICCSIM()
profile.add_application(CardApplicationUSIM())
profile.add_application(CardApplicationISIM())
scc = SimCardCommands(transport=DummySimLink())
card = UsimCard(scc)
self.rs = RuntimeState(card, profile)
# APDU Decoder
self.ad = ApduDecoder(ApduCommands)
# parameters
self.suppress_status = kwargs.get('suppress_status', True)
self.suppress_select = kwargs.get('suppress_select', True)
self.source = kwargs.get('source', None)
def format_capdu(self, inst: ApduCommand):
"""Output a single decoded + processed ApduCommand."""
print("%02u %-16s %-35s %-8s %s %s" % (inst.lchan_nr, inst._name, inst.path_str, inst.col_id, inst.col_sw, inst.processed))
print("===============================")
def main(self):
"""Main loop of tracer: Iterates over all Apdu received from source."""
while True:
# obtain the next APDU from the source (blocking read)
apdu = self.source.read()
#print(apdu)
if isinstance(apdu, CardReset):
self.rs.reset()
continue
# ask ApduDecoder to look-up (INS,CLA) + instantiate an ApduCommand derived
# class like 'Select'
inst = self.ad.input(apdu)
# process the APDU (may modify the RuntimeState)
inst.process(self.rs)
# Avoid cluttering the log with too much verbosity
if self.suppress_select and isinstance(inst, Select):
continue
if self.suppress_status and isinstance(inst, Status):
continue
#print(inst)
self.format_capdu(inst)
option_parser = argparse.ArgumentParser(prog='pySim-trace', description='Osmocom pySim high-level SIM card trace decoder',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
global_group = option_parser.add_argument_group('General Options')
global_group.add_argument('--no-suppress-select', help="Don't suppress displaying SELECT APDUs")
global_group.add_argument('--no-suppress-status', help="Don't suppress displaying STATUS APDUs")
subparsers = option_parser.add_subparsers(help='APDU Source', dest='source', required=True)
parser_gsmtap = subparsers.add_parser('gsmtap-udp', help='Live capture of GSMTAP-SIM on UDP port')
parser_gsmtap.add_argument('-i', '--bind-ip', default='127.0.0.1',
help='Local IP address to which to bind the UDP port')
parser_gsmtap.add_argument('-p', '--bind-port', default=4729,
help='Local UDP port')
parser_rspro_pyshark_pcap = subparsers.add_parser('rspro-pyshark-pcap', help="""
PCAP file containing RSPRO (osmo-remsim) communication; processed via pyshark.
REQUIRES OSMOCOM PATCHED WIRESHARK!""")
parser_rspro_pyshark_pcap.add_argument('-f', '--pcap-file', required=True,
help='Name of the PCAP[ng] file to be read')
parser_rspro_pyshark_live = subparsers.add_parser('rspro-pyshark-live', help="""
Live capture of RSPRO (osmo-remsim) communication; processed via pyshark.
REQUIRES OSMOCOM PATCHED WIRESHARK!""")
parser_rspro_pyshark_live.add_argument('-i', '--interface', required=True,
help='Name of the network interface to capture on')
if __name__ == '__main__':
opts = option_parser.parse_args()
pp(opts)
logger.info('Opening source %s...' % opts.source)
if opts.source == 'gsmtap-udp':
s = GsmtapApduSource(opts.bind_ip, opts.bind_port)
elif opts.source == 'rspro-pyshark-pcap':
s = PysharkRsproPcap(opts.pcap_file)
elif opts.source == 'rspro-pyshark-live':
s = PySharkRsproLive(opts.interface)
tracer = Tracer(source=s)
logger.info('Entering main loop...')
tracer.main()

436
pySim/apdu/__init__.py Normal file
View File

@ -0,0 +1,436 @@
# coding=utf-8
"""APDU (and TPDU) parser for UICC/USIM/ISIM cards.
The File (and its classes) represent the structure / hierarchy
of the APDUs as seen in SIM/UICC/SIM/ISIM cards. The primary use case
is to perform a meaningful decode of protocol traces taken between card and UE.
The ancient wirshark dissector developed for GSMTAP generated by SIMtrace
is far too simplistic, while this decoder can utilize all of the information
we already know in pySim about the filesystem structure, file encoding, etc.
"""
# (C) 2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
from termcolor import colored
from typing import List, Dict, Optional
from construct import *
from construct import Optional as COptional
from pySim.construct import *
from pySim.utils import *
from pySim.filesystem import RuntimeLchan, RuntimeState, lchan_nr_from_cla
from pySim.filesystem import CardADF, CardFile, TransparentEF, LinFixedEF
"""There are multiple levels of decode:
1) pure TPDU / APDU level (no filesystem state required to decode)
1a) the raw C-TPDU + R-TPDU
1b) the raw C-APDU + R-APDU
1c) the C-APDU + R-APDU split in its portions (p1/p2/lc/le/cmd/rsp)
1d) the abstract C-APDU + R-APDU (mostly p1/p2 parsing; SELECT response)
2) the decoded DATA of command/response APDU
* READ/UPDATE: requires state/context: which file is selected? how to decode it?
"""
class ApduCommandMeta(abc.ABCMeta):
"""A meta-class that we can use to set some class variables when declaring
a derived class of ApduCommand."""
def __new__(metacls, name, bases, namespace, **kwargs):
x = super().__new__(metacls, name, bases, namespace)
x._name = namespace.get('name', kwargs.get('n', None))
x._ins = namespace.get('ins', kwargs.get('ins', None))
x._cla = namespace.get('cla', kwargs.get('cla', None))
return x
class Tpdu:
def __init__(self, cmd: bytes, rsp: Optional[bytes] = None):
self.cmd = cmd
self.rsp = rsp
def __str__(self):
return '%s(%02X %02X %02X %02X %02X %s %s %s)' % (type(self).__name__, self.cla, self.ins, self.p1,
self.p2, self.p3, b2h(self.cmd_data), b2h(self.rsp_data), b2h(self.sw))
@property
def cla(self) -> int:
"""Return CLA of the C-APDU Header."""
return self.cmd[0]
@property
def ins(self) -> int:
"""Return INS of the C-APDU Header."""
return self.cmd[1]
@property
def p1(self) -> int:
"""Return P1 of the C-APDU Header."""
return self.cmd[2]
@property
def p2(self) -> int:
"""Return P2 of the C-APDU Header."""
return self.cmd[3]
@property
def p3(self) -> int:
"""Return P3 of the C-APDU Header."""
return self.cmd[4]
@property
def cmd_data(self) -> int:
"""Return the DATA portion of the C-APDU"""
return self.cmd[5:]
@property
def sw(self) -> Optional[bytes]:
"""Return Status Word (SW) of the R-APDU"""
return self.rsp[-2:] if self.rsp else None
@property
def rsp_data(self) -> Optional[bytes]:
"""Return the DATA portion of the R-APDU"""
return self.rsp[:-2] if self.rsp else None
class Apdu(Tpdu):
@property
def lc(self) -> int:
"""Return Lc; Length of C-APDU body."""
return len(self.cmd_data)
@property
def lr(self) -> int:
"""Return Lr; Length of R-APDU body."""
return len(self.rsp_data)
@property
def successful(self) -> bool:
"""Was the execution of this APDU successful?"""
method = getattr(self, '_is_success', None)
if callable(method):
return method()
# default case: only 9000 is success
return self.sw == b'\x90\x00'
class ApduCommand(Apdu, metaclass=ApduCommandMeta):
"""Base class from which you would derive individual commands/instructions like SELECT.
A derived class represents a decoder for a specific instruction.
An instance of such a derived class is one concrete APDU."""
# fall-back constructs if the derived class provides no override
_construct_p1 = Byte
_construct_p2 = Byte
_construct = HexAdapter(GreedyBytes)
_construct_rsp = HexAdapter(GreedyBytes)
def __init__(self, cmd: bytes, rsp: Optional[bytes] = None):
"""Instantiate a new ApduCommand from give cmd + resp."""
# store raw data
super().__init__(cmd, rsp)
# default to 'empty' ID column. To be set to useful values (like record number)
# by derived class {cmd_rsp}_to_dict() or process() methods
self.col_id = '-'
# fields only set by process_* methods
self.file = None
self.lchan = None
self.processed = None
# the methods below could raise exceptions and those handlers might assume cmd_{dict,resp}
self.cmd_dict = None
self.rsp_dict = None
# interpret the data
self.cmd_dict = self.cmd_to_dict()
self.rsp_dict = self.rsp_to_dict() if self.rsp else {}
@classmethod
def from_apdu(cls, apdu:Apdu, **kwargs) -> 'ApduCommand':
"""Instantiate an ApduCommand from an existing APDU."""
return cls(cmd=apdu.cmd, rsp=apdu.rsp, **kwargs)
@classmethod
def from_bytes(cls, buffer:bytes) -> 'ApduCommand':
"""Instantiate an ApduCommand from a linear byte buffer containing hdr,cmd,rsp,sw.
This is for example used when parsing GSMTAP traces that traditionally contain the
full command and response portion in one packet: "CLA INS P1 P2 P3 DATA SW" and we
now need to figure out whether the DATA part is part of the CMD or the RSP"""
apdu_case = cls.get_apdu_case(buffer)
if apdu_case in [1, 2]:
# data is part of response
return cls(buffer[:5], buffer[5:])
elif apdu_case in [3, 4]:
# data is part of command
lc = buffer[4]
return cls(buffer[:5+lc], buffer[5+lc:])
else:
raise ValueError('%s: Invalid APDU Case %u' % (cls.__name__, apdu_case))
@property
def path(self) -> List[str]:
"""Return (if known) the path as list of files to the file on which this command operates."""
if self.file:
return self.file.fully_qualified_path()
else:
return []
@property
def path_str(self) -> str:
"""Return (if known) the path as string to the file on which this command operates."""
if self.file:
return self.file.fully_qualified_path_str()
else:
return ''
@property
def col_sw(self) -> str:
"""Return the ansi-colorized status word. Green==OK, Red==Error"""
if self.successful:
return colored(b2h(self.sw), 'green')
else:
return colored(b2h(self.sw), 'red')
@property
def lchan_nr(self) -> int:
"""Logical channel number over which this ApduCommand was transmitted."""
if self.lchan:
return self.lchan.lchan_nr
else:
return lchan_nr_from_cla(self.cla)
def __str__(self) -> str:
return '%02u %s(%s): %s' % (self.lchan_nr, type(self).__name__, self.path_str, self.to_dict())
def __repr__(self) -> str:
return '%s(INS=%02x,CLA=%s)' % (self.__class__, self.ins, self.cla)
def _process_fallback(self, rs: RuntimeState):
"""Fall-back function to be called if there is no derived-class-specific
process_global or process_on_lchan method. Uses information from APDU decode."""
self.processed = {}
if not 'p1' in self.cmd_dict:
self.processed = self.to_dict()
else:
self.processed['p1'] = self.cmd_dict['p1']
self.processed['p2'] = self.cmd_dict['p2']
if 'body' in self.cmd_dict and self.cmd_dict['body']:
self.processed['cmd'] = self.cmd_dict['body']
if 'body' in self.rsp_dict and self.rsp_dict['body']:
self.processed['rsp'] = self.rsp_dict['body']
return self.processed
def process(self, rs: RuntimeState):
# if there is a global method, use that; else use process_on_lchan
method = getattr(self, 'process_global', None)
if callable(method):
self.processed = method(rs)
return self.processed
method = getattr(self, 'process_on_lchan', None)
if callable(method):
self.lchan = rs.get_lchan_by_cla(self.cla)
self.processed = method(self.lchan)
return self.processed
# if none of the two methods exist:
return self._process_fallback(rs)
@classmethod
def get_apdu_case(cls, hdr:bytes) -> int:
if hasattr(cls, '_apdu_case'):
return cls._apdu_case
method = getattr(cls, '_get_apdu_case', None)
if callable(method):
return method(hdr)
raise ValueError('%s: Class definition missing _apdu_case attribute or _get_apdu_case method' % cls.__name__)
@classmethod
def match_cla(cls, cla) -> bool:
"""Does the given CLA match the CLA list of the command?."""
if not isinstance(cla, str):
cla = '%02X' % cla
cla = cla.lower()
for cla_match in cls._cla:
cla_masked = ""
for i in range(0, 2):
if cla_match[i] == 'X':
cla_masked += 'X'
else:
cla_masked += cla[i]
if cla_masked == cla_match:
return True
return False
def cmd_to_dict(self) -> Dict:
"""Convert the Command part of the APDU to a dict."""
method = getattr(self, '_decode_cmd', None)
if callable(method):
return method()
else:
r = {}
method = getattr(self, '_decode_p1p2', None)
if callable(method):
r = self._decode_p1p2()
else:
r['p1'] = parse_construct(self._construct_p1, self.p1.to_bytes(1, 'big'))
r['p2'] = parse_construct(self._construct_p2, self.p2.to_bytes(1, 'big'))
r['p3'] = self.p3
if self.cmd_data:
r['body'] = parse_construct(self._construct, self.cmd_data)
return r
def rsp_to_dict(self) -> Dict:
"""Convert the Response part of the APDU to a dict."""
method = getattr(self, '_decode_rsp', None)
if callable(method):
return method()
else:
r = {}
if self.rsp_data:
r['body'] = parse_construct(self._construct_rsp, self.rsp_data)
r['sw'] = b2h(self.sw)
return r
def to_dict(self) -> Dict:
"""Convert the entire APDU to a dict."""
return {'cmd': self.cmd_dict, 'rsp': self.rsp_dict}
def to_json(self) -> str:
"""Convert the entire APDU to JSON."""
d = self.to_dict()
return json.dumps(d)
def _determine_file(self, lchan) -> CardFile:
"""Helper function for read/update commands that might use SFI instead of selected file.
Expects that the self.cmd_dict has already been populated with the 'file' member."""
if self.cmd_dict['file'] == 'currently_selected_ef':
self.file = lchan.selected_file
elif self.cmd_dict['file'] == 'sfi':
cwd = lchan.get_cwd()
self.file = cwd.lookup_file_by_sfid(self.cmd_dict['sfi'])
class ApduCommandSet:
"""A set of card instructions, typically specified within one spec."""
def __init__(self, name: str, cmds: List[ApduCommand] =[]):
self.name = name
self.cmds = {c._ins: c for c in cmds}
def __str__(self) -> str:
return self.name
def __getitem__(self, idx) -> ApduCommand:
return self.cmds[idx]
def __add__(self, other) -> 'ApduCommandSet':
if isinstance(other, ApduCommand):
if other.ins in self.cmds:
raise ValueError('%s: INS 0x%02x already defined: %s' %
(self, other.ins, self.cmds[other.ins]))
self.cmds[other.ins] = other
elif isinstance(other, ApduCommandSet):
for c in other.cmds.keys():
self.cmds[c] = other.cmds[c]
else:
raise ValueError(
'%s: Unsupported type to add operator: %s' % (self, other))
return self
def lookup(self, ins, cla=None) -> Optional[ApduCommand]:
"""look-up the command within the CommandSet."""
ins = int(ins)
if not ins in self.cmds:
return None
cmd = self.cmds[ins]
if cla and not cmd.match_cla(cla):
return None
return cmd
def parse_cmd_apdu(self, apdu: Apdu) -> ApduCommand:
"""Parse a Command-APDU. Returns an instance of an ApduCommand derived class."""
# first look-up which of our member classes match CLA + INS
a_cls = self.lookup(apdu.ins, apdu.cla)
if not a_cls:
raise ValueError('Unknown CLA=%02X INS=%02X' % (apdu.cla, apdu.ins))
# then create an instance of that class and return it
return a_cls.from_apdu(apdu)
def parse_cmd_bytes(self, buf:bytes) -> ApduCommand:
"""Parse from a buffer (simtrace style). Returns an instance of an ApduCommand derived class."""
# first look-up which of our member classes match CLA + INS
cla = buf[0]
ins = buf[1]
a_cls = self.lookup(ins, cla)
if not a_cls:
raise ValueError('Unknown CLA=%02X INS=%02X' % (cla, ins))
# then create an instance of that class and return it
return a_cls.from_bytes(buf)
class ApduHandler(abc.ABC):
@abc.abstractmethod
def input(self, cmd: bytes, rsp: bytes):
pass
class TpduFilter(ApduHandler):
"""The TpduFilter removes the T=0 specific GET_RESPONSE from the TPDU stream and
calls the ApduHandler only with the actual APDU command and response parts."""
def __init__(self, apdu_handler: ApduHandler):
self.apdu_handler = apdu_handler
self.state = 'INIT'
self.last_cmd = None
def input_tpdu(self, tpdu:Tpdu):
# handle SW=61xx / 6Cxx
if tpdu.sw[0] == 0x61 or tpdu.sw[0] == 0x6C:
self.state = 'WAIT_GET_RESPONSE'
# handle successive 61/6c responses by stupid phone/modem OS
if tpdu.ins != 0xC0:
self.last_cmd = tpdu.cmd
return None
else:
if self.last_cmd:
icmd = self.last_cmd
self.last_cmd = None
else:
icmd = tpdu.cmd
apdu = Apdu(icmd, tpdu.rsp)
if self.apdu_handler:
return self.apdu_handler.input(apdu)
else:
return Apdu(icmd, tpdu.rsp)
def input(self, cmd: bytes, rsp: bytes):
if isinstance(cmd, str):
cmd = bytes.fromhex(cmd)
if isinstance(rsp, str):
rsp = bytes.fromhex(rsp)
tpdu = Tpdu(cmd, rsp)
return self.input_tpdu(tpdu)
class ApduDecoder(ApduHandler):
def __init__(self, cmd_set: ApduCommandSet):
self.cmd_set = cmd_set
def input(self, apdu: Apdu):
return self.cmd_set.parse_cmd_apdu(apdu)
class CardReset:
pass

View File

@ -0,0 +1,57 @@
# coding=utf-8
"""APDU definition/decoder of GlobalPLatform Card Spec (currently 2.1.1)
(C) 2022 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from pySim.apdu import ApduCommand, ApduCommandSet
class GpDelete(ApduCommand, n='DELETE', ins=0xE4, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpStoreData(ApduCommand, n='STORE DATA', ins=0xE2, cla=['8X', 'CX', 'EX']):
@classmethod
def _get_apdu_case(cls, hdr:bytes) -> int:
p1 = hdr[2]
if p1 & 0x01:
return 4
else:
return 3
class GpGetDataCA(ApduCommand, n='GET DATA', ins=0xCA, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpGetDataCB(ApduCommand, n='GET DATA', ins=0xCB, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpGetStatus(ApduCommand, n='GET STATUS', ins=0xF2, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpInstall(ApduCommand, n='INSTALL', ins=0xE6, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpLoad(ApduCommand, n='LOAD', ins=0xE8, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpPutKey(ApduCommand, n='PUT KEY', ins=0xD8, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
class GpSetStatus(ApduCommand, n='SET STATUS', ins=0xF0, cla=['8X', 'CX', 'EX']):
_apdu_case = 3
ApduCommands = ApduCommandSet('GlobalPlatform v2.3.1', cmds=[GpDelete, GpStoreData,
GpGetDataCA, GpGetDataCB, GpGetStatus, GpInstall,
GpLoad, GpPutKey, GpSetStatus])

509
pySim/apdu/ts_102_221.py Normal file
View File

@ -0,0 +1,509 @@
# coding=utf-8
"""APDU definitions/decoders of ETSI TS 102 221, the core UICC spec.
(C) 2022 by Harald Welte <laforge@osmocom.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
from pySim.construct import *
from pySim.filesystem import *
from pySim.apdu import ApduCommand, ApduCommandSet
from typing import Optional, Dict, Tuple
logger = logging.getLogger(__name__)
# TS 102 221 Section 11.1.1
class Select(ApduCommand, n='SELECT', ins=0xA4, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_p1 = Enum(Byte, df_ef_or_mf_by_file_id=0, child_df_of_current_df=1, parent_df_of_current_df=3,
df_name=4, path_from_mf=8, path_from_current_df=9)
_construct_p2 = BitStruct(Flag,
'app_session_control'/Enum(BitsInteger(2), activation_reset=0, termination=2),
'return'/Enum(BitsInteger(3), fcp=1, no_data=3),
'aid_control'/Enum(BitsInteger(2), first_or_only=0, last=1, next=2, previous=3))
@staticmethod
def _find_aid_substr(selectables, aid) -> Optional[CardADF]:
# full-length match
if aid in selectables:
return selectables[aid]
# sub-string match
for s in selectables.keys():
if aid[:len(s)] == s:
return selectables[s]
return None
def process_on_lchan(self, lchan: RuntimeLchan):
mode = self.cmd_dict['p1']
if mode in ['path_from_mf', 'path_from_current_df']:
# rewind to MF, if needed
if mode == 'path_from_mf':
lchan.selected_file = lchan.rs.mf
path = [self.cmd_data[i:i+2] for i in range(0, len(self.cmd_data), 2)]
for file in path:
file_hex = b2h(file)
if file_hex == '7fff': # current application
if not lchan.selected_adf:
sels = lchan.rs.mf.get_app_selectables(['ANAMES'])
# HACK: Assume USIM
logger.warning('SELECT relative to current ADF, but no ADF selected. Assuming ADF.USIM')
lchan.selected_adf = sels['ADF.USIM']
lchan.selected_file = lchan.selected_adf
#print("\tSELECT CUR_ADF %s" % lchan.selected_file)
continue
else:
sels = lchan.selected_file.get_selectables(['FIDS'])
if file_hex in sels:
if self.successful:
#print("\tSELECT %s" % sels[file_hex])
lchan.selected_file = sels[file_hex]
else:
#print("\tSELECT %s FAILED" % sels[file_hex])
pass
continue
logger.warning('SELECT UNKNOWN FID %s (%s)' % (file_hex, '/'.join([b2h(x) for x in path])))
elif mode == 'df_ef_or_mf_by_file_id':
if len(self.cmd_data) != 2:
raise ValueError('Expecting a 2-byte FID')
elif mode == 'df_name':
# Select by AID (can be sub-string!)
aid = self.cmd_dict['body']
sels = lchan.rs.mf.get_app_selectables(['AIDS'])
adf = self._find_aid_substr(sels, aid)
if adf:
lchan.selected_adf = adf
lchan.selected_file = lchan.selected_adf
#print("\tSELECT AID %s" % adf)
else:
logger.warning('SELECT UNKNOWN AID %s' % aid)
pass
else:
raise ValueError('Select Mode %s not implemented' % mode)
# decode the SELECT response
if self.successful:
self.file = lchan.selected_file
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
return None
# TS 102 221 Section 11.1.2
class Status(ApduCommand, n='STATUS', ins=0xF2, cla=['8X', 'CX', 'EX']):
_apdu_case = 2
_construct_p1 = Enum(Byte, no_indication=0, current_app_is_initialized=1, terminal_will_terminate_current_app=2)
_construct_p2 = Enum(Byte, response_like_select=0, response_df_name_tlv=1, response_no_data=0x0c)
def process_on_lchan(self, lchan):
if self.cmd_dict['p2'] == 'response_like_select':
return lchan.selected_file.decode_select_response(self.rsp_dict['body'])
def _decode_binary_p1p2(p1, p2) -> Dict:
ret = {}
if p1 & 0x80:
ret['file'] = 'sfi'
ret['sfi'] = p1 & 0x1f
ret['offset'] = p2
else:
ret['file'] = 'currently_selected_ef'
ret['offset'] = ((p1 & 0x7f) << 8) & p2
return ret
# TS 102 221 Section 11.1.3
class ReadBinary(ApduCommand, n='READ BINARY', ins=0xB0, cla=['0X', '4X', '6X']):
_apdu_case = 2
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short reads
if self.cmd_dict['offset'] != 0 or self.lr < self.file.size[0]:
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 102 221 Section 11.1.4
class UpdateBinary(ApduCommand, n='UPDATE BINARY', ins=0xD6, cla=['0X', '4X', '6X']):
_apdu_case = 3
def _decode_p1p2(self):
return _decode_binary_p1p2(self.p1, self.p2)
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, TransparentEF):
return b2h(self.rsp_data)
# our decoders don't work for non-zero offsets / short writes
if self.cmd_dict['offset'] != 0 or self.lc < self.file.size[0]:
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
def _decode_record_p1p2(p1, p2):
ret = {}
ret['record_number'] = p1
if p2 >> 3 == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = p2 >> 3
mode = p2 & 0x7
if mode == 2:
ret['mode'] = 'next_record'
elif mode == 3:
ret['mode'] = 'previous_record'
elif mode == 8:
ret['mode'] = 'absolute_current'
return ret
# TS 102 221 Section 11.1.5
class ReadRecord(ApduCommand, n='READ RECORD', ins=0xB2, cla=['0X', '4X', '6X']):
_apdu_case = 2
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.rsp_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.rsp_data)
# TS 102 221 Section 11.1.6
class UpdateRecord(ApduCommand, n='UPDATE RECORD', ins=0xDC, cla=['0X', '4X', '6X']):
_apdu_case = 3
def _decode_p1p2(self):
r = _decode_record_p1p2(self.p1, self.p2)
self.col_id = '%02u' % r['record_number']
return r
def process_on_lchan(self, lchan):
self._determine_file(lchan)
if not isinstance(self.file, LinFixedEF):
return b2h(self.cmd_data)
method = getattr(self.file, 'decode_record_bin', None)
if self.successful and callable(method):
return method(self.cmd_data)
# TS 102 221 Section 11.1.7
class SearchRecord(ApduCommand, n='SEARCH RECORD', ins=0xA2, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_rsp = GreedyRange(Int8ub)
def _decode_p1p2(self):
ret = {}
sfi = self.p2 >> 3
if sfi == 0:
ret['file'] = 'currently_selected_ef'
else:
ret['file'] = 'sfi'
ret['sfi'] = sfi
mode = self.p2 & 0x7
if mode in [0x4, 0x5]:
if mode == 0x4:
ret['mode'] = 'forward_search'
else:
ret['mode'] = 'backward_search'
ret['record_number'] = self.p1
self.col_id = '%02u' % ret['record_number']
elif mode == 6:
ret['mode'] = 'enhanced_search'
# TODO: further decode
elif mode == 7:
ret['mode'] = 'proprietary_search'
return ret
def _decode_cmd(self):
ret = self._decode_p1p2()
if self.cmd_data:
if ret['mode'] == 'enhanced_search':
ret['search_indication'] = b2h(self.cmd_data[:2])
ret['search_string'] = b2h(self.cmd_data[2:])
else:
ret['search_string'] = b2h(self.cmd_data)
return ret
def process_on_lchan(self, lchan):
self._determine_file(lchan)
return self.to_dict()
# TS 102 221 Section 11.1.8
class Increase(ApduCommand, n='INCREASE', ins=0x32, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
PinConstructP2 = BitStruct('scope'/Enum(Flag, global_mf=0, specific_df_adf=1),
BitsInteger(2), 'reference_data_nr'/BitsInteger(5))
# TS 102 221 Section 11.1.9
class VerifyPin(ApduCommand, n='VERIFY PIN', ins=0x20, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
@staticmethod
def _pin_process(apdu):
processed = {
'scope': apdu.cmd_dict['p2']['scope'],
'referenced_data_nr': apdu.cmd_dict['p2']['reference_data_nr'],
}
if apdu.lc == 0:
# this is just a question on the counters remaining
processed['mode'] = 'check_remaining_attempts'
else:
processed['pin'] = b2h(apdu.cmd_data)
if apdu.sw[0] == 0x63:
processed['remaining_attempts'] = apdu.sw[1] & 0xf
return processed
@staticmethod
def _pin_is_success(sw):
if sw[0] == 0x63:
return True
else:
return False
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 102 221 Section 11.1.10
class ChangePin(ApduCommand, n='CHANGE PIN', ins=0x24, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 102 221 Section 11.1.11
class DisablePin(ApduCommand, n='DISABLE PIN', ins=0x26, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 102 221 Section 11.1.12
class EnablePin(ApduCommand, n='ENABLE PIN', ins=0x28, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 102 221 Section 11.1.13
class UnblockPin(ApduCommand, n='UNBLOCK PIN', ins=0x2C, cla=['0X', '4X', '6X']):
_apdu_case = 3
_construct_p2 = PinConstructP2
def process_on_lchan(self, lchan: RuntimeLchan):
return VerifyPin._pin_process(self)
def _is_success(self):
return VerifyPin._pin_is_success(self.sw)
# TS 102 221 Section 11.1.14
class DeactivateFile(ApduCommand, n='DEACTIVATE FILE', ins=0x04, cla=['0X', '4X', '6X']):
_apdu_case = 1
_construct_p1 = BitStruct(BitsInteger(4),
'select_mode'/Enum(BitsInteger(4), ef_by_file_id=0,
path_from_mf=8, path_from_current_df=9))
# TS 102 221 Section 11.1.15
class ActivateFile(ApduCommand, n='ACTIVATE FILE', ins=0x44, cla=['0X', '4X', '6X']):
_apdu_case = 1
_construct_p1 = DeactivateFile._construct_p1
# TS 102 221 Section 11.1.16
auth_p2_construct = BitStruct('scope'/Enum(Flag, mf=0, df_adf_specific=1),
BitsInteger(2),
'reference_data_nr'/BitsInteger(5))
class Authenticate88(ApduCommand, n='AUTHENTICATE', ins=0x88, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_p2 = auth_p2_construct
# TS 102 221 Section 11.1.16
class Authenticate89(ApduCommand, n='AUTHENTICATE', ins=0x89, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_p2 = auth_p2_construct
# TS 102 221 Section 11.1.17
class ManageChannel(ApduCommand, n='MANAGE CHANNEL', ins=0x70, cla=['0X', '4X', '6X']):
_apdu_case = 2
_construct_p1 = Enum(Flag, open_channel=0, close_channel=1)
_construct_p2 = Struct('logical_channel_number'/Int8ub)
_construct_rsp = Struct('logical_channel_number'/Int8ub)
def process_global(self, rs):
if not self.successful:
return
mode = self.cmd_dict['p1']
if mode == 'open_channel':
created_channel_nr = self.cmd_dict['p2']['logical_channel_number']
if created_channel_nr == 0:
# auto-assignment by UICC
created_channel_nr = self.rsp_data[0]
manage_channel = rs.get_lchan_by_cla(self.cla)
manage_channel.add_lchan(created_channel_nr)
self.col_id = '%02u' % created_channel_nr
elif mode == 'close_channel':
closed_channel_nr = self.cmd_dict['p2']
rs.del_lchan(closed_channel_nr)
self.col_id = '%02u' % closed_channel_nr
else:
raise ValueError('Unsupported MANAGE CHANNEL P1=%02X' % self.p1)
# TS 102 221 Section 11.1.18
class GetChallenge(ApduCommand, n='GET CHALLENGE', ins=0x84, cla=['0X', '4X', '6X']):
_apdu_case = 2
# TS 102 221 Section 11.1.19
class TerminalCapability(ApduCommand, n='TERMINAL CAPABILITY', ins=0xAA, cla=['8X', 'CX', 'EX']):
_apdu_case = 3
# TS 102 221 Section 11.1.20
class ManageSecureChannel(ApduCommand, n='MANAGE SECURE CHANNEL', ins=0x73, cla=['0X', '4X', '6X']):
@classmethod
def _get_apdu_case(cls, hdr:bytes) -> int:
p1 = hdr[2]
p2 = hdr[3]
if p1 & 0x7 == 0: # retrieve UICC Endpoints
return 2
elif p1 & 0xf in [1,2,3]: # establish sa, start secure channel SA
p2_cmd = p2 >> 5
if p2_cmd in [0,2,4]: # command data
return 3
elif p2_cmd in [1,3,5]: # response data
return 2
elif p1 & 0xf == 4: # terminate secure channel SA
return 3
raise ValueError('%s: Unable to detect APDU case for %s' % (cls.__name__, b2h(hdr)))
# TS 102 221 Section 11.1.21
class TransactData(ApduCommand, n='TRANSACT DATA', ins=0x75, cla=['0X', '4X', '6X']):
@classmethod
def _get_apdu_case(cls, hdr:bytes) -> int:
p1 = hdr[2]
if p1 & 0x04:
return 3
else:
return 2
# TS 102 221 Section 11.1.22
class SuspendUicc(ApduCommand, n='SUSPEND UICC', ins=0x76, cla=['80']):
_apdu_case = 4
_construct_p1 = BitStruct('rfu'/BitsInteger(7), 'mode'/Enum(Flag, suspend=0, resume=1))
# TS 102 221 Section 11.1.23
class GetIdentity(ApduCommand, n='GET IDENTITY', ins=0x78, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
_construct_p2 = BitStruct('scope'/Enum(Flag, mf=0, df_adf_specific=1), BitsInteger(7))
# TS 102 221 Section 11.1.24
class ExchangeCapabilities(ApduCommand, n='EXCHANGE CAPABILITIES', ins=0x7A, cla=['80']):
_apdu_case = 4
# TS 102 221 Section 11.2.1
class TerminalProfile(ApduCommand, n='TERMINAL PROFILE', ins=0x10, cla=['80']):
_apdu_case = 3
# TS 102 221 Section 11.2.2 / TS 102 223
class Envelope(ApduCommand, n='ENVELOPE', ins=0xC2, cla=['80']):
_apdu_case = 4
# TS 102 221 Section 11.2.3 / TS 102 223
class Fetch(ApduCommand, n='FETCH', ins=0x12, cla=['80']):
_apdu_case = 2
# TS 102 221 Section 11.2.3 / TS 102 223
class TerminalResponse(ApduCommand, n='TERMINAL RESPONSE', ins=0x14, cla=['80']):
_apdu_case = 3
# TS 102 221 Section 11.3.1
class RetrieveData(ApduCommand, n='RETRIEVE DATA', ins=0xCB, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
@staticmethod
def _tlv_decode_cmd(self : ApduCommand) -> Dict:
c = {}
if self.p2 & 0xc0 == 0x80:
c['mode'] = 'first_block'
sfi = self.p2 & 0x1f
if sfi == 0:
c['file'] = 'currently_selected_ef'
else:
c['file'] = 'sfi'
c['sfi'] = sfi
c['tag'] = i2h([self.cmd_data[0]])
elif self.p2 & 0xdf == 0x00:
c['mode'] = 'next_block'
elif self.p2 & 0xdf == 0x40:
c['mode'] = 'retransmit_previous_block'
else:
logger.warning('%s: invalid P2=%02x' % (self, self.p2))
return c
def _decode_cmd(self):
return RetrieveData._tlv_decode_cmd(self)
def _decode_rsp(self):
# TODO: parse tag/len/val?
return b2h(self.rsp_data)
# TS 102 221 Section 11.3.2
class SetData(ApduCommand, n='SET DATA', ins=0xDB, cla=['8X', 'CX', 'EX']):
_apdu_case = 3
def _decode_cmd(self):
c = RetrieveData._tlv_decode_cmd(self)
if c['mode'] == 'first_block':
if len(self.cmd_data) == 0:
c['delete'] = True
# TODO: parse tag/len/val?
c['data'] = b2h(self.cmd_data)
return c
# TS 102 221 Section 12.1.1
class GetResponse(ApduCommand, n='GET RESPONSE', ins=0xC0, cla=['0X', '4X', '6X']):
_apdu_case = 2
ApduCommands = ApduCommandSet('TS 102 221', cmds=[Select, Status, ReadBinary, UpdateBinary, ReadRecord,
UpdateRecord, SearchRecord, Increase, VerifyPin, ChangePin, DisablePin,
EnablePin, UnblockPin, DeactivateFile, ActivateFile, Authenticate88,
Authenticate89, ManageChannel, GetChallenge, TerminalCapability,
ManageSecureChannel, TransactData, SuspendUicc, GetIdentity,
ExchangeCapabilities, TerminalProfile, Envelope, Fetch, TerminalResponse,
RetrieveData, SetData, GetResponse])

115
pySim/apdu/ts_31_102.py Normal file
View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# without this, pylint will fail when inner classes are used
# within the 'nested' kwarg of our TlvMeta metaclass on python 3.7 :(
# pylint: disable=undefined-variable
"""
APDU commands of 3GPP TS 31.102 V16.6.0
"""
from typing import Dict
from construct import *
from construct import Optional as COptional
from pySim.filesystem import *
from pySim.construct import *
from pySim.ts_31_102 import SUCI_TlvDataObject
from pySim.apdu import ApduCommand, ApduCommandSet
# Copyright (C) 2022 Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Mapping between USIM Service Number and its description
from pySim.apdu import ApduCommand, ApduCommandSet
# TS 31.102 Section 7.1
class UsimAuthenticateEven(ApduCommand, n='AUTHENTICATE', ins=0x88, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_p2 = BitStruct('scope'/Enum(Flag, mf=0, df_adf_specific=1),
BitsInteger(4),
'authentication_context'/Enum(BitsInteger(3), gsm=0, umts=1,
vgcs_vbs=2, gba=4))
_cs_cmd_gsm_3g = Struct('_rand_len'/Int8ub, 'rand'/HexAdapter(Bytes(this._rand_len)),
'_autn_len'/COptional(Int8ub), 'autn'/If(this._autn_len, HexAdapter(Bytes(this._autn_len))))
_cs_cmd_vgcs = Struct('_vsid_len'/Int8ub, 'vservice_id'/HexAdapter(Bytes(this._vsid_len)),
'_vkid_len'/Int8ub, 'vk_id'/HexAdapter(Bytes(this._vkid_len)),
'_vstk_rand_len'/Int8ub, 'vstk_rand'/HexAdapter(Bytes(this._vstk_rand_len)))
_cmd_gba_bs = Struct('_rand_len'/Int8ub, 'rand'/HexAdapter(Bytes(this._rand_len)),
'_autn_len'/Int8ub, 'autn'/HexAdapter(Bytes(this._autn_len)))
_cmd_gba_naf = Struct('_naf_id_len'/Int8ub, 'naf_id'/HexAdapter(Bytes(this._naf_id_len)),
'_impi_len'/Int8ub, 'impi'/HexAdapter(Bytes(this._impi_len)))
_cs_cmd_gba = Struct('tag'/Int8ub, 'body'/Switch(this.tag, { 0xDD: 'bootstrap'/_cmd_gba_bs,
0xDE: 'naf_derivation'/_cmd_gba_naf }))
_cs_rsp_gsm = Struct('_len_sres'/Int8ub, 'sres'/HexAdapter(Bytes(this._len_sres)),
'_len_kc'/Int8ub, 'kc'/HexAdapter(Bytes(this._len_kc)))
_rsp_3g_ok = Struct('_len_res'/Int8ub, 'res'/HexAdapter(Bytes(this._len_res)),
'_len_ck'/Int8ub, 'ck'/HexAdapter(Bytes(this._len_ck)),
'_len_ik'/Int8ub, 'ik'/HexAdapter(Bytes(this._len_ik)),
'_len_kc'/COptional(Int8ub), 'ck'/If(this._len_kc, HexAdapter(Bytes(this._len_kc))))
_rsp_3g_sync = Struct('_len_auts'/Int8ub, 'auts'/HexAdapter(Bytes(this._len_auts)))
_cs_rsp_3g = Struct('tag'/Int8ub, 'body'/Switch(this.tag, { 0xDB: 'success'/_rsp_3g_ok,
0xDC: 'sync_fail'/_rsp_3g_sync}))
_cs_rsp_vgcs = Struct(Const(b'\xDB'), '_vstk_len'/Int8ub, 'vstk'/HexAdapter(Bytes(this._vstk_len)))
_cs_rsp_gba_naf = Struct(Const(b'\xDB'), '_ks_ext_naf_len'/Int8ub, 'ks_ext_naf'/HexAdapter(Bytes(this._ks_ext_naf_len)))
def _decode_cmd(self) -> Dict:
r = {}
r['p1'] = parse_construct(self._construct_p1, self.p1.to_bytes(1, 'big'))
r['p2'] = parse_construct(self._construct_p2, self.p2.to_bytes(1, 'big'))
auth_ctx = r['p2']['authentication_context']
if auth_ctx in ['gsm', 'umts']:
r['body'] = parse_construct(self._cs_cmd_gsm_3g, self.cmd_data)
elif auth_ctx == 'vgcs_vbs':
r['body'] = parse_construct(self._cs_cmd_vgcs, self.cmd_data)
elif auth_ctx == 'gba':
r['body'] = parse_construct(self._cs_cmd_gba, self.cmd_data)
else:
raise ValueError('Unsupported authentication_context: %s' % auth_ctx)
return r
def _decode_rsp(self) -> Dict:
r = {}
auth_ctx = self.cmd_dict['p2']['authentication_context']
if auth_ctx == 'gsm':
r['body'] = parse_construct(self._cs_rsp_gsm, self.rsp_data)
elif auth_ctx == 'umts':
r['body'] = parse_construct(self._cs_rsp_3g, self.rsp_data)
elif auth_ctx == 'vgcs_vbs':
r['body'] = parse_construct(self._cs_rsp_vgcs, self.rsp_data)
elif auth_ctx == 'gba':
if self.cmd_dict['body']['tag'] == 0xDD:
r['body'] = parse_construct(self._cs_rsp_3g, self.rsp_data)
else:
r['body'] = parse_construct(self._cs_rsp_gba_naf, self.rsp_data)
else:
raise ValueError('Unsupported authentication_context: %s' % auth_ctx)
return r
class UsimAuthenticateOdd(ApduCommand, n='AUTHENTICATE', ins=0x89, cla=['0X', '4X', '6X']):
_apdu_case = 4
_construct_p2 = BitStruct('scope'/Enum(Flag, mf=0, df_adf_specific=1),
BitsInteger(4),
'authentication_context'/Enum(BitsInteger(3), mbms=5, local_key=6))
# TS 31.102 Section 7.5
class UsimGetIdentity(ApduCommand, n='GET IDENTITY', ins=0x78, cla=['8X', 'CX', 'EX']):
_apdu_case = 4
_construct_p2 = BitStruct('scope'/Enum(Flag, mf=0, df_adf_specific=1),
'identity_context'/Enum(BitsInteger(7), suci=1))
_tlv_rsp = SUCI_TlvDataObject
ApduCommands = ApduCommandSet('TS 31.102', cmds=[UsimAuthenticateEven, UsimAuthenticateOdd,
UsimGetIdentity])

View File

@ -0,0 +1,34 @@
import abc
import logging
from pySim.apdu import Apdu, Tpdu, CardReset, TpduFilter
PacketType = Apdu | Tpdu | CardReset
logger = logging.getLogger(__name__)
class ApduSource(abc.ABC):
def __init__(self):
self.apdu_filter = TpduFilter(None)
@abc.abstractmethod
def read_packet(self) -> PacketType:
"""Read one packet from the source."""
pass
def read(self) -> Apdu | CardReset:
"""Main function to call for an user: Blocking read, returns Apdu or CardReset."""
apdu = None
# loop until we actually have an APDU to return
while not apdu:
r = self.read_packet()
if not r:
continue
if isinstance(r, Tpdu):
apdu = self.apdu_filter.input_tpdu(r)
elif isinstance(r, Apdu):
apdu = r
elif isinstance(r, CardReset):
apdu = r
else:
ValueError('Unknown read_packet() return %s' % r)
return apdu

View File

@ -0,0 +1,57 @@
# coding=utf-8
# (C) 2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pySim.gsmtap import GsmtapMessage, GsmtapSource
from . import ApduSource, PacketType, CardReset
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.global_platform import ApduCommands as GpApduCommands
ApduCommands = UiccApduCommands + UsimApduCommands + GpApduCommands
class GsmtapApduSource(ApduSource):
"""ApduSource for handling GSMTAP-SIM messages received via UDP, such as
those generated by simtrace2-sniff. Note that *if* you use IP loopback
and localhost addresses (which is the default), you will need to start
this source before starting simtrace2-sniff, as otherwise the latter will
claim the GSMTAP UDP port.
"""
def __init__(self, bind_ip:str='127.0.0.1', bind_port:int=4729):
"""Create a UDP socket for receiving GSMTAP-SIM messages.
Args:
bind_ip: IP address to which the socket should be bound (default: 127.0.0.1)
bind_port: UDP port number to which the socket should be bound (default: 4729)
"""
super().__init__()
self.gsmtap = GsmtapSource(bind_ip, bind_port)
def read_packet(self) -> PacketType:
gsmtap_msg, addr = self.gsmtap.read_packet()
if gsmtap_msg['type'] != 'sim':
raise ValueError('Unsupported GSMTAP type %s' % gsmtap_msg['type'])
sub_type = gsmtap_msg['sub_type']
if sub_type == 'apdu':
return ApduCommands.parse_cmd_bytes(gsmtap_msg['body'])
elif sub_type == 'atr':
# card has been reset
return CardReset()
elif sub_type in ['pps_req', 'pps_rsp']:
# simply ignore for now
pass
else:
raise ValueError('Unsupported GSMTAP-SIM sub-type %s' % sub_type)

View File

@ -0,0 +1,159 @@
# coding=utf-8
# (C) 2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import logging
from pprint import pprint as pp
from typing import Tuple
import pyshark
from pySim.utils import h2b, b2h
from pySim.apdu import Tpdu
from . import ApduSource, PacketType, CardReset
logger = logging.getLogger(__name__)
class _PysharkRspro(ApduSource):
"""APDU Source [provider] base class for reading RSPRO (osmo-remsim) via tshark."""
def __init__(self, pyshark_inst):
self.pyshark = pyshark_inst
self.bank_id = None
self.bank_slot = None
self.cmd_tpdu = None
super().__init__()
@staticmethod
def get_bank_slot(bank_slot) -> Tuple[int, int]:
"""Convert a 'bankSlot_element' field into a tuple of bank_id, slot_nr"""
bank_id = bank_slot.get_field('bankId')
slot_nr = bank_slot.get_field('slotNr')
return int(bank_id), int(slot_nr)
@staticmethod
def get_client_slot(client_slot) -> Tuple[int, int]:
"""Convert a 'clientSlot_element' field into a tuple of client_id, slot_nr"""
client_id = client_slot.get_field('clientId')
slot_nr = client_slot.get_field('slotNr')
return int(client_id), int(slot_nr)
@staticmethod
def get_pstatus(pstatus) -> Tuple[int, int, int]:
"""Convert a 'slotPhysStatus_element' field into a tuple of vcc, reset, clk"""
vccPresent = int(pstatus.get_field('vccPresent'))
resetActive = int(pstatus.get_field('resetActive'))
clkActive = int(pstatus.get_field('clkActive'))
return vccPresent, resetActive, clkActive
def read_packet(self) -> PacketType:
p = self.pyshark.next()
return self._parse_packet(p)
def _set_or_verify_bank_slot(self, bsl: Tuple[int, int]):
"""Keep track of the bank:slot to make sure we don't mix traces of multiple cards"""
if not self.bank_id:
self.bank_id = bsl[0]
self.bank_slot = bsl[1]
else:
if self.bank_id != bsl[0] or self.bank_slot != bsl[1]:
raise ValueError('Received data for unexpected B(%u:%u)' % (bsl[0], bsl[1]))
def _parse_packet(self, p) -> PacketType:
rspro_layer = p['rspro']
#print("Layer: %s" % rspro_layer)
rspro_element = rspro_layer.get_field('RsproPDU_element')
#print("Element: %s" % rspro_element)
msg_type = rspro_element.get_field('msg')
rspro_msg = rspro_element.get_field('msg_tree')
if msg_type == '12': # tpduModemToCard
modem2card = rspro_msg.get_field('tpduModemToCard_element')
#print(modem2card)
client_slot = modem2card.get_field('fromClientSlot_element')
csl = self.get_client_slot(client_slot)
bank_slot = modem2card.get_field('toBankSlot_element')
bsl = self.get_bank_slot(bank_slot)
self._set_or_verify_bank_slot(bsl)
data = modem2card.get_field('data').replace(':','')
logger.debug("C(%u:%u) -> B(%u:%u): %s" % (csl[0], csl[1], bsl[0], bsl[1], data))
# store the CMD portion until the RSP portion arrives later
self.cmd_tpdu = h2b(data)
elif msg_type == '13': # tpduCardToModem
card2modem = rspro_msg.get_field('tpduCardToModem_element')
#print(card2modem)
client_slot = card2modem.get_field('toClientSlot_element')
csl = self.get_client_slot(client_slot)
bank_slot = card2modem.get_field('fromBankSlot_element')
bsl = self.get_bank_slot(bank_slot)
self._set_or_verify_bank_slot(bsl)
data = card2modem.get_field('data').replace(':','')
logger.debug("C(%u:%u) <- B(%u:%u): %s" % (csl[0], csl[1], bsl[0], bsl[1], data))
rsp_tpdu = h2b(data)
if self.cmd_tpdu:
# combine this R-TPDU with the C-TPDU we saw earlier
r = Tpdu(self.cmd_tpdu, rsp_tpdu)
self.cmd_tpdu = False
return r
elif msg_type == '14': # clientSlotStatus
cl_slotstatus = rspro_msg.get_field('clientSlotStatusInd_element')
#print(cl_slotstatus)
client_slot = cl_slotstatus.get_field('fromClientSlot_element')
bank_slot = cl_slotstatus.get_field('toBankSlot_element')
slot_pstatus = cl_slotstatus.get_field('slotPhysStatus_element')
vccPresent, resetActive, clkActive = self.get_pstatus(slot_pstatus)
if vccPresent and clkActive and not resetActive:
logger.debug("RESET")
return CardReset()
else:
print("Unhandled msg type %s: %s" % (msg_type, rspro_msg))
class PysharkRsproPcap(_PysharkRspro):
"""APDU Source [provider] class for reading RSPRO (osmo-remsim) from a PCAP
file via pyshark, which in turn uses tshark (part of wireshark).
In order to use this, you need a wireshark patched with RSPRO support,
such as can be found at https://gitea.osmocom.org/osmocom/wireshark/src/branch/laforge/rspro
A STANDARD UPSTREAM WIRESHARK *DOES NOT WORK*.
"""
def __init__(self, pcap_filename):
"""
Args:
pcap_filename: File name of the pcap file to be opened
"""
pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='rspro', use_json=True, keep_packets=False)
super().__init__(pyshark_inst)
class PysharkRsproLive(_PysharkRspro):
"""APDU Source [provider] class for reading RSPRO (osmo-remsim) from a live capture
via pyshark, which in turn uses tshark (part of wireshark).
In order to use this, you need a wireshark patched with RSPRO support,
such as can be found at https://gitea.osmocom.org/osmocom/wireshark/src/branch/laforge/rspro
A STANDARD UPSTREAM WIRESHARK *DOES NOT WORK*.
"""
def __init__(self, interface, bpf_filter='tcp port 9999 or tcp port 9998'):
"""
Args:
interface: Network interface name to capture packets on (like "eth0")
bfp_filter: libpcap capture filter to use
"""
pyshark_inst = pyshark.LiveCapture(interface=interface, display_filter='rspro', bpf_filter=bpf_filter,
use_json=True)
super().__init__(pyshark_inst)

214
pySim/gsmtap.py Normal file
View File

@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
""" Osmocom GSMTAP python implementation.
GSMTAP is a packet format used for conveying a number of different
telecom-related protocol traces over UDP.
"""
#
# Copyright (C) 2022 Harald Welte <laforge@gnumonks.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import socket
from typing import List, Dict, Optional
from construct import Optional as COptional
from construct import *
from pySim.construct import *
# The root definition of GSMTAP can be found at
# https://cgit.osmocom.org/cgit/libosmocore/tree/include/osmocom/core/gsmtap.h
GSMTAP_UDP_PORT = 4729
# GSMTAP_TYPE_*
gsmtap_type_construct = Enum(Int8ub,
gsm_um = 0x01,
gsm_abis = 0x02,
gsm_um_burst = 0x03,
sim = 0x04,
tetra_i1 = 0x05,
tetra_i1_burst = 0x06,
wimax_burst = 0x07,
gprs_gb_llc = 0x08,
gprs_gb_sndcp = 0x09,
gmr1_um = 0x0a,
umts_rlc_mac = 0x0b,
umts_rrc = 0x0c,
lte_rrc = 0x0d,
lte_mac = 0x0e,
lte_mac_framed = 0x0f,
osmocore_log = 0x10,
qc_diag = 0x11,
lte_nas = 0x12,
e1_t1 = 0x13)
# TYPE_UM_BURST
gsmtap_subtype_burst_construct = Enum(Int8ub,
unknown = 0x00,
fcch = 0x01,
partial_sch = 0x02,
sch = 0x03,
cts_sch = 0x04,
compact_sch = 0x05,
normal = 0x06,
dummy = 0x07,
access = 0x08,
none = 0x09)
gsmtap_subtype_wimax_burst_construct = Enum(Int8ub,
cdma_code = 0x10,
fch = 0x11,
ffb = 0x12,
pdu = 0x13,
hack = 0x14,
phy_attributes = 0x15)
# GSMTAP_CHANNEL_*
gsmtap_subtype_um_construct = Enum(Int8ub,
unknown = 0x00,
bcch = 0x01,
ccch = 0x02,
rach = 0x03,
agch = 0x04,
pch = 0x05,
sdcch = 0x06,
sdcch4 = 0x07,
sdcch8 = 0x08,
facch_f = 0x09,
facch_h = 0x0a,
pacch = 0x0b,
cbch52 = 0x0c,
pdtch = 0x0d,
ptcch = 0x0e,
cbch51 = 0x0f,
voice_f = 0x10,
voice_h = 0x11)
# GSMTAP_SIM_*
gsmtap_subtype_sim_construct = Enum(Int8ub,
apdu = 0x00,
atr = 0x01,
pps_req = 0x02,
pps_rsp = 0x03,
tpdu_hdr = 0x04,
tpdu_cmd = 0x05,
tpdu_rsp = 0x06,
tpdu_sw = 0x07)
gsmtap_subtype_tetra_construct = Enum(Int8ub,
bsch = 0x01,
aach = 0x02,
sch_hu = 0x03,
sch_hd = 0x04,
sch_f = 0x05,
bnch = 0x06,
stch = 0x07,
tch_f = 0x08,
dmo_sch_s = 0x09,
dmo_sch_h = 0x0a,
dmo_sch_f = 0x0b,
dmo_stch = 0x0c,
dmo_tch = 0x0d)
gsmtap_subtype_gmr1_construct = Enum(Int8ub,
unknown = 0x00,
bcch = 0x01,
ccch = 0x02,
pch = 0x03,
agch = 0x04,
bach = 0x05,
rach = 0x06,
cbch = 0x07,
sdcch = 0x08,
tachh = 0x09,
gbch = 0x0a,
tch3 = 0x10,
tch6 = 0x14,
tch9 = 0x18)
gsmtap_subtype_e1t1_construct = Enum(Int8ub,
lapd = 0x01,
fr = 0x02,
raw = 0x03,
trau16 = 0x04,
trau8 = 0x05)
gsmtap_arfcn_construct = BitStruct('pcs'/Flag, 'uplink'/Flag, 'arfcn'/BitsInteger(14))
gsmtap_hdr_construct = Struct('version'/Int8ub,
'hdr_len'/Int8ub,
'type'/gsmtap_type_construct,
'timeslot'/Int8ub,
'arfcn'/gsmtap_arfcn_construct,
'signal_dbm'/Int8sb,
'snr_db'/Int8sb,
'frame_nr'/Int32ub,
'sub_type'/Switch(this.type, {
'gsm_um': gsmtap_subtype_um_construct,
'gsm_um_burst': gsmtap_subtype_burst_construct,
'sim': gsmtap_subtype_sim_construct,
'tetra_i1': gsmtap_subtype_tetra_construct,
'tetra_i1_burst': gsmtap_subtype_tetra_construct,
'wimax_burst': gsmtap_subtype_wimax_burst_construct,
'gmr1_um': gsmtap_subtype_gmr1_construct,
'e1_t1': gsmtap_subtype_e1t1_construct,
}),
'antenna_nr'/Int8ub,
'sub_slot'/Int8ub,
'res'/Int8ub,
'body'/GreedyBytes)
osmocore_log_ts_construct = Struct('sec'/Int32ub, 'usec'/Int32ub)
osmocore_log_level_construct = Enum(Int8ub, debug=1, info=3, notice=5, error=7, fatal=8)
gsmtap_osmocore_log_hdr_construct = Struct('ts'/osmocore_log_ts_construct,
'proc_name'/PaddedString(16, 'ascii'),
'pid'/Int32ub,
'level'/osmocore_log_level_construct,
Bytes(3),
'subsys'/PaddedString(16, 'ascii'),
'src_file'/Struct('name'/PaddedString(32, 'ascii'), 'line_nr'/Int32ub))
class GsmtapMessage:
"""Class whose objects represent a single GSMTAP message. Can encode and decode messages."""
def __init__(self, encoded = None):
self.encoded = encoded
self.decoded = None
def decode(self):
self.decoded = parse_construct(gsmtap_hdr_construct, self.encoded)
return self.decoded
def encode(self, decoded):
self.encoded = gsmtap_hdr_construct.build(decoded)
return self.encoded
class GsmtapSource:
def __init__(self, bind_ip:str='127.0.0.1', bind_port:int=4729):
self.bind_ip = bind_ip
self.bind_port = bind_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.bind_ip, self.bind_port))
def read_packet(self) -> GsmtapMessage:
data, addr = self.sock.recvfrom(1024)
gsmtap_msg = GsmtapMessage(data)
gsmtap_msg.decode()
if gsmtap_msg.decoded['version'] != 0x02:
raise ValueError('Unknown GSMTAP version 0x%02x' % gsmtap_msg.decoded['version'])
return gsmtap_msg.decoded, addr

View File

@ -10,7 +10,7 @@ Various constants from 3GPP TS 31.102 V16.6.0
#
# Copyright (C) 2020 Supreeth Herle <herlesupreeth@gmail.com>
# Copyright (C) 2021 Harald Welte <laforge@osmocom.org>
# Copyright (C) 2021-2022 Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -300,6 +300,10 @@ EF_USIM_ADF_map = {
'ePDGSelectionEm': '6FF6',
}
# 3gPP TS 31.102 Section 7.5.2.1
class SUCI_TlvDataObject(BER_TLV_IE, tag=0xA1):
_construct = HexAdapter(GreedyBytes)
######################################################################
# ADF.USIM
######################################################################

View File

@ -7,3 +7,5 @@ construct
bidict
gsm0338
pyyaml>=5.1
termcolor
colorlog

View File

@ -17,6 +17,8 @@ setup(
"construct >= 2.9",
"bidict",
"gsm0338",
"termcolor",
"colorlog"
],
scripts=[
'pySim-prog.py',