python/trx: introduce and use Transceiver class

Change-Id: I6dc88edbb69a68746cc8e01206dc86f7ea2fa80f
This commit is contained in:
Vadim Yanitskiy 2019-01-19 10:22:59 +07:00
parent baebe451cd
commit 180a037a41
6 changed files with 142 additions and 54 deletions

View File

@ -29,8 +29,8 @@ from argparse import ArgumentParser
from argparse import ArgumentTypeError
from gnuradio import eng_notation
from grgsm.trx import CTRLInterfaceBB
from grgsm.trx import RadioInterface
from grgsm.trx import Transceiver
COPYRIGHT = \
"Copyright (C) 2016-2018 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
@ -64,18 +64,17 @@ class Application:
self.phy_freq_offset, self.bind_addr,
self.remote_addr, self.base_port)
# Init TRX CTRL interface
self.server = CTRLInterfaceBB(
self.remote_addr, self.base_port + 101,
self.bind_addr, self.base_port + 1,
self.radio)
# Init Transceiver
self.trx = Transceiver(self.bind_addr,
self.remote_addr, self.base_port,
radio_if = self.radio)
print("[i] Init complete")
def run(self):
# Enter main loop
while True:
self.server.loop()
self.trx.ctrl_if.loop()
def shutdown(self):
print("[i] Shutting down...")

View File

@ -25,6 +25,7 @@ GR_PYTHON_INSTALL(
ctrl_if_bb.py
radio_if.py
radio_if_grc.py
transceiver.py
dict_toggle_sign.py
DESTINATION ${GR_PYTHON_DIR}/grgsm/trx
)

View File

@ -25,5 +25,6 @@ from ctrl_if import CTRLInterface
from ctrl_if_bb import CTRLInterfaceBB
from radio_if_grc import RadioInterfaceGRC
from radio_if import RadioInterface
from transceiver import Transceiver
from dict_toggle_sign import dict_toggle_sign

View File

@ -4,7 +4,7 @@
# GR-GSM based transceiver
# CTRL interface for OsmocomBB
#
# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
# (C) 2016-2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@ -22,49 +22,32 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import grgsm
from ctrl_if import CTRLInterface
class CTRLInterfaceBB(CTRLInterface):
def __init__(self, remote_addr, remote_port, bind_addr, bind_port, tb):
CTRLInterface.__init__(self, remote_addr, remote_port,
bind_addr, bind_port)
def __init__(self, trx, *ctrl_if_args):
CTRLInterface.__init__(self, *ctrl_if_args)
print("[i] Init CTRL interface (%s)" % self.desc_link())
# Set link to the follow graph (top block)
self.tb = tb
# Transceiver instance we belong to
self.trx = trx
def parse_cmd(self, request):
# Power control
if self.verify_cmd(request, "POWERON", 0):
print("[i] Recv POWERON CMD")
# Ensure transceiver isn't working
if self.tb.trx_started:
print("[!] Transceiver already started")
# Start transceiver
if not self.trx.start():
return -1
print("[i] Starting transceiver...")
self.tb.trx_started = True
self.tb.start()
return 0
elif self.verify_cmd(request, "POWEROFF", 0):
print("[i] Recv POWEROFF cmd")
# TODO: flush all buffers between blocks
if self.tb.trx_started:
print("[i] Stopping transceiver...")
self.tb.trx_started = False
self.tb.set_ta(0)
self.tb.stop()
self.tb.wait()
# POWEROFF is also used to reset transceiver
self.tb.reset()
# Stop transceiver
self.trx.stop()
return 0
@ -74,7 +57,7 @@ class CTRLInterfaceBB(CTRLInterface):
# TODO: check gain value
gain = int(request[1])
self.tb.set_rx_gain(gain)
self.trx.radio_if.set_rx_gain(gain)
return 0
@ -83,7 +66,7 @@ class CTRLInterfaceBB(CTRLInterface):
# TODO: check gain value
gain = int(request[1])
self.tb.set_tx_gain(gain)
self.trx.radio_if.set_tx_gain(gain)
return 0
@ -93,7 +76,7 @@ class CTRLInterfaceBB(CTRLInterface):
# TODO: check freq range
freq = int(request[1]) * 1000
self.tb.set_rx_freq(freq)
self.trx.radio_if.set_rx_freq(freq)
return 0
@ -102,7 +85,7 @@ class CTRLInterfaceBB(CTRLInterface):
# TODO: check freq range
freq = int(request[1]) * 1000
self.tb.set_tx_freq(freq)
self.trx.radio_if.set_tx_freq(freq)
return 0
@ -116,19 +99,12 @@ class CTRLInterfaceBB(CTRLInterface):
print("[!] TS index should be in range: 0..7")
return -1
# Ignore timeslot type for now
# Channel combination number (see GSM TS 05.02)
# TODO: check this value
config = int(request[2])
print("[i] Configure timeslot filter to: %s"
% ("drop all" if config == 0 else "TS %d" % tn))
if config == 0:
# Value 0 means 'drop all'
self.tb.ts_filter.set_policy(
grgsm.FILTER_POLICY_DROP_ALL)
else:
self.tb.ts_filter.set_policy(
grgsm.FILTER_POLICY_DEFAULT)
self.tb.ts_filter.set_tn(tn)
# TODO: check return value
self.trx.radio_if.set_slot(tn, config)
return 0
@ -138,9 +114,11 @@ class CTRLInterfaceBB(CTRLInterface):
# TODO: check freq range
meas_freq = int(request[1]) * 1000
meas_dbm = str(self.tb.measure(meas_freq))
meas_dbm = self.trx.measure(meas_freq)
if meas_dbm is None:
return -1
return (0, [meas_dbm])
return (0, [str(meas_dbm)])
# Timing Advance control
elif self.verify_cmd(request, "SETTA", 1):
@ -152,7 +130,7 @@ class CTRLInterfaceBB(CTRLInterface):
print("[!] TA value must be in range: 0..63")
return -1
self.tb.set_ta(ta)
self.trx.radio_if.set_ta(ta)
return 0
# Misc

