fake_trx: use DATAMSG classes for DATA messages

The DATAMSG API, that was introduced and extended a few commits
before, provides all required methods to create, validate,
generate and parse DATA messages. Let's use it now.

Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
This commit is contained in:
Vadim Yanitskiy 2018-01-27 21:47:31 +07:00
parent 263ccef268
commit d273ebf611
4 changed files with 114 additions and 157 deletions

View File

@ -4,7 +4,7 @@
# Auxiliary tool to generate and send random bursts via TRX DATA
# interface, which may be useful for fuzzing and testing
#
# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -22,7 +22,6 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
import signal
import getopt
import sys
@ -30,6 +29,7 @@ import sys
from rand_burst_gen import RandBurstGen
from data_if import DATAInterface
from gsm_shared import *
from data_msg import *
COPYRIGHT = \
"Copyright (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
@ -70,38 +70,56 @@ class Application:
# Init random burst generator
burst_gen = RandBurstGen()
# Init an empty DATA message
if self.conn_mode == "TRX":
msg = DATAMSG_L12TRX()
elif self.conn_mode == "L1":
msg = DATAMSG_TRX2L1()
# Generate a random frame number or use provided one
if self.fn is None:
fn = random.randint(0, GSM_HYPERFRAME)
else:
fn = self.fn
fn_init = msg.rand_fn() if self.fn is None else self.fn
# Send as much bursts as required
for i in range(self.burst_count):
# Randomize the message header
msg.rand_hdr()
# Increase and set frame number
msg.fn = (fn_init + i) % GSM_HYPERFRAME
# Set timeslot number
if self.tn is not None:
msg.tn = self.tn
# Set transmit power level
if self.pwr is not None:
msg.pwr = self.pwr
# TODO: also set TRX2L1 specific fields
# Generate a random burst
if self.burst_type == "NB":
buf = burst_gen.gen_nb()
burst = burst_gen.gen_nb()
elif self.burst_type == "FB":
buf = burst_gen.gen_fb()
burst = burst_gen.gen_fb()
elif self.burst_type == "SB":
buf = burst_gen.gen_sb()
burst = burst_gen.gen_sb()
elif self.burst_type == "AB":
buf = burst_gen.gen_ab()
burst = burst_gen.gen_ab()
print("[i] Sending %d/%d %s burst (fn=%u) to %s..."
# Convert to soft-bits in case of TRX -> L1 message
if self.conn_mode == "L1":
burst = msg.ubit2sbit(burst)
# Set burst
msg.burst = burst
print("[i] Sending %d/%d %s burst %s to %s..."
% (i + 1, self.burst_count, self.burst_type,
fn, self.conn_mode))
msg.desc_hdr(), self.conn_mode))
# Send to TRX or L1
if self.conn_mode == "TRX":
self.data_if.send_trx_msg(buf,
self.tn, fn, self.pwr)
elif self.conn_mode == "L1":
self.data_if.send_l1_msg(buf,
self.tn, fn, self.pwr)
# Increase frame number (for count > 1)
fn = (fn + 1) % GSM_HYPERFRAME
# Send message
self.data_if.send_msg(msg)
self.shutdown()

View File

@ -3,7 +3,7 @@
# Auxiliary tool to send existing bursts via TRX DATA interface
#
# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -21,13 +21,13 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
import signal
import getopt
import sys
from data_if import DATAInterface
from gsm_shared import *
from data_msg import *
COPYRIGHT = \
"Copyright (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
@ -74,40 +74,61 @@ class Application:
print("[i] Reading bursts from stdin...")
src = sys.stdin
# Init an empty DATA message
if self.conn_mode == "TRX":
msg = DATAMSG_L12TRX()
elif self.conn_mode == "L1":
msg = DATAMSG_TRX2L1()
# Generate a random frame number or use provided one
if self.fn is None:
fn = random.randint(0, GSM_HYPERFRAME)
else:
fn = self.fn
fn = msg.rand_fn() if self.fn is None else self.fn
# Read the burst source line-by-line
for line in src:
# Strip spaces
burst = line.strip()
buf = []
burst_str = line.strip()
burst = []
# Check length
if len(burst) != 148:
if len(burst_str) != 148:
print("[!] Dropping burst due to length != 148")
continue
print("[i] Sending a burst (fn=%u) to %s..."
% (fn, self.conn_mode))
# Randomize the message header
msg.rand_hdr()
# Set frame number
msg.fn = fn
# Set timeslot number
if self.tn is not None:
msg.tn = self.tn
# Set transmit power level
if self.pwr is not None:
msg.pwr = self.pwr
# TODO: also set TRX2L1 specific fields
# Parse a string
for bit in burst:
for bit in burst_str:
if bit == "1":
buf.append(1)
burst.append(1)
else:
buf.append(0)
burst.append(0)
# Send to TRX or L1
if self.conn_mode == "TRX":
self.data_if.send_trx_msg(buf,
self.tn, fn, self.pwr)
elif self.conn_mode == "L1":
self.data_if.send_l1_msg(buf,
self.tn, fn, self.pwr)
# Convert to soft-bits in case of TRX -> L1 message
if self.conn_mode == "L1":
burst = msg.ubit2sbit(burst)
# Set burst
msg.burst = burst
print("[i] Sending a burst %s to %s..."
% (msg.desc_hdr(), self.conn_mode))
# Send message
self.data_if.send_msg(msg)
# Increase frame number (for count > 1)
fn = (fn + 1) % GSM_HYPERFRAME

