trx_toolkit/fake_trx.py: refactor global class hierarchy

This change is a big step towards handling of multiple transceivers
in a single process, i.e. multiple MS and multiple BTS connections.

The old class hierarchy wasn't flexible enough, because initially
fake_trx was designed as a bridge between OsmocomBB and OsmoBTS,
but not as the burst router. There were two separate, but 90%
similar implementations of the CTRL interface, two variations
of each simulation parameter - one for UL, another for DL.

The following new classes are introduced:

  - Transceiver - represents a single transceiver, that can be
    used as for the BTS side, as for the MS side. Each instance
    has its own CTRL, DATA, and (optionally) CLCK interfaces,
    among with basic state variables, such as both RX / TX freq.,
    power state (running or idle) and list of active timeslots.

  - CTRLInterfaceTRX - unified control interface handler for
    common transceiver management commands, such as POWERON,
    RXTUNE, and SETSLOT. Deprecates both CTRLInterface{BB|BTS}.

  - FakeTRX - basically, a child of Transceiver, extended with
    RF path (burst loss, RSSI, TA, ToA) simulation. Implements
    a custom CTRL command handler for CTRLInterfaceTRX.

The following classes were refactored:

  - BurstForwarder - still performs burst forwarding, but now
    it doesn't store any simulation parameters, and doesn't
    know who is BTS, and who is MS. Actually, BurstForwarder
    transforms each L12TRX message into a TRX2L1 message, and
    dispatches it between running transceivers with matching
    RX frequency and matching timeslot.

  - FakePM - still generates random RSSI values, but doesn't
    distinguish between MS and BTS anymore. As soon as a
    measurement request is received, it attempts to find
    at least one running TRX on a given frequency.

Please note that fake_trx.py still does handle only a single pair
of MS and BTS. No regressions have been observed. Both new and
refactored classes were documented.

Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185
Related: OS#3667
This commit is contained in:
Vadim Yanitskiy 2018-12-10 17:39:51 +07:00
parent 1197382e8d
commit 152a2da8d2
7 changed files with 652 additions and 779 deletions

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
# BTS <-> BB burst forwarding
# Burst forwarding between transceivers
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
@ -23,321 +23,62 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
import random
from data_msg import *
class BurstForwarder:
""" Performs burst forwarding and preprocessing between MS and BTS.
""" Performs burst forwarding between transceivers.
== Pass-filtering parameters
BurstForwarder distributes bursts between the list of given
FakeTRX (Transceiver) instances depending on the following
parameters of each transceiver:
BurstForwarder may drop or pass an UL/DL burst depending
on the following parameters:
- execution state (running or idle),
- actual RX / TX frequencies,
- list of active timeslots.
- bts_freq / bb_freq - the current BTS / MS frequency
that was set using RXTUNE control command. By default,
both freq. values are set to None, so nothing is being
forwarded (i.e. bursts are getting dropped).
FIXME: currently, we don't care about TXTUNE command
and transmit frequencies. It would be great to distinguish
between RX and TX frequencies for both BTS and MS.
- ts_pass_list - the list of active (i.e. configured)
timeslot numbers for the MS. A timeslot can be activated
or deactivated using SETSLOT control command from the MS.
FIXME: there is no such list for the BTS side.
== Preprocessing and measurement simulation
Since this is a virtual environment, we can simulate different
parameters of a virtual RF interface:
- ToA (Timing of Arrival) - measured difference between expected
and actual time of burst arrival in units of 1/256 of GSM symbol
periods. A pair of both base and threshold values defines a range
of ToA value randomization:
DL: from (toa256_dl_base - toa256_dl_threshold)
to (toa256_dl_base + toa256_dl_threshold),
UL: from (toa256_ul_base - toa256_ul_threshold)
to (toa256_ul_base + toa256_ul_threshold).
- RSSI (Received Signal Strength Indication) - measured "power" of
the signal (per burst) in dBm. A pair of both base and threshold
values defines a range of RSSI value randomization:
DL: from (rssi_dl_base - rssi_dl_threshold)
to (rssi_dl_base + rssi_dl_threshold),
UL: from (rssi_ul_base - rssi_ul_threshold)
to (rssi_ul_base + rssi_ul_threshold).
Please note that the randomization of both RSSI and ToA
is optional, and can be enabled from the control interface.
=== Timing Advance handling
The BTS is using ToA measurements for UL bursts in order to calculate
Timing Advance value, that is then indicated to a MS, which in its turn
shall apply this value to the transmitted signal in order to compensate
the delay. Basically, every burst is transmitted in advance defined by
the indicated Timing Advance value. The valid range is 0..63, where
each unit means one GSM symbol advance. The actual Timing Advance value
is set using SETTA control command from MS. By default, it's set to 0.
=== Path loss simulation - burst dropping
In some cases, e.g. due to a weak signal or high interference, a burst
can be lost, i.e. not detected by the receiver. This can also be
simulated using FAKE_DROP command on both control interfaces:
- burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
to be dropped (i.e. not forwarded towards the MS/BTS),
- burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g.
1 - drop every consequent burst, 2 - drop every second burst, etc.
Each to be distributed L12TRX message is being transformed
into a TRX2L1 message, and then forwarded to transceivers
with partially initialized header. All uninitialized header
fields (such as rssi and toa256) shall be set by each
transceiver individually before sending towards the L1.
"""
def __init__(self, bts_link, bb_link):
self.bts_link = bts_link
self.bb_link = bb_link
def __init__(self, trx_list = []):
# List of Transceiver instances
self.trx_list = trx_list
# Init default parameters
self.reset_dl()
self.reset_ul()
def add_trx(self, trx):
if trx in self.trx_list:
log.error("TRX is already in the list")
return
# Initialize (or reset to) default parameters for Downlink
def reset_dl(self):
# Unset current DL freq.
self.bts_freq = None
self.trx_list.append(trx)
# Indicated RSSI / ToA values
self.toa256_dl_base = 0
self.rssi_dl_base = -60
def del_trx(self, trx):
if trx not in self.trx_list:
log.error("TRX is not in the list")
return
# RSSI / ToA randomization threshold
self.toa256_dl_threshold = 0
self.rssi_dl_threshold = 0
self.trx_list.remove(trx)
# Path loss simulation (burst dropping)
self.burst_dl_drop_amount = 0
self.burst_dl_drop_period = 1
def forward_msg(self, src_trx, rx_msg):
# Transform from L12TRX to TRX2L1
tx_msg = rx_msg.gen_trx2l1()
if tx_msg is None:
log.error("Forwarding failed, could not transform "
"message (%s) => dropping..." % rx_msg.desc_hdr())
# Initialize (or reset to) default parameters for Uplink
def reset_ul(self):
# Unset current DL freq.
self.bb_freq = None
# Iterate over all known transceivers
for trx in self.trx_list:
if trx == src_trx:
continue
# Indicated RSSI / ToA values
self.rssi_ul_base = -70
self.toa256_ul_base = 0
# Check transceiver state
if not trx.running:
continue
if trx.rx_freq != src_trx.tx_freq:
continue
if tx_msg.tn not in trx.ts_list:
continue
# RSSI / ToA randomization threshold
self.toa256_ul_threshold = 0
self.rssi_ul_threshold = 0
# Path loss simulation (burst dropping)
self.burst_ul_drop_amount = 0
self.burst_ul_drop_period = 1
# Init timeslot filter (drop everything by default)
self.ts_pass_list = []
# Reset Timing Advance value
self.ta = 0
# Converts TA value from symbols to
# units of 1/256 of GSM symbol periods
def calc_ta256(self):
return self.ta * 256
# Calculates a random ToA value for Downlink bursts
def calc_dl_toa256(self):
# Check if randomization is required
if self.toa256_dl_threshold is 0:
return self.toa256_dl_base
# Calculate a range for randomization
toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
# Generate a random ToA value
toa256 = random.randint(toa256_min, toa256_max)
return toa256
# Calculates a random ToA value for Uplink bursts
def calc_ul_toa256(self):
# Check if randomization is required
if self.toa256_ul_threshold is 0:
return self.toa256_ul_base
# Calculate a range for randomization
toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
# Generate a random ToA value
toa256 = random.randint(toa256_min, toa256_max)
return toa256
# Calculates a random RSSI value for Downlink bursts
def calc_dl_rssi(self):
# Check if randomization is required
if self.rssi_dl_threshold is 0:
return self.rssi_dl_base
# Calculate a range for randomization
rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
# Generate a random RSSI value
return random.randint(rssi_min, rssi_max)
# Calculates a random RSSI value for Uplink bursts
def calc_ul_rssi(self):
# Check if randomization is required
if self.rssi_ul_threshold is 0:
return self.rssi_ul_base
# Calculate a range for randomization
rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
# Generate a random RSSI value
return random.randint(rssi_min, rssi_max)
# DL path loss simulation
def path_loss_sim_dl(self, msg):
# Burst dropping
if self.burst_dl_drop_amount > 0:
if msg.fn % self.burst_dl_drop_period == 0:
log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)"
% (msg.fn, self.burst_dl_drop_period))
self.burst_dl_drop_amount -= 1
return None
return msg
# UL path loss simulation
def path_loss_sim_ul(self, msg):
# Burst dropping
if self.burst_ul_drop_amount > 0:
if msg.fn % self.burst_ul_drop_period == 0:
log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)"
% (msg.fn, self.burst_ul_drop_period))
self.burst_ul_drop_amount -= 1
return None
return msg
# DL burst preprocessing
def preprocess_dl_burst(self, msg):
# Calculate both RSSI and ToA values
msg.toa256 = self.calc_dl_toa256()
msg.rssi = self.calc_dl_rssi()
# UL burst preprocessing
def preprocess_ul_burst(self, msg):
# Calculate both RSSI and ToA values,
# also apply Timing Advance
msg.toa256 = self.calc_ul_toa256()
msg.toa256 -= self.calc_ta256()
msg.rssi = self.calc_ul_rssi()
# Converts a L12TRX message to TRX2L1 message
def transform_msg(self, msg_raw):
# Attempt to parse a message
try:
msg_l12trx = DATAMSG_L12TRX()
msg_l12trx.parse_msg(bytearray(msg_raw))
except:
log.error("Dropping unhandled DL message...")
return None
# Compose a new message for L1
return msg_l12trx.gen_trx2l1()
# Downlink handler: BTS -> BB
def bts2bb(self):
# Read data from socket
data, addr = self.bts_link.sock.recvfrom(512)
# BB is not connected / tuned
if self.bb_freq is None:
return None
# Freq. filter
if self.bb_freq != self.bts_freq:
return None
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Timeslot filter
if msg.tn not in self.ts_pass_list:
return None
# Path loss simulation
msg = self.path_loss_sim_dl(msg)
if msg is None:
return None
# Burst preprocessing
self.preprocess_dl_burst(msg)
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BB
self.bb_link.send(payload)
# Uplink handler: BB -> BTS
def bb2bts(self):
# Read data from socket
data, addr = self.bb_link.sock.recvfrom(512)
# BTS is not connected / tuned
if self.bts_freq is None:
return None
# Freq. filter
if self.bb_freq != self.bts_freq:
return None
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Timeslot filter
if msg.tn not in self.ts_pass_list:
log.warning("TS %u is not configured, dropping UL burst..." % msg.tn)
return None
# Path loss simulation
msg = self.path_loss_sim_ul(msg)
if msg is None:
return None
# Burst preprocessing
self.preprocess_ul_burst(msg)
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BTS
self.bts_link.send(payload)
trx.send_data_msg(src_trx, tx_msg)

View File

@ -1,219 +0,0 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# TRX Toolkit
# CTRL interface implementation (OsmocomBB specific)
#
# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
from ctrl_if import CTRLInterface
class CTRLInterfaceBB(CTRLInterface):
# Internal state variables
trx_started = False
burst_fwd = None
rx_freq = None
tx_freq = None
pm = None
def __init__(self, *udp_link_args):
CTRLInterface.__init__(self, *udp_link_args)
log.info("Init CTRL interface for BB (%s)" % self.desc_link())
def parse_cmd(self, request):
# Power control
if self.verify_cmd(request, "POWERON", 0):
log.debug("Recv POWERON CMD")
# Ensure transceiver isn't working
if self.trx_started:
log.error("Transceiver already started")
return -1
# Ensure RX / TX freq. are set
if (self.rx_freq is None) or (self.tx_freq is None):
log.error("RX / TX freq. are not set")
return -1
log.info("Starting transceiver...")
self.trx_started = True
return 0
elif self.verify_cmd(request, "POWEROFF", 0):
log.debug("Recv POWEROFF cmd")
log.info("Stopping transceiver...")
self.trx_started = False
return 0
# Tuning Control
elif self.verify_cmd(request, "RXTUNE", 1):
log.debug("Recv RXTUNE cmd")
# TODO: check freq range
self.rx_freq = int(request[1]) * 1000
self.burst_fwd.bb_freq = self.rx_freq
return 0
elif self.verify_cmd(request, "TXTUNE", 1):
log.debug("Recv TXTUNE cmd")
# TODO: check freq range
self.tx_freq = int(request[1]) * 1000
return 0
# Power measurement
elif self.verify_cmd(request, "MEASURE", 1):
log.debug("Recv MEASURE cmd")
if self.pm is None:
return -1
# TODO: check freq range
meas_freq = int(request[1]) * 1000
meas_dbm = str(self.pm.measure(meas_freq))
return (0, [meas_dbm])
elif self.verify_cmd(request, "SETSLOT", 2):
log.debug("Recv SETSLOT cmd")
if self.burst_fwd is None:
return -1
# Obtain TS index
ts = int(request[1])
if ts not in range(0, 8):
log.error("TS index should be in range: 0..7")
return -1
# Parse TS type
ts_type = int(request[2])
# TS activation / deactivation
# We don't care about ts_type
if ts_type == 0:
# Deactivate TS (remove from TS pass-filter list)
if ts in self.burst_fwd.ts_pass_list:
self.burst_fwd.ts_pass_list.remove(ts)
else:
# Activate TS (add to TS pass-filter list)
if ts not in self.burst_fwd.ts_pass_list:
self.burst_fwd.ts_pass_list.append(ts)
return 0
# Timing Advance
elif self.verify_cmd(request, "SETTA", 1):
log.debug("Recv SETTA cmd")
# Save to the BurstForwarder instance
self.burst_fwd.ta = int(request[1])
return 0
# Timing of Arrival simulation for Uplink
# Absolute form: CMD FAKE_TOA <BASE> <THRESH>
elif self.verify_cmd(request, "FAKE_TOA", 2):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply both base and threshold
self.burst_fwd.toa256_ul_base = int(request[1])
self.burst_fwd.toa256_ul_threshold = int(request[2])
return 0
# Timing of Arrival simulation for Uplink
# Relative form: CMD FAKE_TOA <+-BASE_DELTA>
elif self.verify_cmd(request, "FAKE_TOA", 1):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply delta
self.burst_fwd.toa256_ul_base += int(request[1])
return 0
# RSSI simulation for Uplink
# Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
elif self.verify_cmd(request, "FAKE_RSSI", 2):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply both base and threshold
self.burst_fwd.rssi_ul_base = int(request[1])
self.burst_fwd.rssi_ul_threshold = int(request[2])
return 0
# RSSI simulation for Uplink
# Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
elif self.verify_cmd(request, "FAKE_RSSI", 1):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply delta
self.burst_fwd.rssi_ul_base += int(request[1])
return 0
# Path loss simulation for UL: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT>
# Dropping pattern: fn % 1 == 0
elif self.verify_cmd(request, "FAKE_DROP", 1):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
self.burst_fwd.burst_ul_drop_amount = num
self.burst_fwd.burst_ul_drop_period = 1
return 0
# Path loss simulation for UL: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
# Dropping pattern: fn % period == 0
elif self.verify_cmd(request, "FAKE_DROP", 2):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
# Parse / validate period
period = int(request[2])
if period <= 0:
log.error("FAKE_DROP period shall be greater than zero")
return -1
self.burst_fwd.burst_ul_drop_amount = num
self.burst_fwd.burst_ul_drop_period = period
return 0
# Wrong / unknown command
else:
# We don't care about other commands,
# so let's merely ignore them ;)
log.debug("Ignore CMD %s" % request[0])
return 0

View File

@ -1,189 +0,0 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# TRX Toolkit
# CTRL interface implementation (OsmoBTS specific)
#
# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
from ctrl_if import CTRLInterface
class CTRLInterfaceBTS(CTRLInterface):
# Internal state variables
trx_started = False
burst_fwd = None
clck_gen = None
rx_freq = None
tx_freq = None
pm = None
def __init__(self, *udp_link_args):
CTRLInterface.__init__(self, *udp_link_args)
log.info("Init CTRL interface for BTS (%s)" % self.desc_link())
def parse_cmd(self, request):
# Power control
if self.verify_cmd(request, "POWERON", 0):
log.debug("Recv POWERON CMD")
# Ensure transceiver isn't working
if self.trx_started:
log.error("Transceiver already started")
return -1
# Ensure RX / TX freq. are set
if (self.rx_freq is None) or (self.tx_freq is None):
log.error("RX / TX freq. are not set")
return -1
log.info("Starting transceiver...")
self.trx_started = True
# Power emulation
if self.pm is not None:
self.pm.add_bts_list([self.tx_freq])
# Start clock indications
if self.clck_gen is not None:
self.clck_gen.start()
return 0
elif self.verify_cmd(request, "POWEROFF", 0):
log.debug("Recv POWEROFF cmd")
log.info("Stopping transceiver...")
self.trx_started = False
# Power emulation
if self.pm is not None:
self.pm.del_bts_list([self.tx_freq])
# Stop clock indications
if self.clck_gen is not None:
self.clck_gen.stop()
return 0
# Tuning Control
elif self.verify_cmd(request, "RXTUNE", 1):
log.debug("Recv RXTUNE cmd")
# TODO: check freq range
self.rx_freq = int(request[1]) * 1000
return 0
elif self.verify_cmd(request, "TXTUNE", 1):
log.debug("Recv TXTUNE cmd")
# TODO: check freq range
self.tx_freq = int(request[1]) * 1000
self.burst_fwd.bts_freq = self.tx_freq
return 0
# Timing of Arrival simulation for Downlink
# Absolute form: CMD FAKE_TOA <BASE> <THRESH>
elif self.verify_cmd(request, "FAKE_TOA", 2):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply both base and threshold
self.burst_fwd.toa256_dl_base = int(request[1])
self.burst_fwd.toa256_dl_threshold = int(request[2])
return 0
# Timing of Arrival simulation for Downlink
# Relative form: CMD FAKE_TOA <+-BASE_DELTA>
elif self.verify_cmd(request, "FAKE_TOA", 1):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply delta
self.burst_fwd.toa256_dl_base += int(request[1])
return 0
# RSSI simulation for Downlink
# Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
elif self.verify_cmd(request, "FAKE_RSSI", 2):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply both base and threshold
self.burst_fwd.rssi_dl_base = int(request[1])
self.burst_fwd.rssi_dl_threshold = int(request[2])
return 0
# RSSI simulation for Downlink
# Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
elif self.verify_cmd(request, "FAKE_RSSI", 1):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply delta
self.burst_fwd.rssi_dl_base += int(request[1])
return 0
# Path loss simulation for DL: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT>
# Dropping pattern: fn % 1 == 0
elif self.verify_cmd(request, "FAKE_DROP", 1):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
self.burst_fwd.burst_dl_drop_amount = num
self.burst_fwd.burst_dl_drop_period = 1
return 0
# Path loss simulation for DL: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
# Dropping pattern: fn % period == 0
elif self.verify_cmd(request, "FAKE_DROP", 2):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
# Parse / validate period
period = int(request[2])
if period <= 0:
log.error("FAKE_DROP period shall be greater than zero")
return -1
self.burst_fwd.burst_dl_drop_amount = num
self.burst_fwd.burst_dl_drop_period = period
return 0
# Wrong / unknown command
else:
# We don't care about other commands,
# so let's merely ignore them ;)
log.debug("Ignore CMD %s" % request[0])
return 0

View File

@ -0,0 +1,155 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# TRX Toolkit
# CTRL interface implementation (common commands)
#
# (C) 2016-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
from ctrl_if import CTRLInterface
class CTRLInterfaceTRX(CTRLInterface):
""" CTRL interface handler for common transceiver management commands.
The following set of commands is mandatory for every transceiver:
- POWERON / POWEROFF - state management (running / idle),
- RXTUNE / TXTUNE - RX / TX frequency management,
- SETSLOT - timeslot management.
Additionally, there is an optional MEASURE command, which is used
by OsmocomBB to perform power measurement on a given frequency.
A given transceiver may also define its own command handler,
that is prioritized, i.e. it can overwrite any commands mentioned
above. If None is returned, a command is considered as unhandled.
"""
def __init__(self, trx, *udp_link_args):
CTRLInterface.__init__(self, *udp_link_args)
log.info("Init CTRL interface (%s)" % self.desc_link())
# Link with Transceiver instance we belong to
self.trx = trx
def parse_cmd(self, request):
# Custom command handlers (prioritized)
res = self.trx.ctrl_cmd_handler(request)
if res is not None:
return res
# Power control
if self.verify_cmd(request, "POWERON", 0):
log.debug("Recv POWERON CMD")
# Ensure transceiver isn't working
if self.trx.running:
log.error("Transceiver already started")
return -1
# Ensure RX / TX freq. are set
if (self.trx.rx_freq is None) or (self.trx.tx_freq is None):
log.error("RX / TX freq. are not set")
return -1
log.info("Starting transceiver...")
self.trx.running = True
# Notify transceiver about that
self.trx.power_event_handler("POWERON")
return 0
elif self.verify_cmd(request, "POWEROFF", 0):
log.debug("Recv POWEROFF cmd")
log.info("Stopping transceiver...")
self.trx.running = False
# Notify transceiver about that
self.trx.power_event_handler("POWEROFF")
return 0
# Tuning Control
elif self.verify_cmd(request, "RXTUNE", 1):
log.debug("Recv RXTUNE cmd")
# TODO: check freq range
self.trx.rx_freq = int(request[1]) * 1000
return 0
elif self.verify_cmd(request, "TXTUNE", 1):
log.debug("Recv TXTUNE cmd")
# TODO: check freq range
self.trx.tx_freq = int(request[1]) * 1000
return 0
elif self.verify_cmd(request, "SETSLOT", 2):
log.debug("Recv SETSLOT cmd")
# Obtain TS index
ts = int(request[1])
if ts not in range(0, 8):
log.error("TS index should be in range: 0..7")
return -1
# Parse TS type
ts_type = int(request[2])
# TS activation / deactivation
# We don't care about ts_type
if ts_type == 0:
# Deactivate TS (remove from the list of active timeslots)
if ts in self.trx.ts_list:
self.trx.ts_list.remove(ts)
else:
# Activate TS (add to the list of active timeslots)
if ts not in self.trx.ts_list:
self.trx.ts_list.append(ts)
return 0
# Power measurement
if self.verify_cmd(request, "MEASURE", 1):
log.debug("Recv MEASURE cmd")
# Power Measurement interface is optional
# for Transceiver, thus may be uninitialized
if self.trx.pwr_meas is None:
log.error("Power Measurement interface "
"is not initialized => rejecting command")
return -1
# TODO: check freq range
meas_freq = int(request[1]) * 1000
meas_dbm = self.trx.pwr_meas.measure(meas_freq)
return (0, [str(meas_dbm)])
# Wrong / unknown command
else:
# We don't care about other commands,
# so let's merely ignore them ;)
log.debug("Ignore CMD %s" % request[0])
return 0

View File

@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
# Power measurement emulation for BB
# Power measurement emulation
#
# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -25,29 +25,53 @@
from random import randint
class FakePM:
# Freq. list for good power level
bts_list = []
""" Power measurement emulation for fake transceivers.
def __init__(self, noise_min, noise_max, bts_min, bts_max):
# Save power level ranges
There is no such thing like RF signal level in fake Um-interface,
so we need to emulate this. The main idea is to have a list of
all running and idle transceivers. As soon as a measurement
request is received, FakePM will attempt to find a running
transceiver on a given frequency.
The result of such "measurement" is a random RSSI value
in one of the following ranges:
- trx_min ... trx_max - if at least one TRX was found,
- noise_min ... noise_max - no TRX instances were found.
FIXME: it would be great to average the rate of bursts
and indicated power / attenuation values for all
matching transceivers, so "pure traffic" ARFCNs
would be handled properly.
"""
def __init__(self, noise_min, noise_max, trx_min, trx_max):
# Init list of transceivers
self.trx_list = []
# RSSI randomization ranges
self.noise_min = noise_min
self.noise_max = noise_max
self.bts_min = bts_min
self.bts_max = bts_max
self.trx_min = trx_min
self.trx_max = trx_max
def measure(self, bts):
if bts in self.bts_list:
return randint(self.bts_min, self.bts_max)
else:
return randint(self.noise_min, self.noise_max)
@property
def rssi_noise(self):
return randint(self.noise_min, self.noise_max)
def update_bts_list(self, new_list):
self.bts_list = new_list
@property
def rssi_trx(self):
return randint(self.trx_min, self.trx_max)
def add_bts_list(self, add_list):
self.bts_list += add_list
def measure(self, freq):
# Iterate over all known transceivers
for trx in self.trx_list:
if not trx.running:
continue
def del_bts_list(self, del_list):
for item in del_list:
if item in self.bts_list:
self.bts_list.remove(item)
# Match by given frequency
if trx.tx_freq == freq:
return self.rssi_trx
return self.rssi_noise

View File

@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
import argparse
import random
import select
import sys
from app_common import ApplicationBase
from ctrl_if_bts import CTRLInterfaceBTS
from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
from transceiver import Transceiver
from clck_gen import CLCKGen
from fake_pm import FakePM
from udp_link import UDPLink
from clck_gen import CLCKGen
class FakeTRX(Transceiver):
""" Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
== ToA / RSSI measurement simulation
Since this is a virtual environment, we can simulate different
parameters of the physical RF interface:
- ToA (Timing of Arrival) - measured difference between expected
and actual time of burst arrival in units of 1/256 of GSM symbol
periods. A pair of both base and threshold values defines a range
of ToA value randomization:
from (toa256_base - toa256_rand_threshold)
to (toa256_base + toa256_rand_threshold).
- RSSI (Received Signal Strength Indication) - measured "power" of
the signal (per burst) in dBm. A pair of both base and threshold
values defines a range of RSSI value randomization:
from (rssi_base - rssi_rand_threshold)
to (rssi_base + rssi_rand_threshold).
Please note that randomization of both RSSI and ToA is optional,
and can be enabled from the control interface.
== Timing Advance handling
The BTS is using ToA measurements for UL bursts in order to calculate
Timing Advance value, that is then indicated to a MS, which in its turn
shall apply this value to the transmitted signal in order to compensate
the delay. Basically, every burst is transmitted in advance defined by
the indicated Timing Advance value. The valid range is 0..63, where
each unit means one GSM symbol advance. The actual Timing Advance value
is set using SETTA control command from MS. By default, it's set to 0.
== Path loss simulation
=== Burst dropping
In some cases, e.g. due to a weak signal or high interference, a burst
can be lost, i.e. not detected by the receiver. This can also be
simulated using FAKE_DROP command on the control interface:
- burst_drop_amount - the amount of DL/UL bursts
to be dropped (i.e. not forwarded towards the MS/BTS),
- burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
== Configuration
All simulation parameters mentioned above can be changed at runtime
using the commands with prefix 'FAKE_' on the control interface.
All of them are handled by our custom CTRL command handler.
"""
TOA256_BASE_DEFAULT = 0
RSSI_BASE_DEFAULT = -60
def __init__(self, *trx_args, **trx_kwargs):
Transceiver.__init__(self, *trx_args, **trx_kwargs)
# Actual ToA / RSSI / TA values
self.toa256_base = self.TOA256_BASE_DEFAULT
self.rssi_base = self.RSSI_BASE_DEFAULT
self.ta = 0
# ToA / RSSI randomization threshold
self.toa256_rand_threshold = 0
self.rssi_rand_threshold = 0
# Path loss simulation (burst dropping)
self.burst_drop_amount = 0
self.burst_drop_period = 1
@property
def toa256(self):
# Check if randomization is required
if self.toa256_rand_threshold is 0:
return self.toa256_base
# Generate a random ToA value in required range
toa256_min = self.toa256_base - self.toa256_rand_threshold
toa256_max = self.toa256_base + self.toa256_rand_threshold
return random.randint(toa256_min, toa256_max)
@property
def rssi(self):
# Check if randomization is required
if self.rssi_rand_threshold is 0:
return self.rssi_base
# Generate a random RSSI value in required range
rssi_min = self.rssi_base - self.rssi_rand_threshold
rssi_max = self.rssi_base + self.rssi_rand_threshold
return random.randint(rssi_min, rssi_max)
# Path loss simulation: burst dropping
# Returns: True - drop, False - keep
def sim_burst_drop(self, msg):
# Check if dropping is required
if self.burst_drop_amount is 0:
return False
if msg.fn % self.burst_drop_period == 0:
log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
% (msg.fn, self.burst_drop_period))
self.burst_drop_amount -= 1
return True
return False
# Takes (partially initialized) TRX2L1 message,
# simulates RF path parameters (such as RSSI),
# and sends towards the L1
def send_data_msg(self, src_trx, msg):
# Complete message header
msg.toa256 = self.toa256
msg.rssi = self.rssi
# Apply optional Timing Advance
if src_trx.ta is not 0:
msg.toa256 -= src_trx.ta * 256
# Path loss simulation
if self.sim_burst_drop(msg):
return
# TODO: make legacy mode configurable (via argv?)
self.data_if.send_msg(msg, legacy = True)
# Simulation specific CTRL command handler
def ctrl_cmd_handler(self, request):
# Timing Advance
# Syntax: CMD SETTA <TA>
if self.ctrl_if.verify_cmd(request, "SETTA", 1):
log.debug("Recv SETTA cmd")
# Store indicated value
self.ta = int(request[1])
return 0
# Timing of Arrival simulation
# Absolute form: CMD FAKE_TOA <BASE> <THRESH>
elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply both base and threshold
self.toa256_base = int(request[1])
self.toa256_rand_threshold = int(request[2])
return 0
# Timing of Arrival simulation
# Relative form: CMD FAKE_TOA <+-BASE_DELTA>
elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
log.debug("Recv FAKE_TOA cmd")
# Parse and apply delta
self.toa256_base += int(request[1])
return 0
# RSSI simulation
# Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply both base and threshold
self.rssi_base = int(request[1])
self.rssi_rand_threshold = int(request[2])
return 0
# RSSI simulation
# Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
log.debug("Recv FAKE_RSSI cmd")
# Parse and apply delta
self.rssi_base += int(request[1])
return 0
# Path loss simulation: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT>
# Dropping pattern: fn % 1 == 0
elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
self.burst_drop_amount = num
self.burst_drop_period = 1
return 0
# Path loss simulation: burst dropping
# Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
# Dropping pattern: fn % period == 0
elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
log.debug("Recv FAKE_DROP cmd")
# Parse / validate amount of bursts
num = int(request[1])
if num < 0:
log.error("FAKE_DROP amount shall not be negative")
return -1
# Parse / validate period
period = int(request[2])
if period <= 0:
log.error("FAKE_DROP period shall be greater than zero")
return -1
self.burst_drop_amount = num
self.burst_drop_period = period
return 0
# Unhandled command
return None
class Application(ApplicationBase):
def __init__(self):
@ -51,72 +271,58 @@ class Application(ApplicationBase):
self.app_init_logging(self.argv)
def run(self):
# Init TRX CTRL interface for BTS
self.bts_ctrl = CTRLInterfaceBTS(
self.argv.bts_addr, self.argv.bts_base_port + 101,
self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
# Init TRX CTRL interface for BB
self.bb_ctrl = CTRLInterfaceBB(
self.argv.bb_addr, self.argv.bb_base_port + 101,
self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
# Init shared clock generator
self.clck_gen = CLCKGen([])
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
self.pm = FakePM(-120, -105, -75, -50)
self.fake_pm = FakePM(-120, -105, -75, -50)
# Share a FakePM instance between both BTS and BB
self.bts_ctrl.pm = self.pm
self.bb_ctrl.pm = self.pm
# Init TRX instance for BTS
self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
self.argv.bts_addr, self.argv.bts_base_port,
clck_gen = self.clck_gen)
# Init DATA links
self.bts_data = UDPLink(
self.argv.bts_addr, self.argv.bts_base_port + 102,
self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
self.bb_data = UDPLink(
self.argv.bb_addr, self.argv.bb_base_port + 102,
self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
# Init TRX instance for BB
self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
self.argv.bb_addr, self.argv.bb_base_port,
pwr_meas = self.fake_pm)
# BTS <-> BB burst forwarding
self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
# Share a BurstForwarder instance between BTS and BB
self.bts_ctrl.burst_fwd = self.burst_fwd
self.bb_ctrl.burst_fwd = self.burst_fwd
# Provide clock to BTS
self.bts_clck = UDPLink(
self.argv.bts_addr, self.argv.bts_base_port + 100,
self.argv.trx_bind_addr, self.argv.bts_base_port)
self.clck_gen = CLCKGen([self.bts_clck])
self.bts_ctrl.clck_gen = self.clck_gen
# Burst forwarding between transceivers
self.burst_fwd = BurstForwarder()
self.burst_fwd.add_trx(self.bts_trx)
self.burst_fwd.add_trx(self.bb_trx)
log.info("Init complete")
# Enter main loop
while True:
socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
self.bts_data.sock, self.bb_data.sock]
socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
if self.bts_data.sock in r_event:
self.burst_fwd.bts2bb()
if self.bts_trx.data_if.sock in r_event:
msg = self.bts_trx.recv_data_msg()
if msg is not None:
self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
if self.bb_data.sock in r_event:
self.burst_fwd.bb2bts()
if self.bb_trx.data_if.sock in r_event:
msg = self.bb_trx.recv_data_msg()
if msg is not None:
self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
if self.bts_ctrl.sock in r_event:
self.bts_ctrl.handle_rx()
if self.bts_trx.ctrl_if.sock in r_event:
self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
if self.bb_ctrl.sock in r_event:
self.bb_ctrl.handle_rx()
if self.bb_trx.ctrl_if.sock in r_event:
self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")

View File

@ -0,0 +1,155 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# TRX Toolkit
# Transceiver implementation
#
# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
from ctrl_if_trx import CTRLInterfaceTRX
from data_if import DATAInterface
from udp_link import UDPLink
class Transceiver:
""" Base transceiver implementation.
Represents a single transceiver, that can be used as for the BTS side,
as for the MS side. Each individual instance of Transceiver unifies
three basic interfaces built on three independent UDP connections:
- CLCK (base port + 100/0) - clock indications from TRX to L1,
- CTRL (base port + 101/1) - control interface for L1,
- DATA (base port + 102/2) - bidirectional data interface for bursts.
A transceiver can be either in active (i.e. working), or in idle mode.
The active mode should ensure that both RX/TX frequencies are set.
NOTE: CLCK is not required for some L1 implementations, so it is optional.
== Timeslot configuration
Transceiver has a list of active (i.e. configured) TDMA timeslots.
The L1 should configure a timeslot before sending or expecting any
data on it. This is done by SETSLOT control command, which also
indicates an associated channel combination (see GSM TS 05.02).
NOTE: we don't store the associated channel combinations,
as they are only useful for burst detection and demodulation.
== Clock distribution (optional)
The clock indications are not expected by L1 when transceiver
is not running, so we monitor both POWERON / POWEROFF events
from the control interface, and keep the list of CLCK links
in a given CLCKGen instance updated. The clock generator is
started and stopped automatically.
NOTE: a single instance of CLCKGen can be shared between multiple
transceivers, as well as multiple transceivers may use
individual CLCKGen instances.
== Power Measurement (optional)
Transceiver may have an optional power measurement interface,
that shall provide at least one method: measure(freq). This
is required for the MS side (i.e. OsmocomBB).
"""
def __init__(self, bind_addr, remote_addr, base_port,
clck_gen = None, pwr_meas = None):
# Connection info
self.remote_addr = remote_addr
self.bind_addr = bind_addr
self.base_port = base_port
# Init DATA interface
self.data_if = DATAInterface(
remote_addr, base_port + 102,
bind_addr, base_port + 2)
# Init CTRL interface
self.ctrl_if = CTRLInterfaceTRX(self,
remote_addr, base_port + 101,
bind_addr, base_port + 1)
# Init optional CLCK interface
self.clck_gen = clck_gen
if clck_gen is not None:
self.clck_if = UDPLink(
remote_addr, base_port + 100,
bind_addr, base_port)
# Optional Power Measurement interface
self.pwr_meas = pwr_meas
# Internal state
self.running = False
# Actual RX / TX frequencies
self.rx_freq = None
self.tx_freq = None
# List of active (configured) timeslots
self.ts_list = []
# To be overwritten if required,
# no custom command handlers by default
def ctrl_cmd_handler(self, request):
return None
def power_event_handler(self, event):
# Trigger clock generator if required
if self.clck_gen is not None:
clck_links = self.clck_gen.clck_links
if not self.running and (self.clck_if in clck_links):
# Transceiver was stopped
clck_links.remove(self.clck_if)
elif self.running and (self.clck_if not in clck_links):
# Transceiver was started
clck_links.append(self.clck_if)
if not self.clck_gen.timer and len(clck_links) > 0:
log.info("Starting clock generator")
self.clck_gen.start()
elif self.clck_gen.timer and not clck_links:
log.info("Stopping clock generator")
self.clck_gen.stop()
def recv_data_msg(self):
# Read and parse data from socket
msg = self.data_if.recv_l12trx_msg()
if not msg:
return None
# Make sure that transceiver is configured and running
if not self.running:
log.warning("RX DATA message (%s), but transceiver "
"is not running => dropping..." % msg.desc_hdr())
return None
# Make sure that indicated timeslot is configured
if msg.tn not in self.ts_list:
log.warning("RX DATA message (%s), but timeslot "
"is not configured => dropping..." % msg.desc_hdr())
return None
return msg