fake_trx/burst_fwd.py: use DATAMSG transformation API

As the DATAMSG classes were introduced, let's use them.
This approach abstracts one from dealing with raw bytes.

Also, now BurstForwarder randomizes both RSSI and ToA values,
as this feature is supported from-the-box by the DATAMSG_TRX2L1.

Change-Id: Ib15018eab749150e244914dab4b6e433ce0c9209
This commit is contained in:
Vadim Yanitskiy 2018-02-27 07:00:45 +07:00
parent 615faadcfb
commit f35413691d
1 changed files with 66 additions and 19 deletions

View File

@ -22,6 +22,10 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
from data_msg import *
class BurstForwarder:
# Timeslot filter (drop everything by default)
ts_pass = None
@ -30,40 +34,60 @@ class BurstForwarder:
bts_freq = None
bb_freq = None
# Constants
# TODO: add options to change this
RSSI_RAND_TRESHOLD = 10
RSSI_RAND_MIN = -90
RSSI_RAND_MAX = -60
# TODO: add options to change this
TOA_RAND_TRESHOLD = 0.3
TOA_RAND_BASE = 0.00
def __init__(self, bts_link, bb_link):
self.bts_link = bts_link
self.bb_link = bb_link
# Generate a random RSSI range
rssi = random.randint(self.RSSI_RAND_MIN, self.RSSI_RAND_MAX)
self.rssi_min = rssi - self.RSSI_RAND_TRESHOLD
self.rssi_max = rssi + self.RSSI_RAND_TRESHOLD
# Generate a random ToA range
self.toa_min = self.TOA_RAND_BASE - self.TOA_RAND_TRESHOLD
self.toa_max = self.TOA_RAND_BASE + self.TOA_RAND_TRESHOLD
def set_slot(self, ts):
if ts > 0 and ts < 8:
self.ts_pass = ts
else:
raise ValueError("Incorrect index for timeslot filter")
def process_payload(self, data):
payload = bytearray(data)
length = len(payload)
# Converts a L12TRX message to TRX2L1 message
def transform_msg(self, msg_raw):
# Attempt to parse a message
try:
msg_l12trx = DATAMSG_L12TRX()
msg_l12trx.parse_msg(bytearray(msg_raw))
except:
print("[!] Dropping unhandled DL message...")
return None
# HACK: set fake RSSI value (-53)
payload[5] = 0x35
# Compose a new message for L1
msg_trx2l1 = msg_l12trx.gen_trx2l1()
# HACK: add fake TOA value (6th and 7th bytes)
payload[6:2] = [0x00, 0x00]
# Randomize both RSSI and ToA values
msg_trx2l1.rssi = msg_trx2l1.rand_rssi(
min = self.rssi_min, max = self.rssi_max)
msg_trx2l1.toa = msg_trx2l1.rand_toa(
min = self.toa_min, max = self.toa_max)
# Convert ubits to {255..0}
for i in range(8, length):
payload[i] = 255 if payload[i] else 0
# WTF: append two unused bytes at the end
payload[length:2] = [0x00, 0x00]
return payload
return msg_trx2l1
# Downlink handler: BTS -> BB
def bts2bb(self):
# Read data from socket
data, addr = self.bts_link.sock.recvfrom(512)
payload = self.process_payload(data)
# BB is not connected / tuned
if self.bb_freq is None:
@ -73,10 +97,22 @@ class BurstForwarder:
if self.bb_freq != self.bts_freq:
return None
# Timeslot filter
if payload[0] != self.ts_pass:
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Timeslot filter
if msg.tn != self.ts_pass:
return None
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BB
self.bb_link.send(payload)
@ -84,7 +120,6 @@ class BurstForwarder:
def bb2bts(self):
# Read data from socket
data, addr = self.bb_link.sock.recvfrom(512)
payload = self.process_payload(data)
# BTS is not connected / tuned
if self.bts_freq is None:
@ -94,5 +129,17 @@ class BurstForwarder:
if self.bb_freq != self.bts_freq:
return None
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BTS
self.bts_link.send(payload)