View File

@ -4,7 +4,7 @@
# Virtual Um-interface (fake transceiver)
# DATA interface implementation
#
# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -22,85 +22,18 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
from udp_link import UDPLink
from gsm_shared import *
from data_msg import *
class DATAInterface(UDPLink):
def send_l1_msg(self, burst,
tn = None, fn = None, rssi = None):
# Generate random timeslot index if not preset
if tn is None:
tn = random.randint(0, 7)
def send_msg(self, msg):
# Validate a message
if not msg.validate():
raise ValueError("Message incomplete or incorrect")
# Generate random frame number if not preset
if fn is None:
fn = random.randint(0, GSM_HYPERFRAME)
# Generate random RSSI if not preset
if rssi is None:
rssi = -random.randint(-75, -50)
# Prepare a buffer for header and burst
buf = []
# Put timeslot index
buf.append(tn)
# Put frame number
buf.append((fn >> 24) & 0xff)
buf.append((fn >> 16) & 0xff)
buf.append((fn >> 8) & 0xff)
buf.append((fn >> 0) & 0xff)
# Put RSSI
buf.append(rssi)
# HACK: put fake TOA value
buf += [0x00] * 2
# Put burst
buf += burst
# Put two unused bytes
buf += [0x00] * 2
# Generate TRX message
payload = msg.gen_msg()
# Send message
self.send(bytearray(buf))
def send_trx_msg(self, burst,
tn = None, fn = None, pwr = None):
# Generate random timeslot index if not preset
if tn is None:
tn = random.randint(0, 7)
# Generate random frame number if not preset
if fn is None:
fn = random.randint(0, GSM_HYPERFRAME)
# Generate random power level if not preset
if pwr is None:
pwr = random.randint(0, 34)
# Prepare a buffer for header and burst
buf = []
# Put timeslot index
buf.append(tn)
# Put frame number
buf.append((fn >> 24) & 0xff)
buf.append((fn >> 16) & 0xff)
buf.append((fn >> 8) & 0xff)
buf.append((fn >> 0) & 0xff)
# Put transmit power level
buf.append(pwr)
# Put burst
buf += burst
# Send message
self.send(bytearray(buf))
self.send(payload)

View File

@ -27,6 +27,8 @@ import sys
import scapy.all
from data_msg import *
COPYRIGHT = \
"Copyright (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
"License GPLv2+: GNU GPL version 2 or later " \
@ -98,32 +100,39 @@ class Application:
trx = udp.payload
# Convert to bytearray
trx = bytearray(str(trx))
# Parse GSM TDMA specific data
fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4]
tn = trx[0]
msg_raw = bytearray(str(trx))
# Determine a burst direction (L1 <-> TRX)
l12trx = udp.sport > udp.dport
# Create an empty DATA message
msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
# Attempt to parse the payload as a DATA message
try:
msg.parse_msg(msg_raw)
except:
print("[!] Failed to parse message, dropping...")
self.cnt_burst_dropped_num += 1
return
# Poke burst pass filter
rc = self.burst_pass_filter(l12trx, fn, tn)
rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
if rc is False:
self.cnt_burst_dropped_num += 1
return
# Debug print
print("[i] %s burst: fn=%u, tn=%d" \
% ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn))
print("[i] %s burst: %s" \
% ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
# Poke burst handler
rc = self.burst_handle(trx, l12trx, fn, tn)
rc = self.burst_handle(l12trx, msg_raw, msg)
if rc is False:
self.shutdown()
# Poke burst counter
rc = self.burst_count(fn, tn)
rc = self.burst_count(msg.fn, msg.tn)
if rc is True:
self.shutdown()
@ -149,47 +158,23 @@ class Application:
# Burst passed ;)
return True
def burst_handle(self, trx_burst, l12trx, fn, tn):
def burst_handle(self, l12trx, msg_raw, msg):
if self.print_bursts:
self.burst_dump_bits(sys.stdout, trx_burst, l12trx)
sys.stdout.flush()
print(msg.burst)
if self.output_file is not None:
# TLV: tag defines burst direction (one byte, BE)
self.output_file.write('\x01' if l12trx else '\x02')
# TLV: length of value (one byte, BE)
length = len(trx_burst)
length = len(msg_raw)
self.output_file.write(chr(length))
# TLV: raw value
self.output_file.write(trx_burst)
self.output_file.write(msg_raw)
return True
def burst_dump_bits(self, dst, trx_burst, l12trx):
# Split out burst header
if l12trx:
burst = trx_burst[6:]
else:
burst = trx_burst[8:]
# Print normal bits: 0 or 1
for i in range(0, 148):
# Convert bits to chars
if l12trx:
# Convert bits as is
bit = '1' if burst[i] else '0'
else:
# Convert trx bits {254..0} to sbits {-127..127}
bit = -127 if burst[i] == 255 else 127 - burst[i]
# Convert sbits {-127..127} to ubits {0..1}
bit = '1' if bit < 0 else '0'
# Write a normal bit
dst.write(bit)
dst.write("\n")
def burst_count(self, fn, tn):
# Update frame counter
if self.cnt_frame_last is None: