trx_toolkit/fake_trx.py: basic TRXD version 0x01 support

Since the new TRXD header format has been introduced, FakeTRX needs
to be able to fill it correctly. In particular, the following:

  - Modulation, which can be determined from the burst length;
  - Training Sequence Code (and set), which needs to be detected
    by comparing the burst bits of L12TRX message against known
    training sequences (only GMSK and the default TS set for now);
  - C/I (Carrier-to-Interference ratio), which can be simulated
    later on, as instructed on the TRXC interface ('FAKE_CI').

The actual TRXD header version is stored in the instance of class
DATAInterface. By default (at startup), legacy version 0 is used.
The version negotiation is supposed to be performed on the TRXC
interface, and to be implemented in a follow-up change.

Different Transceivers may use different header versions, thus in
FakeTRX.send_data_msg() we need to override the original version
of the L12TRX message, and generate the corresponding PDU.

Limitations:

  - NOPE / IDLE indications are not (yet) supported;
  - TSC detection: GMSK modulation only.

Change-Id: I164f5ae4ce7694d6e324aab927a04e96d489ebd8
Related: OS#4006
This commit is contained in:
Vadim Yanitskiy 2019-07-08 10:14:10 +07:00 committed by laforge
parent 6780e47b6c
commit ebf676597b
3 changed files with 78 additions and 5 deletions

View File

@ -84,4 +84,4 @@ class BurstForwarder:
if tx_msg.tn not in trx.ts_list:
continue
trx.send_data_msg(src_trx, tx_msg)
trx.send_data_msg(src_trx, rx_msg, tx_msg)

View File

@ -4,7 +4,7 @@
# TRX Toolkit
# DATA interface implementation
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -29,9 +29,29 @@ from data_msg import *
class DATAInterface(UDPLink):
def __init__(self, *udp_link_args):
# Default header version (legacy)
self._hdr_ver = 0x00
UDPLink.__init__(self, *udp_link_args)
log.debug("Init TRXD interface (%s)" % self.desc_link())
def set_hdr_ver(self, ver):
if not ver in DATAMSG.known_versions:
return False
self._hdr_ver = ver
return True
def match_hdr_ver(self, msg):
if msg.ver == self._hdr_ver:
return True
log.error("(%s) Rx DATA message (%s) with unexpected header "
"version %u (!= expected %u), ignoring..."
% (self.desc_link(), msg.desc_hdr(),
msg.ver, self._hdr_ver))
return False
def recv_raw_data(self):
data, _ = self.sock.recvfrom(512)
return data
@ -49,6 +69,11 @@ class DATAInterface(UDPLink):
"from R:%s:%u" % (self.remote_addr, self.remote_port))
return None
# Make sure the header version matches
# the configured one (self._hdr_ver)
if not self.match_hdr_ver(msg):
return None
return msg
def recv_trx2l1_msg(self):
@ -64,6 +89,11 @@ class DATAInterface(UDPLink):
"from R:%s:%u" % (self.remote_addr, self.remote_port))
return None
# Make sure the header version matches
# the configured one (self._hdr_ver)
if not self.match_hdr_ver(msg):
return None
return msg
def send_msg(self, msg, legacy = False):

View File

@ -35,9 +35,11 @@ import re
from app_common import ApplicationBase
from burst_fwd import BurstForwarder
from transceiver import Transceiver
from data_msg import Modulation
from clck_gen import CLCKGen
from trx_list import TRXList
from fake_pm import FakePM
from gsm_shared import *
class FakeTRX(Transceiver):
""" Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
@ -98,18 +100,21 @@ class FakeTRX(Transceiver):
TOA256_BASE_DEFAULT = 0
RSSI_BASE_DEFAULT = -60
CI_BASE_DEFAULT = 90
def __init__(self, *trx_args, **trx_kwargs):
Transceiver.__init__(self, *trx_args, **trx_kwargs)
# Actual ToA / RSSI / TA values
# Actual ToA, RSSI, C/I, TA values
self.toa256_base = self.TOA256_BASE_DEFAULT
self.rssi_base = self.RSSI_BASE_DEFAULT
self.ci_base = self.CI_BASE_DEFAULT
self.ta = 0
# ToA / RSSI randomization threshold
# ToA, RSSI, C/I randomization thresholds
self.toa256_rand_threshold = 0
self.rssi_rand_threshold = 0
self.ci_rand_threshold = 0
# Path loss simulation (burst dropping)
self.burst_drop_amount = 0
@ -137,6 +142,17 @@ class FakeTRX(Transceiver):
rssi_max = self.rssi_base + self.rssi_rand_threshold
return random.randint(rssi_min, rssi_max)
@property
def ci(self):
# Check if randomization is required
if self.ci_rand_threshold is 0:
return self.ci_base
# Generate a random C/I value in required range
ci_min = self.ci_base - self.ci_rand_threshold
ci_max = self.ci_base + self.ci_rand_threshold
return random.randint(ci_min, ci_max)
# Path loss simulation: burst dropping
# Returns: True - drop, False - keep
def sim_burst_drop(self, msg):
@ -152,14 +168,41 @@ class FakeTRX(Transceiver):
return False
def _handle_data_msg_v1(self, src_msg, msg):
# TODO: NOPE indications are not (yet) supported
msg.nope_ind = False
# C/I (Carrier-to-Interference ratio)
msg.ci = self.ci
# Pick modulation type by burst length
bl = len(src_msg.burst)
msg.mod_type = Modulation.pick_by_bl(bl)
# Pick TSC (Training Sequence Code) and TSC set
if msg.mod_type is Modulation.ModGMSK:
ss = TrainingSeqGMSK.pick(src_msg.burst)
msg.tsc = ss.tsc if ss is not None else 0
msg.tsc_set = ss.tsc_set if ss is not None else 0
else: # TODO: other modulation types (at least 8-PSK)
msg.tsc_set = 0
msg.tsc = 0
# Takes (partially initialized) TRX2L1 message,
# simulates RF path parameters (such as RSSI),
# and sends towards the L1
def send_data_msg(self, src_trx, msg):
def send_data_msg(self, src_trx, src_msg, msg):
# Override header version
msg.ver = self.data_if._hdr_ver
# Complete message header
msg.toa256 = self.toa256
msg.rssi = self.rssi
# Version specific fields
if msg.ver >= 0x01:
self._handle_data_msg_v1(src_msg, msg)
# Apply optional Timing Advance
if src_trx.ta is not 0:
msg.toa256 -= src_trx.ta * 256