2022-07-16 12:06:46 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
""" Osmocom GSMTAP python implementation.
|
|
|
|
GSMTAP is a packet format used for conveying a number of different
|
|
|
|
telecom-related protocol traces over UDP.
|
|
|
|
"""
|
|
|
|
|
|
|
|
#
|
|
|
|
# Copyright (C) 2022 Harald Welte <laforge@gnumonks.org>
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
#
|
|
|
|
|
|
|
|
import socket
|
|
|
|
from construct import Optional as COptional
|
2024-02-04 20:47:34 +00:00
|
|
|
from construct import Int8ub, Int8sb, Int32ub, BitStruct, Enum, GreedyBytes, Struct, Switch
|
|
|
|
from construct import this, PaddedString
|
2022-07-16 12:06:46 +00:00
|
|
|
from pySim.construct import *
|
|
|
|
|
|
|
|
# The root definition of GSMTAP can be found at
|
|
|
|
# https://cgit.osmocom.org/cgit/libosmocore/tree/include/osmocom/core/gsmtap.h
|
|
|
|
|
|
|
|
GSMTAP_UDP_PORT = 4729
|
|
|
|
|
|
|
|
# GSMTAP_TYPE_*
|
|
|
|
gsmtap_type_construct = Enum(Int8ub,
|
|
|
|
gsm_um = 0x01,
|
|
|
|
gsm_abis = 0x02,
|
|
|
|
gsm_um_burst = 0x03,
|
|
|
|
sim = 0x04,
|
|
|
|
tetra_i1 = 0x05,
|
|
|
|
tetra_i1_burst = 0x06,
|
|
|
|
wimax_burst = 0x07,
|
|
|
|
gprs_gb_llc = 0x08,
|
|
|
|
gprs_gb_sndcp = 0x09,
|
|
|
|
gmr1_um = 0x0a,
|
|
|
|
umts_rlc_mac = 0x0b,
|
|
|
|
umts_rrc = 0x0c,
|
|
|
|
lte_rrc = 0x0d,
|
|
|
|
lte_mac = 0x0e,
|
|
|
|
lte_mac_framed = 0x0f,
|
|
|
|
osmocore_log = 0x10,
|
|
|
|
qc_diag = 0x11,
|
|
|
|
lte_nas = 0x12,
|
|
|
|
e1_t1 = 0x13)
|
|
|
|
|
|
|
|
|
|
|
|
# TYPE_UM_BURST
|
|
|
|
gsmtap_subtype_burst_construct = Enum(Int8ub,
|
|
|
|
unknown = 0x00,
|
|
|
|
fcch = 0x01,
|
|
|
|
partial_sch = 0x02,
|
|
|
|
sch = 0x03,
|
|
|
|
cts_sch = 0x04,
|
|
|
|
compact_sch = 0x05,
|
|
|
|
normal = 0x06,
|
|
|
|
dummy = 0x07,
|
|
|
|
access = 0x08,
|
|
|
|
none = 0x09)
|
|
|
|
|
|
|
|
gsmtap_subtype_wimax_burst_construct = Enum(Int8ub,
|
|
|
|
cdma_code = 0x10,
|
|
|
|
fch = 0x11,
|
|
|
|
ffb = 0x12,
|
|
|
|
pdu = 0x13,
|
|
|
|
hack = 0x14,
|
|
|
|
phy_attributes = 0x15)
|
|
|
|
|
|
|
|
# GSMTAP_CHANNEL_*
|
|
|
|
gsmtap_subtype_um_construct = Enum(Int8ub,
|
|
|
|
unknown = 0x00,
|
|
|
|
bcch = 0x01,
|
|
|
|
ccch = 0x02,
|
|
|
|
rach = 0x03,
|
|
|
|
agch = 0x04,
|
|
|
|
pch = 0x05,
|
|
|
|
sdcch = 0x06,
|
|
|
|
sdcch4 = 0x07,
|
|
|
|
sdcch8 = 0x08,
|
|
|
|
facch_f = 0x09,
|
|
|
|
facch_h = 0x0a,
|
|
|
|
pacch = 0x0b,
|
|
|
|
cbch52 = 0x0c,
|
|
|
|
pdtch = 0x0d,
|
|
|
|
ptcch = 0x0e,
|
|
|
|
cbch51 = 0x0f,
|
|
|
|
voice_f = 0x10,
|
|
|
|
voice_h = 0x11)
|
|
|
|
|
|
|
|
|
|
|
|
# GSMTAP_SIM_*
|
|
|
|
gsmtap_subtype_sim_construct = Enum(Int8ub,
|
|
|
|
apdu = 0x00,
|
|
|
|
atr = 0x01,
|
|
|
|
pps_req = 0x02,
|
|
|
|
pps_rsp = 0x03,
|
|
|
|
tpdu_hdr = 0x04,
|
|
|
|
tpdu_cmd = 0x05,
|
|
|
|
tpdu_rsp = 0x06,
|
|
|
|
tpdu_sw = 0x07)
|
|
|
|
|
|
|
|
gsmtap_subtype_tetra_construct = Enum(Int8ub,
|
|
|
|
bsch = 0x01,
|
|
|
|
aach = 0x02,
|
|
|
|
sch_hu = 0x03,
|
|
|
|
sch_hd = 0x04,
|
|
|
|
sch_f = 0x05,
|
|
|
|
bnch = 0x06,
|
|
|
|
stch = 0x07,
|
|
|
|
tch_f = 0x08,
|
|
|
|
dmo_sch_s = 0x09,
|
|
|
|
dmo_sch_h = 0x0a,
|
|
|
|
dmo_sch_f = 0x0b,
|
|
|
|
dmo_stch = 0x0c,
|
|
|
|
dmo_tch = 0x0d)
|
|
|
|
|
|
|
|
gsmtap_subtype_gmr1_construct = Enum(Int8ub,
|
|
|
|
unknown = 0x00,
|
|
|
|
bcch = 0x01,
|
|
|
|
ccch = 0x02,
|
|
|
|
pch = 0x03,
|
|
|
|
agch = 0x04,
|
|
|
|
bach = 0x05,
|
|
|
|
rach = 0x06,
|
|
|
|
cbch = 0x07,
|
|
|
|
sdcch = 0x08,
|
|
|
|
tachh = 0x09,
|
|
|
|
gbch = 0x0a,
|
|
|
|
tch3 = 0x10,
|
|
|
|
tch6 = 0x14,
|
|
|
|
tch9 = 0x18)
|
|
|
|
|
|
|
|
gsmtap_subtype_e1t1_construct = Enum(Int8ub,
|
|
|
|
lapd = 0x01,
|
|
|
|
fr = 0x02,
|
|
|
|
raw = 0x03,
|
|
|
|
trau16 = 0x04,
|
|
|
|
trau8 = 0x05)
|
|
|
|
|
|
|
|
gsmtap_arfcn_construct = BitStruct('pcs'/Flag, 'uplink'/Flag, 'arfcn'/BitsInteger(14))
|
|
|
|
|
|
|
|
gsmtap_hdr_construct = Struct('version'/Int8ub,
|
|
|
|
'hdr_len'/Int8ub,
|
|
|
|
'type'/gsmtap_type_construct,
|
|
|
|
'timeslot'/Int8ub,
|
|
|
|
'arfcn'/gsmtap_arfcn_construct,
|
|
|
|
'signal_dbm'/Int8sb,
|
|
|
|
'snr_db'/Int8sb,
|
|
|
|
'frame_nr'/Int32ub,
|
|
|
|
'sub_type'/Switch(this.type, {
|
|
|
|
'gsm_um': gsmtap_subtype_um_construct,
|
|
|
|
'gsm_um_burst': gsmtap_subtype_burst_construct,
|
|
|
|
'sim': gsmtap_subtype_sim_construct,
|
|
|
|
'tetra_i1': gsmtap_subtype_tetra_construct,
|
|
|
|
'tetra_i1_burst': gsmtap_subtype_tetra_construct,
|
|
|
|
'wimax_burst': gsmtap_subtype_wimax_burst_construct,
|
|
|
|
'gmr1_um': gsmtap_subtype_gmr1_construct,
|
|
|
|
'e1_t1': gsmtap_subtype_e1t1_construct,
|
|
|
|
}),
|
|
|
|
'antenna_nr'/Int8ub,
|
|
|
|
'sub_slot'/Int8ub,
|
|
|
|
'res'/Int8ub,
|
|
|
|
'body'/GreedyBytes)
|
|
|
|
|
|
|
|
osmocore_log_ts_construct = Struct('sec'/Int32ub, 'usec'/Int32ub)
|
|
|
|
osmocore_log_level_construct = Enum(Int8ub, debug=1, info=3, notice=5, error=7, fatal=8)
|
|
|
|
gsmtap_osmocore_log_hdr_construct = Struct('ts'/osmocore_log_ts_construct,
|
|
|
|
'proc_name'/PaddedString(16, 'ascii'),
|
|
|
|
'pid'/Int32ub,
|
|
|
|
'level'/osmocore_log_level_construct,
|
|
|
|
Bytes(3),
|
|
|
|
'subsys'/PaddedString(16, 'ascii'),
|
|
|
|
'src_file'/Struct('name'/PaddedString(32, 'ascii'), 'line_nr'/Int32ub))
|
|
|
|
|
|
|
|
|
|
|
|
class GsmtapMessage:
|
|
|
|
"""Class whose objects represent a single GSMTAP message. Can encode and decode messages."""
|
|
|
|
def __init__(self, encoded = None):
|
|
|
|
self.encoded = encoded
|
|
|
|
self.decoded = None
|
|
|
|
|
|
|
|
def decode(self):
|
|
|
|
self.decoded = parse_construct(gsmtap_hdr_construct, self.encoded)
|
|
|
|
return self.decoded
|
|
|
|
|
|
|
|
def encode(self, decoded):
|
|
|
|
self.encoded = gsmtap_hdr_construct.build(decoded)
|
|
|
|
return self.encoded
|
|
|
|
|
|
|
|
class GsmtapSource:
|
|
|
|
def __init__(self, bind_ip:str='127.0.0.1', bind_port:int=4729):
|
|
|
|
self.bind_ip = bind_ip
|
|
|
|
self.bind_port = bind_port
|
|
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
self.sock.bind((self.bind_ip, self.bind_port))
|
|
|
|
|
|
|
|
def read_packet(self) -> GsmtapMessage:
|
|
|
|
data, addr = self.sock.recvfrom(1024)
|
|
|
|
gsmtap_msg = GsmtapMessage(data)
|
|
|
|
gsmtap_msg.decode()
|
|
|
|
if gsmtap_msg.decoded['version'] != 0x02:
|
|
|
|
raise ValueError('Unknown GSMTAP version 0x%02x' % gsmtap_msg.decoded['version'])
|
|
|
|
return gsmtap_msg.decoded, addr
|