View File

@ -47,9 +47,6 @@ class RadioInterface(gr.top_block):
tx_freq = None
osr = 4
# Application state flags
trx_started = False
# GSM timings (in microseconds [uS])
# One timeslot duration is 576.9 μs = 15/26 ms,
# or 156.25 symbol periods (a symbol period is 48/13 μs)
@ -304,6 +301,20 @@ class RadioInterface(gr.top_block):
self.phy_sink.set_gain(gain, 0)
self.tx_gain = gain
def set_slot(self, slot, config):
print("[i] Configure timeslot filter to: %s"
% ("drop all" if config == 0 else "tn=%d" % slot))
if config == 0:
# Value 0 is used for deactivation
self.ts_filter.set_policy(grgsm.FILTER_POLICY_DROP_ALL)
else:
# FIXME: ideally, we should (re)configure the Receiver
# block, but there is no API for that, and hard-coded
# timeslot configuration is used...
self.ts_filter.set_policy(grgsm.FILTER_POLICY_DEFAULT)
self.ts_filter.set_tn(slot)
def set_ta(self, ta):
print("[i] Setting TA value %d" % ta)
advance_time_sec = ta * self.GSM_SYM_PERIOD_uS * 1e-6

98
python/trx/transceiver.py Normal file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# GR-GSM based transceiver
# Transceiver implementation
#
# (C) 2018-2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from ctrl_if_bb import CTRLInterfaceBB
class Transceiver:
""" Base transceiver implementation.
Represents a single transceiver, that can be used as for the BTS side,
as for the MS side. Each individual instance of Transceiver unifies
three basic interfaces built on three independent UDP connections:
- CLCK (base port + 100/0) - clock indications from TRX to L1,
- CTRL (base port + 101/1) - control interface for L1,
- DATA (base port + 102/2) - bidirectional data interface for bursts.
A transceiver can be either in active (i.e. working), or in idle mode.
NOTE: both CLCK and DATA interfaces are handled by the flow-graph,
(see RadioInterface), so we only initialize CTRL interface.
"""
def __init__(self, bind_addr, remote_addr, base_port, radio_if):
# Connection info
self.remote_addr = remote_addr
self.bind_addr = bind_addr
self.base_port = base_port
# Execution state (running or idle)
self.running = False
# Radio interface (handles both CLCK and DATA interfaces)
self.radio_if = radio_if
# Init CTRL interface
self.ctrl_if = CTRLInterfaceBB(self,
remote_addr, base_port + 101,
bind_addr, base_port + 1)
def start(self):
# Check execution state
if self.running:
print("[!] Transceiver is already started")
return False
# Make sure that Radio interface is ready, i.e.
# all parameters (e.g. RX / RX freq) are set.
if not self.radio_if.ready:
print("[!] RadioInterface is not ready")
return False
print("[i] Starting transceiver...")
self.radio_if.start()
self.running = True
return True
def stop(self):
# POWEROFF is also used to reset transceiver,
# so we should not complain that it isn't running.
if not self.running:
print("[i] Resetting transceiver")
self.radio_if.reset()
return
print("[i] Stopping transceiver...")
# TODO: flush all buffers between blocks
self.radio_if.stop()
self.radio_if.wait()
self.running = False
def measure(self, freq):
# TODO: transceiver should be in idle mode
return self.radio_if.measure(freq)